Python之RabbitMQ的使⽤
今天总结⼀下Python关于Rabbitmq的使⽤
  RabbitMQ,其实也是⼀种队列,那和前⾯说的线程queue和进程queue有什么区别呢?
    线程queue只能在同⼀个进程下进⾏数据交互
    进程queue只能在⽗进程和⼦进程之间,或者同⼀⽗进程下的⼦进程之间做数据交互
    如果需要对不同进程(eg:和qq)两个独⽴的程序间通信
⽅法1就是直接把数据写在硬盘(disk)上然后各⾃的进程读取数据就可以,但是由于硬盘的读写速度太慢,效率太低
⽅法2⾃⼰写个socket,直接做数据交互,问题是如果改变程序,或者再加⼀个程序,需要对写好的socket进⾏修改,还要处理黏包什么的复杂的连接关系,维护成本太⾼。⽅法3,利⽤已有的中间商(代理)。这个broker其实就是封装好的socket,我们拿来直接⽤就好了。
   这⾥的broker,就有RabbitMQ,ZeroMQ,ActiveMQ等等等等。
⼀.安装及环境配置
    windows的安装和配置⽅法较为简单,直接安装就好了
    Rabbit⽀持多种语⾔: Java, .NET, PHP, Python, JavaScript, Ruby, Go这些常⽤语⾔都⽀持
