RabbitMQ中exchange、route、queue的关系
从AMQP协议可以看出,MessageQueue、Exchange和Binding构成了AMQP协议的核⼼,下⾯我们就围绕这三个主要组件从应⽤使⽤的⾓度全⾯的介绍如何利⽤Rabbit MQ构建消息队列以及使⽤过程中的注意事项。
1. 声明MessageQueue
在Rabbit MQ中,⽆论是⽣产者发送消息还是消费者接受消息,都⾸先需要声明⼀个MessageQueue。这就存在⼀个问题,是⽣产者声明还是消费者声明呢?要解决这个问题,⾸先需要明确:
a)消费者是⽆法订阅或者获取不存在的MessageQueue中信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
在明⽩了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,⽣产者已发送的消息被丢弃的隐患。如果应⽤能够通过消息重发的机制允许消息丢失,则使⽤此⽅案没有任何问题。但是如果不能接受该⽅案,这就需要⽆论是⽣产者还是消费者,在发送或者接受消息前,都需要去尝试建⽴消息队列。这⾥有⼀点需要明确,如果客户端尝试建⽴⼀个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建⽴成功的。
如果⼀个消费者在⼀个信道中正在监听某⼀个队列的消息,Rabbit MQ是不允许该消费者在同⼀个channel去声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明⼀个队列,可以设置该队列以下属性:
a) Exclusive:排他队列,如果⼀个队列被声明为排他队列,该队列仅对⾸次声明它的连接可见,并在连接断开时⾃动删除。这⾥需要注意三点:其⼀,排他队列是基于连接可见的,同⼀连接的不同信道是可以同时访问同⼀个连接创建的排他队列的。其⼆,“⾸次”,如果⼀个连接已经声明了⼀个排他队列,其他连接是不允许建⽴同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,⼀旦连接关闭或者客户端退出,该排他队列都会被⾃动删除的。这种队列适⽤于只限于⼀个客户端发送读取消息的应⽤场景。
b)  Auto-delete:⾃动删除,如果该队列没有任何订阅的消费者的话,该队列会被⾃动删除。这种队列适⽤于临时队列。
c)  Durable:持久化,这个会在后⾯作为专门⼀个章节讨论。
d)  其他选项,例如如果⽤户仅仅想查询某⼀个队列是否已存在,如果不存在,不想建⽴该队列,仍然可以调⽤queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
2. ⽣产者发送消息
在AMQP模型中,Exchange是接受⽣产者消息并将消息路由到消息队列的关键组件。ExchangeType和Binding决定了消息的路由规则。所以⽣产者想要发送消息,⾸先必须要声明⼀个Exchange和该Exchange对应的Binding。可以通过 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,声明⼀个Exchange需要三个参数:ExchangeName,Excha
ngeType和Durable。ExchangeName 是该Exchange的名字,该属性在创建Binding和⽣产者通过publish推送消息时需要指定。ExchangeType,指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不同的Exchange会表现出不同路由⾏为。Durable是该Exchange的持久化属性,这个会在消息持久化章节讨论。声明⼀个Binding需要提供⼀个QueueName,ExchangeName和BindingKey。下⾯我们就分析⼀下不同的ExchangeType表现出的不同路由规则。
⽣产者在发送消息时,都需要指定⼀个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType:
a) 如果是Direct类型,则会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进⾏⽐较,如果相等,则发送到该Binding对应的Queue中。
b)  如果是  Fanout  类型,则会将消息发送给所有与该  Exchange  定义过  Binding  的所有  Queues  中去,其实是⼀种⼴播⾏为。
c)如果是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进⾏匹配,如果匹配成功,则发送到对应的Queue中。
3. 消费者订阅消息
在RabbitMQ中消费者有2种⽅式获取队列中的消息:
a)  ⼀种是通过sume命令,订阅某⼀个队列中的消息,channel会⾃动在处理完上⼀条消息之后,接收下⼀条消息。(同⼀个channel消息处理是串⾏的)。除⾮关闭channel或者取消订阅,否则客户端将会⼀直接收队列的消息。
b)  另外⼀种⽅式是通过命令主动获取队列中的消息,但是绝对不可以通过循环调⽤来代替sume,这是因为 RabbitMQ在实际执⾏的时候,是⾸先consume某⼀个队列,然后检索第⼀条消息,然后再取消订阅。如果是⾼吞吐率的消费者,最好还是建议使⽤sume。
如果有多个消费者同时订阅同⼀个队列的话,RabbitMQ是采⽤循环的⽅式分发消息的,每⼀条消息只能被⼀个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA服务器收到响应,服务器删除MessageA;再有⼀条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,然后收到ClientB的确认后,删除MessageB;等到再下⼀条消息时,服务器会再将消息发送给ClientA。
这⾥我们可以看出,消费者再接到消息以后,都需要给服务器发送⼀条确认命令,这个即可以在handleDelivery⾥显⽰的调⽤basic.ack 实现,也可以在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器可以安全的删除该消息,⽽不是通知⽣产者,与RPC不同。如果消费者在接到消息以后还没来得及返回ACK就断开了连接,消息服务器会重传该消息给下⼀个订阅者,如果没有订阅者就会存储该消息。
既然RabbitMQ提供了ACK某⼀个消息的命令,当然也提供了Reject某⼀个消息的命令。当客户端发⽣错误,调⽤ject命令拒绝某⼀个消息时,可以设置⼀个requeue的属性,如果为true,则消息服务器会重传该消息给下⼀个订阅者;如果为false,则会直接删除该消息。当然,也可以通过ack,让消息服务器直接删除该消息并且不会重传。
4. 持久化:
Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着⼀旦消息服务器重启,所有已声明的队
列,Exchange,Binding以及队列中的消息都会丢失。通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,但是这还不能使得队列中的消息持久化,这需要⽣产者在发送消息的时候,将delivery mode设置为2,只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。这⾥需要注意的是,只有durable为true的Exchange和durable为ture的Queues 才能绑定,否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成⽐较⼤的影响,可能会下降10倍不⽌。
5. 事务:
对事务的⽀持是AMQP协议的⼀个重要特性。假设当⽣产者将⼀个持久化消息发送给服务器时,因为consume命令本⾝没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,⽣产者也⽆法获知该消息已经丢失。如果此时使⽤
事务,即通过txSelect()开启⼀个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息⼀定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令⽤于回滚某⼀个事务。
6. Confirm机制:
使⽤事务固然可以保证只有提交的事务,才会被服务器执⾏。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了⼀个更加轻量级的机制来保证⽣产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配⼀个唯⼀的ID,然后⼀旦该消息被正确的路由到匹配的队列中后,服务器会返回给⽣产者⼀个Confirm,该Confirm包含该消息的ID,这样⽣产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最⼤优点在于异步,⽣产者在发送消息以后,即可继续执⾏其他任务。⽽服务器返回Confirm后,会触发⽣产者的回调函数,⽣产者在回调函数中处理Confirm信息。如果消息服务器发⽣异常,导致该消息丢失,会返回给⽣产者⼀个nack,表⽰消息已经丢失,这样⽣产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要⽐事务优越很多。但是Confirm机制,⽆法进⾏回滚,就是⼀旦服务器崩溃,⽣产者⽆法得到Confirm信息,⽣产者其实本⾝也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要⽀持去重。
其他:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投⼊到⼀个或多个队列。
Binding:绑定,它的作⽤就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进⾏消息投递。
vhost:虚拟主机,⼀个broker⾥可以开设多个vhost,⽤作不同⽤户的权限分离。
producer:消息⽣产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接⾥,可建⽴多个channel,每个channel代表⼀个会话任务。
消息队列的使⽤过程⼤概如下:
(1)客户端连接到消息队列服务器,打开⼀个channel。
(2)客户端声明⼀个exchange,并设置相关属性。
(3)客户端声明⼀个queue,并设置相关属性。
(4)客户端使⽤routing key,在exchange和queue之间建⽴好绑定关系。
(5)客户端投递消息到exchange。
Exchanges, queues, and bindings
exchanges, queues, and bindings是三个基础的概念, 他们的作⽤是:exchanges are where producers publish their messages, queues are where the messages end up and are received by consumers, and bindings are how the messages get routed from the exchange to particular queues.
下⾯我们⽤⼀副简单的思维导图把上⾯的概念组织起来:
上⾯还提到了⼀个vhost的概念,vhost是为了组织exchanges, queues, and bindings提出的概念,我们就从它开始讲起:
VHost
Vhosts也是AMQP的⼀个基础概念,连接到RabbitMQ默认就有⼀个名为"/"的vhost可⽤,本地调试的时候可以直接使⽤这
个默认的vhost.这个"/"的访问可以使⽤guest⽤户名(密码guest)访问.可以使⽤rabbitmqctl⼯具修改这个账户的权限和密码,这在⽣产环境是必须要关注的. 出于安全和可移植性的考虑,⼀个vhost内的exchange不能绑定到其他的vhost.
可以按照业务功能组来规划vhost,在集环境中只要在某个节点创建vhost就会在整个集内的节点都创建该vhost.VHost和权限都不能通过AMQP协议创建,在RabbitMQ中都是使⽤rabbitmqctl进⾏创建,管理.
如何创建vhost
vhost和permission(权限)信息是并不是通过AMQP创建⽽是通过rabbitmqctl⼯具来添加,管理的.
说完vhost我们就来看看重中之重的消息:Message
Message
消息由两部分组成:  payload and  label. "payload"是实际要传输的数据,⾄于数据的格式RabbitMQ并不关⼼,"label"描述payload,包括exchange name 和可选的topic tag.消息⼀旦到了consumer那⾥就只有payload部分了,label部分并没有带过来.RabbitMQ并不告诉你消息是谁发出的.这好⽐你收到⼀封信但是信封上是空⽩的.当然想知道是谁发的还是有办法的,在消息内容中包含发送者的信息就可以了.
消息的consumer和producer对应的概念是sending和receiving并不对应client和server.通过channel我们可以创建很多并⾏的传输 TCP链接不再成为瓶颈,我们可以把RabbitMQ当做应⽤程序级别的路由器.
Consumer消息的接收⽅式
Consumer有两种⽅式接收消息:
通过sume 订阅队列.channel将进⼊接收模式直到你取消订阅.订阅模式下Consumer只要上⼀条消息处理完成(ACK或拒绝),就会主动接收新消息.如果消息到达queue就希望得到尽快处理,也应该使⽤sume命令.
还有⼀种情况,我们不需要⼀直保持订阅,只要使⽤命令主动获取消息即可.当前消息处理完成之后,继续获取消息需要主动执⾏ 不要"在循环中使⽤"t当做另外⼀种形式的sume,因为这种做法相⽐sume有额外的成本:本质上就是先订阅queue取回⼀条消息之后取消订阅.Consumer吞吐量⼤的情况下通常都会使⽤sume.
要是没有Consumer怎么办?
如果消息没有Consumer就会⽼⽼实实呆在队列⾥⾯.
多个Consumer订阅同⼀个队列
只要Consumer订阅了queue,消息就会发送到该Consumer.我们的问题是这种情况下queue中的消息是如何分发的?
如果⼀个rabbit queue有多个consumer,具体到队列中的某条消息只会发送到其中的⼀个Consumer.
消息确认
所有接收到的消息都要求发送响应消息(ACK).这⾥有两种⽅式⼀种是Consumer使⽤basic.ack明确发送ACK,⼀种是订阅queue的时候指定auto_ack为true,这样消息⼀到Consumer那⾥RabbitMQ就会认为消息已经得到ACK.
要注意的是这⾥的响应和消息的发送者没有丝毫关系,ACK只是Consumer向RabbitMQ确认消息已经正确的接收到消息,RabbitMQ可以安全移除该消息,仅此⽽已.
没有正确响应怎么办
如果Consumer接收了⼀个消息就还没有发送ACK就与RabbitMQ断开了,RabbitMQ会认为这条消息没有投递成功会重新投递到别的Consumer.如果你的应⽤程序崩掉了,你可以设置备⽤程序来继续完成消息的处理.
如果Consumer本⾝逻辑有问题没有发送ACK的处理,RabbitMQ不会再向该Consumer发送消息.RabbitMQ会认为这个Consumer还没有处理完上⼀条消息,没有能⼒继续接收新消息.我们可以善加利⽤这⼀机制,如果需要处理过程是相当复杂的,应⽤程序可以延迟发送ACK直到处理完成为⽌.这可以有效控制应⽤程序这边的负载,不致于被⼤量消息冲击.
拒绝消息
由于要拒绝消息,所以ACK响应消息还没有发出,所以这⾥拒绝消息可以有两种选择:
1.Consumer直接断开RabbitMQ 这样RabbitMQ将把这条消息重新排队,交由其它Consumer处理.这个⽅法在RabbitMQ各版本都⽀持.这样做的坏处就是连接断开增加了RabbitMQ的额外负担,特别是consumer出现异常每条消息都⽆法正常处理的时候.
2. RabbitMQ 2.0.0可以使⽤ ject 命令,收到该命令RabbitMQ会重新投递到其它的Consumer.如果设置requeue为false,RabbitMQ会直接将消息从queue中移除.
其实还有⼀种选择就是直接忽略这条消息并发送ACK,当你明确直到这条消息是异常的不会有Consumer能处理,可以这样做抛弃异常数据.为什么要发送ject消息⽽不是ACK?RabbitMQ后⾯的版本可能会引⼊"dead letter"队列,如果想利⽤dead letter做点⽂章就使⽤
消息持久化
消息的持久化需要在消息投递的时候设置delivery mode值为2.由于消息实际存储于queue之中,"⽪之不存⽑将焉附"逻辑上,消息持久化同时要求exchange和queue也是持久化的.这是消息持久化必须满⾜的三个条件.
持久化的代价就是性能损失,磁盘IO远远慢于RAM(使⽤SSD会显著提⾼消息持久化的性能) , 持久化会⼤⼤降低RabbitMQ每秒可处理的消息.两者的性能差距可能在10倍以上.
消息恢复
consumer从durable queue中取回⼀条消息之后并发回了ACK消息,RabbitMQ就会将其标记,⽅便后续垃圾回收.如果⼀条持久化的消息没有被consumer取⾛,RabbitMQ重启之后会⾃动重建exchange和queue(以及bingding关系),消息通过持久化⽇志重建再次进⼊对应的queues,exchanges.
⽪之不存,⽑将焉附?紧接着我们看看消息实际存放的地⽅:Queue
Queue
Queues是Massage的落脚点和等待接收的地⽅,消息除⾮被扔进⿊洞否则就会被安置在⼀个Queue⾥⾯.Queue很适合做负载均
衡,RabbitMQ可以在若⼲consumer中间实现轮流调度(Round-Robin).
如何创建队列
consumer和producer都可以创建Queue,如果consumer来创建,避免consumer订阅⼀个不存在的Queue的情况,但是这⾥要承担⼀种风险:消息已经投递但是consumer尚未创建队列,那么消息就会被扔到⿊洞,换句话说消息丢了;避免这种情况的好办法就是producer和consumer都尝试创建⼀下queue. 如果consumer在已经订阅了另外⼀个Queue的情况下⽆法完成新Queue的创建,必须取消之前的订阅将Channel置为传输模式("transmit")才能创建新的Channel.
创建Queue的时候通常要指定名字,名字⽅便consumer订阅.即使你不指定Rabbit会给它分配⼀个随机的名字,这在使⽤临时匿名队列完成RPC-over-AMQP调⽤时会⾮常有⽤.
创建Queue的时候还有两个⾮常有⽤的选项:
exclusive—When set to true, your queue becomes private and can only be consumed by your app. This is useful when you need to limit a queue to only one consumer.
auto-delete—The queue is automatically deleted when the last consumer unsubscribes.
如果要创建只有⼀个consumer使⽤的临时queue可以组合使⽤auto-delete和 sumer⼀旦断开连接该队列⾃动删除.
重复创建Queue会怎样?如果Queue创建的选项完全⼀致的话,RabbitMQ直接返回成功,如果名称相同但是创建选项不⼀致就会返回创建失败.如果是想检查Queue是否存在,可以设置queue.declare命令的passive 选项为true:如果队列存在就会返回成功,如果队列不存在会报错且不会执⾏创建逻辑.
消息是如何从动态路由到不同的队列的?这就看下⾯的内容了
bindings and exchanges
消息如何发送到队列
消息是如何发送到队列的?这就要说到AMQP bindings and exchanges. 投递消息到queue都是经由exchange完成的,和⽣活中的邮件投递⼀样也需要遵循⼀定的规则,在RabbitMQ中规则是通过routing key把queue绑定到exchange上,这种绑定关系即binding.消息发送到RabbitMQ 都会携带⼀个routing key(哪怕是空的key),RabbitMQ会根据bindings匹配routing key,如果匹配成功消息会转发到指定Queue,如果没有匹配到queue消息就会被扔到⿊洞.
如何发送到多个队列
消息是分发到多个队列的?AMQP协议⾥⾯定义了⼏种不同类型的exchange:direct, fanout, topic, and headers. 每⼀种都实现了⼀种 routing 算法. header的路由消息并不依赖routing key⽽是去匹配AMQP消息的header部分,这和下⾯提到的direct exchange如出⼀辙,但是性能要差很多,在实际场景中⼏乎不会被⽤到.
direct exchange  routing key完全匹配才转发
fanout exchange 不理会routing key,消息直接⼴播到所有绑定的queue
topic exchange  对routing key模式匹配
正则匹配到第一个关键字就停止exchange持久化
创建queue和exchange默认情况下都是没有持久化的,节点重启之后queue和exchange就会消失,这⾥需要特别指定queue和exchange的durable属性.
Consumer是直接创建TCP链接到RabbitMQ吗?下⾯就是答案:
Channel
⽆论是要发布消息还是要获取消息 ,应⽤程序都需要通过TCP连接到RabbitMQ.应⽤程序连接并通过权限认证之后就要创建Channel来执⾏AMQP命令.Channel是建⽴在实际TCP连接之上通信管道,这⾥之所以引⼊channel的概念⽽不是直接通过TCP链接直接发送AMQP命令,是出于两⽅⾯的考虑:建⽴上成百上千的TCP链接,⼀⽅⾯浪费了TCP链接,⼀⽅⾯很快会触及系统瓶颈.引⼊了Channel之后多个进程与RabbitMQ 的通信可以在⼀条TCP链接上完成.我们可以把TCP类⽐做光缆,那么Channel就像光缆中的⼀根根光纤.
参考资料
[1] Rabbits and warrens
[2] 兔⼦和兔⼦窝

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。