Python3.5RocketMQ
尽管RocketMQ作为阿⾥⼀个优秀的消息中间件,不仅在阿⾥内部使⽤⾮常⼴,在⾏业内应⽤也⾮常的多,但是RocketMQ本⾝确没有直接提供Python客户端,这对于python程序员来说是多么的不友好。⽽我司⼜有⼀部分算法⼯程师是使⽤python的,为了解决他们从MQ获取数据的问题,有两种⽅案:1、通过某个java服务进⾏数据转发,即算法将数据发送到kafka,定制⼀个java服务通过⼀定的规则将数据转发到RocketMQ;2、Python调⽤java代码,实现与RocketMQ的通信。第⼀种⽅案需要有⼈专门去维护数据转发的java服务,耦合性较⼤,思考再三,还是决定使⽤⽅案⼆。
我这使⽤的是python3.5,win7 64位系统,所以下载JPype1‑0.6.3‑cp35‑cp35m‑win_amd64.whl,下载成功后,使⽤pip install C:\Users xx\Downloads\JPype1‑0.6.3‑cp35‑cp35m‑win_amd64.whl 即可,⼼疼下⾛了半天弯路的⾃⼰。好了,环境解决了,接下来就是代码的实现了。
先列出参照资料:
上⾯的代码看似⾮常完美,不过是基于python2写的,但是尽管改成python3的,运⾏时在注册RocketMQ的回调时⼀直提⽰存在多个重载,后决定利⽤java基于rocketmq-client封装⼀层jar,废话不多说,直接给代码:
其中:
1、lib下为java消费RocketMQ需要的依赖,clife-utils-mq-0.0.16-SNAPSHOT.jar为我基于rocketmq-client⾃定义jar
2、conf下为logback的⽇志配置⽂件,修改对应配置可以控制jvm⽇志级别
3、考虑到算法会应⽤到许多python的模块,为了避免在回调中应⽤到python的模块,从⽽导致⽆法被序列号到jvm中执⾏,在消费者中和⽣产者中都应⽤到了python的多进程,启动⼀个专门的进程维护jvm,并通过⼀个Queue进程数据共享。
-------------------------------------⼿动分割线----------------------------------
近期有收到私信问题,这⾥对⼀些问题做⼀个统⼀的回答。
1、如何实现只消费指定tag
这个其实很简单,稍微看看给的代码就可以看到。如下图:
2、如何实现有序消费
这个在开发时,笔者有考虑到这个需求,只不过Python接⼝对算法⼯程师隐藏了这项配置。
同问题1,consumerConfig2存在⼀个isOrderly的参数,默认为false,如果需要实现有序消费只需要增加如下代码即可。
python转java代码
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论