SpringCloudAlibaba消息队列:基于RocketMQ实现服务异步通信
本讲咱们将学习以下三⽅⾯内容:
介绍消息队列与 Alibaba RocketMQ;
掌握 RocketMQ 的部署⽅式;
讲解微服务接⼊ RocketMQ 的开发技巧;
⾸先咱们先来认识什么是消息队列 MQ 呢?
消息队列与 RocketMQ
消息队列 MQ
消息队列(Message Queue)简称 MQ,是⼀种跨进程的通信机制,通常⽤于应⽤程序间进⾏数据的异步传输,MQ 产品在架构中通常也被叫作“消息中间件”。它的最主要职责就是保证服务间进⾏可靠的数据传输,同时实现服务间的解耦。
这么说太过学术,我们看⼀个项⽬的实际案例,假设市级税务系统向省级税务系统上报本年度税务汇总数
据,按以往的设计市级税务系统作为数据的⽣产者需要了解省级税务系统的 IP、端⼝、接⼝等诸多细节,然后通过 RPC、RESTful 等⽅式同步向省级税务系统发送数据,省级税务系统作为数据的消费者接受后响应“数据已接收”。
虽然从逻辑上是没有问题的,但是从技术层⾯却衍⽣出三个新问题:
假如上报时省级税务系统正在升级维护,市级税务系统就必须设计额外的重发机制保证数据的完整性;
假如省级税务系统接收数据需要 1 分钟处理时间,市级税务系统采⽤同步通信,则市级税务系统传输线程就要阻塞 1 分钟,在⾼并发场景下如此长时间的阻塞很容易造成系统的崩溃;
假如省级税务系统接⼝的调⽤⽅式、接⼝、IP、端⼝有任何改变,都必须⽴即通知市级税务系统进⾏调整,否则就会出现通信失败。
从以上三个问题可以看出,省级系统产⽣的变化直接影响到市级税务系统的执⾏,两者产⽣了强耦合,
如果问题放在互联⽹的微服务架构中,⼏⼗个服务进⾏串联调⽤,每个服务间如果都产⽣类似的强耦合,系统必然难以维护。
为了解决这种情况,我们需要在架构中部署消息中间件,这个组件应提供可靠的、稳定的、与业务⽆关的特性,使进程间通信解耦,⽽这⼀类消息中间件的代表产品就是 MQ 消息队列。当引⼊ MQ 消息队列后,消息传递过程会产⽣以下变化。
可以看到,引⼊消息队列后,⽣产者与消费者都只⾯向消息队列进⾏数据处理,数据⽣产者根本不需要了解具体消费者的信息,只要把数据按事先约定放在指定的队列中即可。⽽消费者也是⼀样的,消费者端监听消息队列,如果队列中产⽣新的数据,MQ 就会通过“推送PUSH”或者“抽取 PULL”的⽅式让消费者获取到新数据进⾏后续处理。
通过⽰意图可以看到,只要消息队列产品是稳定可靠的,那消息通信的过程就是有保障的。在架构领域,很多⼚商都开发了⾃⼰的 MQ 产品,最具代表性的开源产品有:
Kafka
ActiveMQ
ZeroMQ
RabbitMQ
RocketMQ
每⼀种产品都有⾃⼰不同的设计与实现原理,但根本的⽬标都是相同的:为进程间通信提供可靠的异步传输机制。RocketMQ 作为阿⾥系产品天然被整合进 Spring Cloud Alibaba ⽣态,在经历过多次双 11 的考验后,RocketMQ 在性能、可靠性、易⽤性⽅⾯都是⾮常优秀的,下⾯咱们来了解下 RocketMQ 吧。
RocketMQ
RocketMQ 有很多优秀的特性,在可⽤性⽅⾯,RocketMQ 强调集⽆单点,任意⼀点⾼可⽤,客户端具备负载均衡能⼒,可以轻松实现⽔平扩容;在性能⽅⾯,在天猫双 11 ⼤促背后的亿级消息处理就是通过 RocketMQ 提供的保障;在 API ⽅⾯,提供了丰富的功能,可以实现异步消息、同步消息、顺序消息、事务消息等丰富的功能,能满⾜⼤多数应⽤场景;在可靠性⽅⾯,提供了消息持久化、失败重试机制、消息查询追溯的功能,进⼀步为可靠性提供保障。
了解 RocketMQ 的诸多特性后,咱们来理解 RocketMQ ⼏个重要的概念:
消息 Message:消息在⼴义上就是进程间传递的业务数据,在狭义上不同的 MQ 产品对消息⼜附加了额外属性如:Topic(主题)、Tags(标签)等;
消息⽣产者 Producer:指代负责⽣产数据的⾓⾊,在前⾯案例中市级税务系统就充当了消息⽣产者的⾓⾊;
消息消费者 Consumer:指代使⽤数据的⾓⾊,前⾯案例的省级税务系统就是消息消费者;
MQ消息服务 Broker:MQ 消息服务器的统称,⽤于消息存储与消息转发;
⽣产者组 Producer Group:对于发送同⼀类消息的⽣产者,RocketMQ 对其分组,成为⽣产者组;
消费者组 Consumer Group:对于消费同⼀类消息的消费者,RocketMQ 对其分组,成为消费者组。
在理解这些基本概念后,咱们正式进⼊ RocketMQ 的部署与使⽤环节,通过案例代码理解 RocketMQ 的执⾏过程。对于 RocketMQ 来说,使⽤它需要两个阶段:搭建 RocketMQ 服务器集与应⽤接⼊ RocketMQ 队列,⾸先咱们来部署 RocketMQ 集。
部署 RocketMQ 集
RocketMQ 天然采⽤集模式,常见的 RocketMQ 集有三种形式:多 Master 模式、多 Master 多 Slave- 异步复制模式、多 Master 多 Slave- 同步双写模式,这三种模式各⾃的优缺点如下。
多 Master 模式是配置最简单的模式,同时也是使⽤最多的形式。优点是单个 Master 宕机或重启维护对应⽤⽆影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘⾮常可靠,同步刷盘消息也不会丢失,性能也是最⾼的;缺点是单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
多 Master 多 Slave 异步复制模式。每个 Master 配置⼀个 Slave,有多对 Master-Slave,HA 采⽤异步复制⽅式,主备有短暂消息毫秒级延迟,即使磁盘损坏只会丢失少量消息,且消息实时性不会受影响。同时 Master 宕机后,消费者仍然可以从 Slave 消费,⽽且此过程对应⽤透明,不需要⼈⼯⼲预,性能同多 Master 模式⼏乎⼀样;缺点是 Master 宕机,磁盘损坏情况下会丢失少量消息。
activemq和rocketmq的区别多 Master 多 Slave 同步双写模式,HA 采⽤同步双写⽅式,即只有主备都写成功,才向应⽤返回成功,该模式数据与服务都⽆单点故障,Master 宕机情况下,消息⽆延迟,服务可⽤性与数据可⽤性都⾮常⾼;缺点是性能⽐异步复制模式低 10% 左右,发送单个消息的执⾏时间会略⾼,且⽬前版本在主节点宕机后,备机不能⾃动切换为主机。
本讲我们将搭建⼀个双 Master 服务器集,⾸先来看⼀下部署架构图:
在双 Master 架构中,出现了⼀个新⾓⾊ NameServer(命名服务器),NameServer 是 RocketMQ ⾃
带的轻量级路由注册中⼼,⽀持Broker 的动态注册与发现。在 Broker 启动后会⾃动向 NameServer 发送⼼跳包,通知 Broker 上线。当 Provider 向 NameServer 获取路由信息,然后向指定 Broker 建⽴长连接完成数据发送。
集⼯作流程
启动NameServer,NameServer起来后监听端⼝,等待Broker、Producer、Consumer连上来,相当于⼀个路由控制中⼼。
Broker启动,跟所有的NameServer保持长连接,定时发送⼼跳包。⼼跳包中包含当前Broker信息(IP+端⼝等)以及存储所有Topic信息。注册成功后,NameServer集中就有Topic跟Broker的映射关系。
收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时⾃动创建Topic。
Producer发送消息,启动时先跟NameServer集中的其中⼀台建⽴长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择⼀个队列,然后与队列所在的Broker建⽴长连接从⽽向Broker发消息。
Consumer跟Producer类似,跟其中⼀台NameServer建⽴长连接,获取当前订阅Topic存在哪些Broker
上,然后直接跟Broker建⽴连接通道,开始消费消息。
为了避免单节点瓶颈,通常 NameServer 会部署两台以上作为⾼可⽤冗余。NameServer 本⾝是⽆状态的,各实例间不进⾏通信,因此在Broker 集配置时要配置所有 NameServer 节点以保证状态同步。
部署 RocketMQ 集要分两步:部署 NameServer 与部署 Broker 集。
第⼀步,部署 NameServer 集。
''我们创建两台 CentOS7 虚拟机,IP 地址分别为 192.168.31.200 与 192.168.31.201,要求这两台虚拟机内存⼤于 2G,并安装好64 位 JDK1.8,具体过程不再演⽰。
之后访问 Apache RocketMQ 下载页:
获取 RocketMQ 最新版 rocketmq-all-4.8.0-bin-release.zip,解压后编辑 rocketmq-all-4.8.0-bin-release/bin/runserver.sh ⽂件,因为 RocketMQ 是服务器软件,默认为其配置 8G 内存,这是 PC 机及或者笔记本吃不消的,所以在 82 ⾏附近将 JVM 内存缩⼩到 1GB 以⽅便演⽰。
修改前:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改后:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改完毕,将 rocketmq-all-4.8.0-bin-release 上传到两台 NameServer 虚拟机的 /usr/local ⽬录下,执⾏ bin ⽬录下的 mqnamesrv 命令。
cd /usr/local/rocketmq-all-4.8.0-bin-release/bin/
sh mqnamesrv
mqnamesrv 是 RocketMQ ⾃带 NameServer 的启动命令,执⾏后看到 The Name Server boot success. serializeType=JSON 就代表NameServer 启动成功,NameServer 将占⽤ 9876 端⼝提供服务,不要忘记在防⽕墙设置放⾏。之后如法炮制在另⼀台 201 设备上部署 NameServer,构成 NameServer 集。
第⼆步,部署 Broker 集。
我们再额外创建两台 CentOS7 虚拟机,IP 地址分别为 192.168.31.210 与 192.168.31.211,同样要求这两台虚拟机内存⼤于 2G,并安装好 64 位 JDK1.8。
打开 rocketmq-all-4.8.0-bin-release ⽬录,编辑 /bin/runbroker.sh ⽂件,同样将启动 Broker 默认占⽤内存从 8G 缩⼩到 1G,将
64 ⾏调整为以下内容:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
在 conf ⽬录下,RocketMQ 已经给我们贴⼼的准备好三组集配置模板:
2m-2s-async 代表双主双从异步复制模式;
2m-2s-sync 代表双主双从同步双写模式;
2m-noslave 代表双主模式。
我们在 2m-noslave 双主模式⽬录中,在 broker-a.properties 与 broker-b.properties 末尾追加 NameServer 集的地址,为了⽅便理解我也将模板⾥⾯每⼀项的含义进⾏注释,⾸先是 broker-a.properties 的完整内容如下:
#集名称,同⼀个集下的 broker 要求统⼀
brokerClusterName=DefaultCluster
#broker 名称
brokerName=broker-a
#brokerId=0 代表主节点,⼤于零代表从节点
brokerId=0
#删除⽇志⽂件时间点,默认凌晨 4 点
deleteWhen=04
#⽇志⽂件保留时间,默认 48 ⼩时
fileReservedTime=48
#Broker 的⾓⾊
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
brokerRole=ASYNC_MASTER
#刷盘⽅式
#- ASYNC_FLUSH 异步刷盘,性能好宕机会丢数
#- SYNC_FLUSH 同步刷盘,性能较差不会丢数
flushDiskType=ASYNC_FLUSH
#末尾追加,NameServer 节点列表,使⽤分号分割
namesrvAddr=192.168.31.200:9876;192.168.31.201:9876
broker-b.properties 只有 brokerName 不同,如下所⽰:
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
#末尾追加,NameServer 节点列表,使⽤分号分割
namesrvAddr=192.168.31.200:9876;192.168.31.201:9876
之后将 rocketmq-all-4.8.0-bin-release ⽬录上传到 /usr/local ⽬录,运⾏下⾯命令启动 broker 节点 a。
cd /usr/local/rocketmq-all-4.8.0-bin-release/
sh bin/mqbroker -c ./conf/2m-noslave/broker-a.properties
在 mqbroker 启动命令后增加 c 参数说明要加载哪个 Broker 配置⽂件。
启动成功会看到下⾯的⽇志,Broker 将占⽤ 10911 端⼝提供服务,请设置防⽕墙放⾏。
The broker[broker-a, 192.168.31.210:10911] boot success. serializeType=JSON and name server is 192.168.31.200:9876;192.168.31.201:9876

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。