PHP+RabbitMQ实现消息队列

admin 2023-10-14 247 阅读 0评论

在我们实际的开发中,经常会遇到一些高并发、大流量和需要异步实现业务的一些场景,所以我们大多会想到使用消息队列来提高系统的承载、安全性。在很多消息中间件的工具中,我们公司一直在使用RabbitMQ,比如Laravel、TP框架中使用 php-amqplib

目前公司用到的一些场景如:送礼时生成的订单,用户充值、提现时产生的订单

RabbitMQ是一个功能强大的消息队列工具,被广泛用于构建可扩展、可靠性的应用程序。

今天说下简单的PHP+RabbitMQ实现最基础的消息队列(以面向过程为例)。

准备在下一次的文章中写下 RabbitMQ的

  • Ack(confirm机制)

  • 如何保证消息投递成功

  • 消息的幂等性及幂等性实现方案

  • return 机制

  • 消息限流

  • 限流及手动Ack实现

  • 重回队列

  • 死信队列


废话不多说,用代码和图文来展示吧

前提我们要在自己的开发环境中安装好mq和启动好mq。


第一步安装、启动可以参考这篇文章

Centos8 安装rabbitmq

第二步:们创建一个生产

// 连接rabbitmq$config = [    'host' => '127.0.0.1',    'vhost' => '/',    'port' => 5672,    'login' => 'admin',    'password' => '123456'];
// 连接 Broker 创建 rabbitmq 连接$conn = new AMQPConnection($config);$conn->connect();
$ch = new AMQPChannel($conn); // 创建通道$ex = new AMQPExchange($ch); // 创建交换机$routingKey = "key_1";          // 声明路由键$exchangeName = "exchange_1"; // 声明交换机名称$ex->setName($exchangeName); // 设置交换机名称$ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型$ex->setFlags(AMQP_DURABLE); // 设置交换机持久类型:1-不持久到磁盘(服务器宕机重启后消息丢失)  2-持久化到磁盘// 声明交换机$ex->declareExchange();
try {    // 使用 for 循环生产10条消息    for ($i = 1; $i <= 10; $i++) {        $msg = [            "data" => "生产消息_" .$i        ];        //[ 'delivery_mode' => 2, 'expiration' => 10000 ] delivery_mode=2 消息持久,expiration(毫秒)消息有效期(通过死信队列实现消息延迟)        $ex->publish(json_encode($msg, 320), $routingKey, 1, ['delivery_mode' => 2]);    }} catch (Exception $e) {    exit("error:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());}

第三步:我们再创建一个消费者

$config = [    'host' => 'localhost',    'vhost' => '/',    'port' => '5672',    'login' => 'admin',    'password' => '123456'];
// 连接 Broker 创建 rabbitmq 连接$conn = new AMQPConnection($config);try {    $conn->connect();} catch (\Exception $e) {    exit($e->getMessage());}

try {    $ch = new AMQPChannel($conn); // 创建通道    $ex = new AMQPExchange($ch); // 创建交换机    $routingKey = "key_1"// 声明路由键    $exchangeName = "exchange_1"// 声明交换机名称        $ex->setName($exchangeName); // 设置交换机名称        $ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型    // 设置交换机持久类型:1-不持久到磁盘(服务器宕机重启后消息丢失)  2-持久化到磁盘    $ex->setFlags(AMQP_DURABLE);      $ex->declareExchange(); // 声明交换机
   // 创建一个消息队列    $q = new AMQPQueue($ch);    $q->setName('queue_1'); // 设置队列名称    $q->setFlags(AMQP_DURABLE); // 设置队列持久性(保证消息不丢失)    $q->declareQueue(); // 声明消息队列    $q->bind($ex->getName(), $routingKey); // 绑定交换机和路由键
   // AMQP_AUTOACK 自动响应    // Queues 中的 Unacked是未响应消息。rabbitmq里积累unacked一定数目之后是不能被消费的。正确的做法是加上自动响应或者在回调里面加上手动响应ack.    $q->consume("receive", AMQP_AUTOACK);} catch (AMQPConnectionException $e) {    exit("AMQPChannel:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());} catch (AMQPExchangeException $e) {    exit("AMQPExchange:" .$e->getMessage() . '-' . $e->getFile() . '-' . $e->getLine());}
// 封装一个接受消息并输出消息的方法function receive($envelope, $queue) {    echo $envelope->getBody().PHP_EOL;};


第四步:打开我们的虚拟机,执行 php producer.php

图片

此时能看到mq的控制台中有了交换机:exchange_1

再执行 php consumer.php

控制台中查看  Connections 已经有了admin这个用户的连接

图片

Channel 通道了也有刚刚consumer 创建的通道,及控制台里的各选项,铁子们可以一边测试一边看看具体的内容。

最后我们看结果

图片

控制台中已经有消息输出了,当然这只是我们在模拟的测试。真正的业务场景可能比案例更复杂,需要各种数据的合法性校验,然后才去消费,然后还要保证消息的投递成功、消费成功、不重复消费等。

发表评论

快捷回复: 表情:
Addoil Applause Badlaugh Bomb Coffee Fabulous Facepalm Feces Frown Heyha Insidious KeepFighting NoProb PigHead Shocked Sinistersmile Slap Social Sweat Tolaugh Watermelon Witty Wow Yeah Yellowdog
提交
评论列表 (有 0 条评论, 247人围观)

最近发表

热门文章

最新留言

热门推荐

标签列表