Python:Rocketmq消息队列使⽤
rocketmq可以与kafka等⼀起使⽤,⽤于实时消息处理。
安装rocketmq:
⽣产消息producer:
from rocketmq.client import Producer, Message
import json
producer = Producer('PID-test')
producer.set_namesrv_addr('x:xxxxx') #rocketmq队列接⼝地址(服务器ip:port)
producer.start()
msg_body = {"id":"001","name":"test_mq","message":"abcdefg"}
ss = json.dumps(msg_body).encode('utf-8')
rabbitmq rocketmq kafka区别msg = Message('topic_name') #topic名称
msg.set_keys('xxxxxx')
msg.set_tags('xxxxxx')
msg.set_body(ss) #message body
retmq = producer.send_sync(msg)
print(retmq.status, retmq.msg_id, retmq.offset)
producer.shutdown()
其中:
设置ip:port的位置:producer.set_namesrv_addr('x:xxxxx')
当只有单⼀服务器时,格式是上⾯这个;
当有多个服务器地址(集模式)时,可以使⽤:
producer.set_namesrv_addr("x:x.xxx:x.xxx:xxxxx")
不过以下这种⽅式本⼈测试不通过:producer.set_namesrv_addr(["x:xxxxx","x:xxxxx","x:xxxxx"])如果使⽤pandas数据,pandas数据可以直接转换
_json(orient='records').encode('utf-8'),然后放⼊body中发送。
消费消息consumer:
可以使⽤ PushConsumer 和 PullConsumer,同样来⾃ rocketmq.client。
# 使⽤PullConsumer时
from rocketmq.client import PullConsumer
consumer = PullConsumer('CID_test')
consumer.set_namesrv_addr('x:xxxxx')
consumer.start()
for msg in consumer.pull('topic_name'):
print(msg.id, msg.body)
consumer.shutdown()
# PushConsumer与此类似
from rocketmq.client import PushConsumer
注:⽬前rocketmq库只⽀持linux和mac。
#
参考:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论