如图所⽰,python操作RabbitMQ需要的模块有上述⼏种选择,我们⽤最简单的pika,⽤pip直接安装
pip install pika
⼆.RabbitMQ的使⽤
这⾥所有的⽤法都是基于RabbitMQ是⼯作在‘localhost’上,并且端⼝号为15672,能在浏览器⾥访问localhost:15672这个地址。
1.消息分发(基础版)
这就是RabbitMQ最简单的⼯作模式,p为⽣产者(Producer),⽣产者发送message给queue,queue再把消息发送⾄消费者c(Customer)
先看看⽣产者⾄队列(send)这个过程
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
我们先建⽴了⼀个链接,然后就需要定义⼀个队列,队列的名字就暂时定位‘hello'
channel.queue_declare(queue='hello')
在RabbitMQ⾥消息并不能直接发送给队列,所有的信息发送都要通过⼀个exchange,但是这⾥我们先把这个exchange定义成⼀个空的字符串,后⾯在将他的具体⽤法channel.basic_publish(exchange='',
routing_key='hello',
body='123')
在发送确认完成后,可以将连接关闭
connect.close()
这就是send端的代码
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='123')
print("[x] Sent 'hello world!'")
connect.close()
RabbitMQ_basic_producer
运⾏了send代码后我们可以在terminal⾥RabbitMQ安装⽬录下sbin⽂件夹⾥查看⼀下消息队列
rabbitmqctl.bat list_queues
如果是Linux命令为
sudo rabbitmqctl list_queues
这⾥就说明了队列信息和消息状态。
然后再看⼀下消费者这⼀端的代码是什么样的,同样,先要建⽴连接并定义好队列名
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')
这⾥可能有个疑问:我们不是在⽣产者⾥已经定义了队列名吗?为什么在消费者⾥还要定义呢?
因为在实际⼯作中,我们并不能确定是⽣产者还是消费者先⼀步运⾏,如果队列名没有定义的话运⾏时候是会报错的。下⾯就是对消息的处理
def callback(ch,method,properties,body):
print("[x] Received %r"%body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
当消息来临时,消费者会执⾏回调函数callback。这⾥的callback就是直接打印消息内容(body)。
回调函数另外的⼏个参数:ch是conne.channel的内存对象地址,
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 62145, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> method是包含了发送的信息
<Basic.Deliver(['consumer_tag=ctag1.9ae48c906b014a83a512413c0e6f9ef8', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])>
properties我们以后再了解。
2.公平分发(workqueue)
  在这种结构⾥,我们要考虑到这样⼀种情况:有多个消费者,消费者在得到消息时需要对消息进⾏处理,并且有可能处理消息所消耗的时间是不同的
。这⾥我们⽤的queue叫做workqueue。
  为了模拟消费者对消息进⾏处理的过程,我们⽤time.sleep()做⼀个消耗时间的过程。消息的产⽣和接收是这样的
message = ''.join(sys.argv[1:]) or"Hello World!"
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.unt(b'.'))
print(" [x] Done")
这⾥插播⼀条''.join(sys.argv[1:])的作⽤:就是把在shell⾥输⼊的指令后跟的代码加在message⾥。消费者得到消息后,数消息⾥有⼏个“.”,sleep相应的秒数。
connect和join的区别
放出第⼀版的代码
import pika,sys
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')
message = ''.join(sys.argv[1:]) or"Hello World."
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print('send %s'%message)
RabbitMQ⽣产者
import time,pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.unt(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello')
channel.start_consuming()
RabbbitMQ消费者
这时候,我们可以多启动⼏个消费者,再⽤⽣产者发送消息,看看效果!
可以发现,消息是被公平的依次被分发给各个消费者的(Fair dispatch),这种分发的⽅式叫轮询。
消息确认message acknowledgments 
现在考虑下这种情形:消费者在处理消息时需要较长的时间,在这时把这个消费者kill掉,正在处理的消息和已经接收但未被处理的消息就丢失了。这应该是不允许的,我们可不希望有数据丢失,就需要将这些任务重新发送给其他正常⼯作的消费者。
为了保证任务不丢失,RabbitMQ⽀持使⽤message acknowledgments,消费者在完成任务后会给RabbitMQ发送个消息,告诉他活已经⼲完了,RabbitMQ就会把这个任务给释放掉。⽽当出现消费者宕机、掉线等情况时,RabbitMQ会重新把这个任务发送给其他的消费者。
往回看看上⽂说到的no_ack,这个值默认的是False,RabbitMQ是不主动销毁消息的所以我们⼀看看在这⾥把值置为True。
channel.basic_consume(callback,
queue='hello',
no_ack=True)
这样只要消费者接收到消息,RabbitMQ就直接销毁掉这个消息,就成了⼿动确认。我们要想实现刚才说的消息不丢失,就要这样定义
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# time.unt(b'.'))
time.sleep(10)    #修改了⼀下,在延时的10s把消费者断掉
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello')#这⾥的no_ack默认为False
这样,当⼀个消费者宕机了,RabbitMQ就会直接把任务拍个下⼀个消费者。
消息持久化
刚才通过了消息确认,我们保证了消费者在掉线的时候任务不丢失,可是还有⼀个问题,如果RabbitMQ如果断掉(或者服务重启)了,⾥⾯的任务(包括所有queue和exchange依旧会丢失)这时候我们可以⽤到——消息持久化Message durability
channel.queue_declare(queue='hello',durable=True)#将队列持久化(只保存了队列)
channel.basic_publish(exchange='',
routing_key='hello',
body=message,
properties=pika.BasicProperties(delivery_mode=2))#保持消息持久化
必须同时将队列和消息持久化,可以保证RabbitMQ服务在重启后任务还存在。
注意⼏点:
1.如果只持久化了消息,服务重启后消息丢失
2.如果只持久化了队列,服务重启后队列还在,但消息丢失
3.在持久化队列的时候要保持⽣产者和消费者的⼀致性
最后⼀点,因为有可能每个消费者处理信息的能⼒不⼀样,如果按公平分发的化有可能导致负载不平衡,旱的旱死、涝的涝死。为避免这种情况发⽣还有⼀个知识点
channel.basic_qos(prefetch_count=1)
⽤这个语句限制了消费者待处理信息的个数
workQueue的终极代码
import pika,sys
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello',durable=True)#队列持久化
message = ''.join(sys.argv[1:]) or"Hello World2."
channel.basic_publish(exchange='',
routing_key='hello',
body=message,
properties=pika.BasicProperties(delivery_mode=2)) #消息持久化
print('send %s'%message)
RabbitMQ_workqueue_procucer
import time,pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello',durable=True)#队列持久化
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.unt(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)  #消息回执
channel.basic_qos(prefetch_count=1)  #限制消费者待处理任务个数
channel.basic_consume(callback,
queue='hello'
)
channel.start_consuming()
RabbitMQ_workqueue_customer
3.发布/订阅(publish/subscribe)
我们在前⾯两部分将的都是将消息由⽣产者到消费者之间通过queue传递,现在将引⼊⼀个新的成员:exchange。
其实⽣产者在发送的时候是不知道消息要发送给那个queue的,甚⾄他都不知道消息是由queue接收的。实际上⽣产者只是把message发送给了exchange。⾄于message后续的处理都是由exchange决定的。
就像图上标⽰的,exchange在sender和queue之间起到了转呈的作⽤。
按照⼯作⽅式,我们将exchange分成了fanout、direct、topic和headers四种类型。
fanout:所有绑定到这个exchange的队列都接收消息
direct:通过routingKey和exchange决定的那个唯⼀的queue可以接收消息
topic:所有符合routingKey(可以是表达式)的queue可以接收消息
  表达式说明:#表⽰⼀个或多个字符
*表⽰任何字符
        使⽤RoutingKey为#时相当于fanout
headers:通过headers来决定把消息发送给哪些queue。
在这个part我们来看fanout的作⽤。
exchange_type='fanout')
我们定义⼀个exchange,名字随便起⼀个‘logs’,类型就声明为fanout。
(在前⾯两节我们还没有引⼊exchange这个概念,就⽤了默认的exchange设置
channel.basic_publish(exchange='',
routing_key='hello',
body='123')
,exchange=''空的字符串表⽰了默认的exchange或名字是空的,那exchange就把消息发送给routing_key指定的queue⾥(前提是这个queue是存在的),在声明了exchange以后,我们就可以⽤这个exchange发送消息了
channel.basic_publish(exchange='logs',                #使⽤的exchange名称
routing_key='',                #使⽤的队列名称
body='123')                    #消息内容
注意到了⼀点没有?这⾥并没有定义队列的名称?为什么?在⼴播的时候是不⽤固定具体的哪个queu
e的,我们
result = channel.queue_declare() #⽣成随机queue
我们在消费端声明queue的时候可以⽣成⼀个随机的queue,这⾥还要加个命令
result = channel.queue_declare(exclusive=True)
这个exclusive表⽰在连接在关闭以后这个queue直接被销毁掉。
然后把这个queue绑定在转发器上。所有进⼊这个exchange的消息被发送给所有和他绑定的队列⾥。
随机的queue已经声明了,现在就把他跟exchange绑定
channel.queue_bind(exchange='logs',
hod.queue)#注意queue名的获取⽅法
这就是最终的代码:
import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()
exchange_type='fanout')  #logs 是随便起的名字,声明了exchange
message = ''.join(sys.argv[1:]) or'info: Hello World!'
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
fanout_publish
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
result = channel.queue_declare(exclusive=True)  #exclusive 唯⼀的,为True时不指定queue名的化随机⽣成⼀个queue,
# 在断开连接后把queue删除,相当于⽣成⼀个随机queue
channel.queue_bind(exchange='logs',
hod.queue) #绑定的是exchange对应的queue
print('waiting for logs.')
def callback(ch,method,preproteries,body):
print('get data:%r'%body)
channel.basic_consume(callback,
hod.queue,
no_ack=True)
channel.start_consuming()
fanout_customer
总体看⼀下,发送端的代码跟前⾯的差不太多,最重要的差别就是把routingKey给忽略掉了,但是明确了exchange的对象。
⽽接收⽅是在建⽴连接后要声明exchange,并且要和队列绑定。如果没有队列和exchange绑定,消息就被销毁了。这就是整个发送的过程
还有⼀点,这个订阅——发布的模型就像电台和收⾳机⼀样,如果customer下线了是收不到信息的,消息也是在线发送的,并不会保存。
在这个过程中,我们⼤致了解了发布——订阅模型。其实就是在发送端定义了⼀个exchange,在接收端定义了⼀个队列,然后把这两者绑定,就OK了。可是我们现在只想订阅⼀部分有⽤的信息,⽐如只获取错误信息写到⽇志⽂件⾥,但同时⼜能将所有的信息都显⽰在控制台(或者terminal上)。
上⼀节所讲述的bind,也可以简单的理解为这个queue对这个exchange的内容“感兴趣”。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。