PHP+RabbitMQ实现消息队列

admin 2023-10-14 346 阅读 0评论

在我们实际的开发中,经常会遇到一些高并发、大流量和需要异步实现业务的一些场景,所以我们大多会想到使用消息队列来提高系统的承载、安全性。在很多消息中间件的工具中,我们公司一直在使用RabbitMQ,比如Laravel、TP框架中使用 php-amqplib

目前公司用到的一些场景如:送礼时生成的订单,用户充值、提现时产生的订单

RabbitMQ是一个功能强大的消息队列工具,被广泛用于构建可扩展、可靠性的应用程序。

今天说下简单的PHP+RabbitMQ实现最基础的消息队列(以面向过程为例)。

准备在下一次的文章中写下 RabbitMQ的

  • Ack(confirm机制)

  • 如何保证消息投递成功

  • 消息的幂等性及幂等性实现方案

  • return 机制

  • 消息限流

  • 限流及手动Ack实现

  • 重回队列

  • 死信队列


废话不多说,用代码和图文来展示吧

前提我们要在自己的开发环境中安装好mq和启动好mq。


第一步安装、启动可以参考这篇文章

Centos8 安装rabbitmq

第二步:们创建一个生产

// 连接rabbitmq$config = [    'host' => '127.0.0.1',    'vhost' => '/',    'port' => 5672,    'login' => 'admin',    'password' => '123456'];
// 连接 Broker 创建 rabbitmq 连接$conn = new AMQPConnection($config);$conn->connect();
$ch = new AMQPChannel($conn); // 创建通道$ex = new AMQPExchange($ch); // 创建交换机$routingKey = "key_1";          // 声明路由键$exchangeName = "exchange_1"; // 声明交换机名称$ex->setName($exchangeName); // 设置交换机名称$ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型$ex->setFlags(AMQP_DURABLE); // 设置交换机持久类型:1-不持久到磁盘(服务器宕机重启后消息丢失)  2-持久化到磁盘// 声明交换机$ex->declareExchange();
try {    // 使用 for 循环生产10条消息    for ($i = 1; $i <= 10; $i++) {        $msg = [            "data" => "生产消息_" .$i        ];        //[ 'delivery_mode' => 2, 'expiration' => 10000 ] delivery_mode=2 消息持久,expiration(毫秒)消息有效期(通过死信队列实现消息延迟)        $ex->publish(json_encode($msg, 320), $routingKey, 1, ['delivery_mode' => 2]);    }} catch (Exception $e) {    exit("error:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());}

第三步:我们再创建一个消费者

$config = [    'host' => 'localhost',    'vhost' => '/',    'port' => '5672',    'login' => 'admin',    'password' => '123456'];
// 连接 Broker 创建 rabbitmq 连接$conn = new AMQPConnection($config);try {    $conn->connect();} catch (\Exception $e) {    exit($e->getMessage());}

try {    $ch = new AMQPChannel($conn); // 创建通道    $ex = new AMQPExchange($ch); // 创建交换机    $routingKey = "key_1"// 声明路由键    $exchangeName = "exchange_1"// 声明交换机名称        $ex->setName($exchangeName); // 设置交换机名称        $ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型    // 设置交换机持久类型:1-不持久到磁盘(服务器宕机重启后消息丢失)  2-持久化到磁盘    $ex->setFlags(AMQP_DURABLE);      $ex->declareExchange(); // 声明交换机
   // 创建一个消息队列    $q = new AMQPQueue($ch);    $q->setName('queue_1'); // 设置队列名称    $q->setFlags(AMQP_DURABLE); // 设置队列持久性(保证消息不丢失)    $q->declareQueue(); // 声明消息队列    $q->bind($ex->getName(), $routingKey); // 绑定交换机和路由键
   // AMQP_AUTOACK 自动响应    // Queues 中的 Unacked是未响应消息。rabbitmq里积累unacked一定数目之后是不能被消费的。正确的做法是加上自动响应或者在回调里面加上手动响应ack.    $q->consume("receive", AMQP_AUTOACK);} catch (AMQPConnectionException $e) {    exit("AMQPChannel:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());} catch (AMQPExchangeException $e) {    exit("AMQPExchange:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());}
// 封装一个接受消息并输出消息的方法function receive($envelope, $queue) {    echo $envelope->getBody().PHP_EOL;};


第四步:打开我们的虚拟机,执行 php producer.php

图片

此时能看到mq的控制台中有了交换机:exchange_1

再执行 php consumer.php

控制台中查看  Connections 已经有了admin这个用户的连接

图片

Channel 通道了也有刚刚consumer 创建的通道,及控制台里的各选项,铁子们可以一边测试一边看看具体的内容。

最后我们看结果

图片

控制台中已经有消息输出了,当然这只是我们在模拟的测试。真正的业务场景可能比案例更复杂,需要各种数据的合法性校验,然后才去消费,然后还要保证消息的投递成功、消费成功、不重复消费等。

发表评论

快捷回复: 表情:
aoman baiyan bishi bizui cahan ciya dabing daku deyi doge fadai fanu fendou ganga guzhang haixiu hanxiao zuohengheng zhuakuang zhouma zhemo zhayanjian zaijian yun youhengheng yiwen yinxian xu xieyanxiao xiaoku xiaojiujie xia wunai wozuimei weixiao weiqu tuosai tu touxiao tiaopi shui se saorao qiudale qinqin qiaoda piezui penxue nanguo liulei liuhan lenghan leiben kun kuaikule ku koubi kelian keai jingya jingxi jingkong jie huaixiao haqian aini OK qiang quantou shengli woshou gouyin baoquan aixin bangbangtang xiaoyanger xigua hexie pijiu lanqiu juhua hecai haobang caidao baojin chi dan kulou shuai shouqiang yangtuo youling
提交
评论列表 (有 0 条评论, 346人围观)

最近发表

热门文章

最新留言

热门推荐

标签列表