RabbitMQ介绍与PHP应⽤,及碰到问题解决
⼀. RabbitMQ 简介
MQ全称为Message Queue, 消息队列(MQ)是⼀种应⽤程序对应⽤程序的通信⽅法。应⽤程序通过读写出⼊队列的消息(针对应⽤程序的数据)来通信,⽽⽆需专⽤连接来链接它们。
消息传递指的是程序之间通过在消息中发送数据进⾏通信,⽽不是通过直接调⽤彼此来通信,直接调⽤通常是⽤于诸如远程过程调⽤的技术。
排队指的是应⽤程序通过队列来通信。队列的使⽤除去了接收和发送应⽤程序同时执⾏的要求。
RabbitMQ是使⽤Erlang语⾔开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是⾯向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多⽤在企业系统内,对数据⼀致性、稳定性和可靠性要求很⾼的场景,对性能和吞吐量的要求还在其次。
⼆. RabbitMQ 使⽤场景
1. 解耦(为⾯向服务的架构(SOA)提供基本的最终⼀致性实现)
场景说明:⽤户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调⽤库存系统的接⼝。
传统模式的缺点:
假如库存系统⽆法访问,则订单减库存将失败,从⽽导致订单失败
订单系统与库存系统耦合
引⼊消息队列
订单系统:⽤户下单后,订单系统完成持久化处理,将消息写⼊消息队列,返回⽤户订单下单成功
库存系统:订阅下单的消息,采⽤拉/推的⽅式,获取下单信息,库存系统根据下单信息,进⾏库存操作
假如:在下单时库存系统不能正常使⽤。也不影响正常下单,因为下单后,订单系统写⼊消息队列就不再关⼼其他的后续操作了。实现订单系统与库存系统的应⽤解耦
为了保证库存肯定有,可以将队列⼤⼩设置成库存数量,或者采⽤其他⽅式解决。
基于消息的模型,关⼼的是“通知”,⽽⾮“处理”。
短信、邮件通知、缓存刷新等操作使⽤消息队列进⾏通知。
消息队列和RPC的区别与⽐较:
RPC: 异步调⽤,及时获得调⽤结果,具有强⼀致性结果,关⼼业务调⽤处理结果。
消息队列:两次异步RPC调⽤,将调⽤内容在队列中进⾏转储,并选择合适的时机进⾏投递(错峰流控)
2. 异步提升效率
场景说明:⽤户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串⾏的⽅式;2.并⾏⽅式
(1)串⾏⽅式:将注册信息写⼊数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
(2)并⾏⽅式:将注册信息写⼊数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串⾏的差别是,并⾏的⽅式可以提⾼处理的时间
3)引⼊消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
3. 流量削峰
流量削锋也是消息队列中的常⽤场景,⼀般在秒杀或团抢活动中使⽤⼴泛
应⽤场景:系统其他时间A系统每秒请求量就100个,系统可以稳定运⾏。系统每天晚间⼋点有秒杀活动,每秒并发请求量增⾄1万条,但是系统最⼤的处理能⼒只能每秒处理1000个请求,于是系统崩溃,服务器宕机。
之前架构:⼤量⽤户(100万⽤户)通过浏览器在晚上⼋点⾼峰期同时参与秒杀活动。⼤量的请求涌⼊我们的系统中,⾼峰期达到每秒钟5000个请求,⼤量的请求打到MySQL 上,每秒钟预计执⾏3000条SQL。
但是⼀般的MySQL每秒钟扛住2000个请求就不错了,如果达到3000个请求的话可能MySQL直接就瘫痪了,从⽽系统⽆法被使⽤。但是⾼峰期过了之后,就成了低峰期,可能也就1万⽤户访问系统,每秒的请求数量也就50个左右,整个系统⼏乎没有任何压⼒。
引⼊MQ:100万⽤户在⾼峰期的时候,每秒请求有5000个请求左右,将这5000请求写⼊MQ⾥⾯,系统A每秒最多只能处理2000请求,因为MySQL每秒只能处理2000个请求。
系统A从MQ中慢慢拉取请求,每秒就拉取2000个请求,不要超过⾃⼰每秒能处理的请求数量即可。MQ,每秒5000个请求进来,结果只有2000个请求出去,所以在秒杀期间(将近⼀⼩时)可能会有⼏⼗万或者⼏百万的请求积压在MQ中。
这个短暂的⾼峰期积压是没问题的,因为⾼峰期过了之后,每秒就只有50个请求进⼊MQ了,但是系统还是按照每秒2000个请求的速度在处理,所以说,只要⾼峰期⼀过,系统就会快速将积压的消息消费掉。
我们在此计算⼀下,每秒在MQ积压3000条消息,1分钟会积压18万,1⼩时积压1000万条消息,⾼峰期过后,1个多⼩时就可以将积压的1000万消息消费掉。
三. 引⼊消息队列的优缺点
优点
优点就是以上的那些场景应⽤,就是在特殊场景下有其对应的好处,解耦、异步、削峰。
缺点
系统的可⽤性降低
系统引⼊的外部依赖越多,系统越容易挂掉,本来只是A系统调⽤BCD三个系统接⼝就好,ABCD四个系统不报错整个系统会正常运⾏。引⼊了MQ之后,虽然
ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。
系统的复杂性提⾼
引⼊了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?
⼀致性问题
A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产⽣数据不⼀致的问题。
PHP处理RabbitMQ消息队列的应⽤
安装php-amqplib
php-amqplib是⼀个纯PHP库,使⽤它,基于PHP的脚本客户端就可以轻松的连接和操作RabbitMQ。我们使⽤composer来安装。
composer require php-amqplib/php-amqplib
⽰例说明
⽣产者(Producer)和消费者(Consumer)是消息队列的基本概念,⽣产者是指⽣产消息的⼀⽅,也是消息发送⽅,消费者就是消费消息的⼀⽅,也是消息接收⽅,队列就是存储消息的⼀个缓存区。
本实例将由⽣产者发送很多消息给消息队列,由多个消费者来消费队列中的消息。我们可以想象这样的场景:⽪鞋⽣产打包打包车间,不断有成品鞋进⼊传送带(消息队列)等
待操作⼯⼈(消费者)将⽪鞋打包。
因为等待打包的鞋⼦特别多,我们需要安排多个打包⼯⼈在传送带两边,及时从传送带取出成品鞋,然后装箱打包。我们要求是要确保⼯⼈最后打包好的⽪鞋数量⼀双不少,不能因为打包⼯⼈操作慢或者个⼈原因暂时离开⽣产线,导致最终打包数不⼀致。
消息发送
⽣产者将消息发送给队列,⾄于谁来消费(处理)这些消息,⽣产者不管。
消息队列(MQ),⽤来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。⼀个消息可投⼊⼀个或多个队列。消息⼀直在队列⾥⾯,等待消费者连接到这个队列将其取⾛。
消息到达队列中后,如果没有⼀个消费者来处理消息的话,我们希望队列中的消息不要丢弃,也就是消息持久化。在⽣产者和消费者中都要将queue_declare第3个参数设置为true,表⽰让消息队列持久化。
$channel->queue_declare($queue, false, true, false, false);
此外,我们可以确保即使RabbitMQ重启了,消息队列不会丢失,在⽣产者端设置:'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT。
现在我们建⽴⽣产者⽂件sender.php,我们假设服务端已经安装好RabbitMQ,并且开放好对应端⼝。
<?php
/**
* @Author: Helloweba
* @sender.php
* @消息⽣产者-分发任务
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$queue = 'worker';
//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection(
'192.168.0.100',
56720,
'helloweba',  //user
'helloweba',  //password
'test'  //vhost
);
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false); //第3个参数设置为true,表⽰让消息队列持久化
for ($i = 0; $i < 100; $i++) {
$arr = [
'id' => 'message_' . $i,
'order_id' => str_replace('.', '' , microtime(true)) . mt_rand(10, 99) . $i,
'content' => 'helloweba-' . time()
];
$data = json_encode($arr);
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); ////设置rabbitmq重启后也不会丢失队列,或者设置为'delivery_mode' => 2
$channel->basic_publish($msg, '', $queue);
echo 'Send message: ' . $data . PHP_EOL;
}
$channel->close();
$connection->close();
上述代码中,我们模拟了⽣产者向队列中发送了100条订单消息。
消息接收
消费者是指完成消息的接收和处理的客户端程序,消费者就如同⽣产线上的操作⼯⼈,他们按照操作规程从传送带上取出产品后有序的完成后续⼯作任务。
实际项⽬中,如果消费者处理消息能⼒不够时,就要开启多个消费者来消费队列中的消息。默认情况下,RabbitMQ将会把队列中的消息平均分配给每个消费者。如果消费者要对分配到的消息任务处理时间很长(耗时任务),那么处理消息任务的时候就有可能会遇到意外。
⽐如某个消费者断电了,或者出故障了,那它正在处理的消息会怎么办?这⾥就是RabbitMQ的消息确认机制,为了保证数据不丢失,RabbitMQ会将未处理完的消息分配给下⼀个消费者处理。
此外RabbitMQ还可以设置公平分配消息任务,不会给某个消费者同时分配多个消息处理任务,因为消费者⽆法同时处理多个消息任务。
换句话说,RabbitMQ在处理和确认消息之前,不会向消费者发送新的消息,⽽是将消息分发给下⼀个不忙的消费者。
$channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息
我们现在建⽴消费者⽂件receiver.php,代码如下:
<?php
/**
* @Author: Helloweba
* @receiver.php
* @消息消费者-接收端
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$queue = 'worker';
//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test');
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C' . PHP_EOL;
$callback = function($msg){
echo " Received message:", $msg->body, PHP_EOL;
sleep(1);  //模拟耗时执⾏
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息
$channel->basic_consume($queue, '', false, false, false, false, $callback); //第4个参数值为false表⽰启⽤消息确认
while(count($channel->callbacks)) {
$channel->wait();
php简单秒杀实例代码}
$channel->close();
$connection->close();
模拟测试
现在我们运⾏多个消费者终端,可以打开多个ssh客户端,client1和client2运⾏:php receive.php
然后再开个终端,运⾏⽣产者:php sender.php
由于消费者是阻塞运⾏的,他们会⼀直等待队列中的消息,当有消息就会去取出来处理。我们可以模拟将其中某个客户端中断,即断开某个消费者。然后再看消息是不是被其他消费者接收处理了。同样我们可以模拟将客户端全部重启,看看队列中的消息是否没有丢失。
当client1中断连接RabbitMQ后,再次运⾏连接RabbitMQ,在client2中看到的消息处理情况,注意看图中的消息id。
client1:
client2:
转载:
[========]
实现过程中出现了如下问题:
PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPConnectionClosedException: Broken pipe or closed connection in /data/bogiang/srm/public/test/php/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:171
原因:rabbitmq客户端断开了,是没有进⾏⼼跳检查
解决办法:加⼼跳检查~~~
参考:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。