PHP+RabbitMQ实现消息队列
在我们实际的开发中,经常会遇到一些高并发、大流量和需要异步实现业务的一些场景,所以我们大多会想到使用消息队列来提高系统的承载、安全性。在很多消息中间件的工具中,我们公司一直在使用RabbitMQ,比如Laravel、TP框架中使用 php-amqplib
目前公司用到的一些场景如:送礼时生成的订单,用户充值、提现时产生的订单
RabbitMQ是一个功能强大的消息队列工具,被广泛用于构建可扩展、可靠性的应用程序。
今天说下简单的PHP+RabbitMQ实现最基础的消息队列(以面向过程为例)。
准备在下一次的文章中写下 RabbitMQ的
Ack(confirm机制)
如何保证消息投递成功
消息的幂等性及幂等性实现方案
return 机制
消息限流
限流及手动Ack实现
重回队列
死信队列
废话不多说,用代码和图文来展示吧
前提我们要在自己的开发环境中安装好mq和启动好mq。
第一步安装、启动可以参考这篇文章
第二步:我们创建一个生产者
// 连接rabbitmq
$config = [
'host' => '127.0.0.1',
'vhost' => '/',
'port' => 5672,
'login' => 'admin',
'password' => '123456'
];
// 连接 Broker 创建 rabbitmq 连接
$conn = new AMQPConnection($config);
$conn->connect();
$ch = new AMQPChannel($conn); // 创建通道
$ex = new AMQPExchange($ch); // 创建交换机
$routingKey = "key_1"; // 声明路由键
$exchangeName = "exchange_1"; // 声明交换机名称
$ex->setName($exchangeName); // 设置交换机名称
$ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型
$ex->setFlags(AMQP_DURABLE); // 设置交换机持久类型:1-不持久到磁盘(服务器宕机重启后消息丢失) 2-持久化到磁盘
// 声明交换机
$ex->declareExchange();
try {
// 使用 for 循环生产10条消息
for ($i = 1; $i <= 10; $i++) {
$msg = [
"data" => "生产消息_" .$i
];
//[ 'delivery_mode' => 2, 'expiration' => 10000 ] delivery_mode=2 消息持久,expiration(毫秒)消息有效期(通过死信队列实现消息延迟)
$ex->publish(json_encode($msg, 320), $routingKey, 1, ['delivery_mode' => 2]);
}
} catch (Exception $e) {
exit("error:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());
}
第三步:我们再创建一个消费者
$config = [
'host' => 'localhost',
'vhost' => '/',
'port' => '5672',
'login' => 'admin',
'password' => '123456'
];
// 连接 Broker 创建 rabbitmq 连接
$conn = new AMQPConnection($config);
try {
$conn->connect();
} catch (\Exception $e) {
exit($e->getMessage());
}
try {
$ch = new AMQPChannel($conn); // 创建通道
$ex = new AMQPExchange($ch); // 创建交换机
$routingKey = "key_1"; // 声明路由键
$exchangeName = "exchange_1"; // 声明交换机名称
$ex->setName($exchangeName); // 设置交换机名称
$ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型
// 设置交换机持久类型:1-不持久到磁盘(服务器宕机重启后消息丢失) 2-持久化到磁盘
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange(); // 声明交换机
// 创建一个消息队列
$q = new AMQPQueue($ch);
$q->setName('queue_1'); // 设置队列名称
$q->setFlags(AMQP_DURABLE); // 设置队列持久性(保证消息不丢失)
$q->declareQueue(); // 声明消息队列
$q->bind($ex->getName(), $routingKey); // 绑定交换机和路由键
// AMQP_AUTOACK 自动响应
// Queues 中的 Unacked是未响应消息。rabbitmq里积累unacked一定数目之后是不能被消费的。正确的做法是加上自动响应或者在回调里面加上手动响应ack.
$q->consume("receive", AMQP_AUTOACK);
} catch (AMQPConnectionException $e) {
exit("AMQPChannel:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());
} catch (AMQPExchangeException $e) {
exit("AMQPExchange:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());
}
// 封装一个接受消息并输出消息的方法
function receive($envelope, $queue) {
echo $envelope->getBody().PHP_EOL;
};
第四步:打开我们的虚拟机,执行 php producer.php
此时能看到mq的控制台中有了交换机:exchange_1
再执行 php consumer.php
控制台中查看 Connections 已经有了admin这个用户的连接
Channel 通道了也有刚刚consumer 创建的通道,及控制台里的各选项,铁子们可以一边测试一边看看具体的内容。
最后我们看结果
控制台中已经有消息输出了,当然这只是我们在模拟的测试。真正的业务场景可能比案例更复杂,需要各种数据的合法性校验,然后才去消费,然后还要保证消息的投递成功、消费成功、不重复消费等。
发表评论