PHP+RabbitMQ实现消息队列
在我们实际的开发中,经常会遇到一些高并发、大流量和需要异步实现业务的一些场景,所以我们大多会想到使用消息队列来提高系统的承载、安全性。在很多消息中间件的工具中,我们公司一直在使用RabbitMQ,比如Laravel、TP框架中使用 php-amqplib
目前公司用到的一些场景如:送礼时生成的订单,用户充值、提现时产生的订单
RabbitMQ是一个功能强大的消息队列工具,被广泛用于构建可扩展、可靠性的应用程序。
今天说下简单的PHP+RabbitMQ实现最基础的消息队列(以面向过程为例)。
准备在下一次的文章中写下 RabbitMQ的
Ack(confirm机制)
如何保证消息投递成功
消息的幂等性及幂等性实现方案
return 机制
消息限流
限流及手动Ack实现
重回队列
死信队列
废话不多说,用代码和图文来展示吧
前提我们要在自己的开发环境中安装好mq和启动好mq。
第一步安装、启动可以参考这篇文章
第二步:我们创建一个生产者
// 连接rabbitmq$config = [ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'admin', 'password' => '123456'];// 连接 Broker 创建 rabbitmq 连接$conn = new AMQPConnection($config);$conn->connect();$ch = new AMQPChannel($conn); // 创建通道$ex = new AMQPExchange($ch); // 创建交换机$routingKey = "key_1"; // 声明路由键$exchangeName = "exchange_1"; // 声明交换机名称$ex->setName($exchangeName); // 设置交换机名称$ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型$ex->setFlags(AMQP_DURABLE); // 设置交换机持久类型:1-不持久到磁盘(服务器宕机重启后消息丢失) 2-持久化到磁盘// 声明交换机$ex->declareExchange();try { // 使用 for 循环生产10条消息 for ($i = 1; $i <= 10; $i++) { $msg = [ "data" => "生产消息_" .$i ]; //[ 'delivery_mode' => 2, 'expiration' => 10000 ] delivery_mode=2 消息持久,expiration(毫秒)消息有效期(通过死信队列实现消息延迟) $ex->publish(json_encode($msg, 320), $routingKey, 1, ['delivery_mode' => 2]); }} catch (Exception $e) { exit("error:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());}
第三步:我们再创建一个消费者
$config = [ 'host' => 'localhost', 'vhost' => '/', 'port' => '5672', 'login' => 'admin', 'password' => '123456'];// 连接 Broker 创建 rabbitmq 连接$conn = new AMQPConnection($config);try { $conn->connect();} catch (\Exception $e) { exit($e->getMessage());}try { $ch = new AMQPChannel($conn); // 创建通道 $ex = new AMQPExchange($ch); // 创建交换机 $routingKey = "key_1"; // 声明路由键 $exchangeName = "exchange_1"; // 声明交换机名称 $ex->setName($exchangeName); // 设置交换机名称 $ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型 // 设置交换机持久类型:1-不持久到磁盘(服务器宕机重启后消息丢失) 2-持久化到磁盘 $ex->setFlags(AMQP_DURABLE); $ex->declareExchange(); // 声明交换机 // 创建一个消息队列 $q = new AMQPQueue($ch); $q->setName('queue_1'); // 设置队列名称 $q->setFlags(AMQP_DURABLE); // 设置队列持久性(保证消息不丢失) $q->declareQueue(); // 声明消息队列 $q->bind($ex->getName(), $routingKey); // 绑定交换机和路由键 // AMQP_AUTOACK 自动响应 // Queues 中的 Unacked是未响应消息。rabbitmq里积累unacked一定数目之后是不能被消费的。正确的做法是加上自动响应或者在回调里面加上手动响应ack. $q->consume("receive", AMQP_AUTOACK);} catch (AMQPConnectionException $e) { exit("AMQPChannel:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());} catch (AMQPExchangeException $e) { exit("AMQPExchange:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());}// 封装一个接受消息并输出消息的方法function receive($envelope, $queue) { echo $envelope->getBody().PHP_EOL;};
第四步:打开我们的虚拟机,执行 php producer.php
此时能看到mq的控制台中有了交换机:exchange_1
再执行 php consumer.php
控制台中查看 Connections 已经有了admin这个用户的连接
Channel 通道了也有刚刚consumer 创建的通道,及控制台里的各选项,铁子们可以一边测试一边看看具体的内容。
最后我们看结果
控制台中已经有消息输出了,当然这只是我们在模拟的测试。真正的业务场景可能比案例更复杂,需要各种数据的合法性校验,然后才去消费,然后还要保证消息的投递成功、消费成功、不重复消费等。


发表评论