使⽤rocketmq-spring-boot-starter来配置、发送和消费RocketMQ消息
简介:本⽂将 rocktmq-spring-boot 的设计实现做⼀个简单的介绍,读者可以通过本⽂了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过⼀个简单的⽰例来⼀步⼀步的讲解如何使⽤这个 spring-boot-starter ⼯具包来配置,发送和消费 RocketMQ 消息。
作者 | 辽天
来源 |
导读:本⽂将 rocktmq-spring-boot 的设计实现做⼀个简单的介绍,读者可以通过本⽂了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过⼀个简单的⽰例来⼀步⼀步的讲解如何使⽤这个 spring-boot-starter ⼯具包来配置,发送和消费 RocketMQ 消息。
在 Spring ⽣态中玩转 RocketMQ 系列⽂章:
本⽂配套可交互教程已登录阿⾥云知⾏动⼿实验室,PC 端登录在浏览器中⽴即体验。
通过本⽂,您将了解到:
Spring 的消息框架介绍
rocketmq-spring-boot 具体实现
使⽤⽰例
前⾔
上世纪 90 年代末,随着 Java EE(Enterprise Edition) 的出现,特别是 Enterprise Java Beans 的使⽤需要复杂的描述符配置和死板复杂的代码实现,增加了⼴⼤开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain Old Java Objects)的 Spring 技术应运⽽⽣,依赖注⼊(Dependency Injection), 控制反转(Inversion of Control)和⾯向切⾯编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的不⾜。
随着 Spring 的持续演进,基于注解(Annotation)的配置逐渐取代了 XML ⽂件配置,2014 年 4 ⽉ 1 ⽇,Spring Boot 1.0.0 正式发布,它基于“约定⼤于配
置”(Convention over configuration)这⼀理念来快速地开发、测试、运⾏和部署 Spring 应⽤,并能通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应⽤直接以命令⾏的⽅式运⾏,不需再部署到独⽴容器中。这种简便直接快速构建和开发应⽤的过程,可以使⽤约定的配置并且简化部署,受到越来越多的开发者的欢迎。
Apache RocketMQ 是业界知名的分布式消息和流处理中间件,简单地理解,它由 Broker 服务器和客户端两部分组成:
其中客户端⼀个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;另外⼀个是消息的消费者客户端(Consumer),多个消费者可以组成⼀个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。
为了利⽤ Spring Boot 的快速开发和让⽤户能够更灵活地使⽤ RocketMQ 消息客户端,Apache RocketMQ 社区推出了 spring-boot-starter 实现。随着分布式事务消息功能在 RocketMQ 4.3.0 版本的发布,近期升级了相关的 spring-boot 代码,通过注解⽅式⽀持分布式事务的回查和事务消息的发送。
本⽂将对当前的设计实现做⼀个简单的介绍,读者可以通过本⽂了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过⼀个简单的⽰例来⼀步⼀步的讲解如何使⽤这个 spring-boot-starter ⼯具包来配置,发送和消费 RocketMQ 消息。
Spring 中的消息框架
顺便在这⾥讨论⼀下在 Spring 中关于消息的两个主要的框架,即 Spring Messaging 和 Spring Cloud Stream。它们都能够与 Spring Boot 整合并提供了⼀些参考的实现。和所有的实现框架⼀样,消息框
架的⽬的是实现轻量级的消息驱动的微服务,可以有效地简化开发⼈员对消息中间件的使⽤复杂度,让系统开发⼈员可以有更多的精⼒关注于核⼼业务逻辑的处理。
1. Spring Messaging
Spring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统集成的⼀个扩展性的⽀持。它实现了从基于 JmsTemplate 的简单的使⽤ JMS 接⼝到异步接收消息的⼀整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能集。在与 Spring Boot 的集成后,它拥有了⾃动配置能⼒,能够在测试和运⾏时与相应的消息传递系统进⾏集成。springboot aop
单纯对于客户端⽽⾔,Spring Messaging 提供了⼀套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进⾏规定,不同的消息中间件提供商可以在这个模式下提供⾃⼰的 Spring 实现:在消息发送端需要实现的是⼀个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的⾃动化配置选项提供多个不同的发送消息⽅法;在消息的消费端是⼀个 XXXMessageListener 接⼝(实现⽅式通常会使⽤⼀个注解来声明⼀个消息驱动的 POJO),提供回调⽅法来监听和消费消息,这个接⼝同样可以使⽤ Spring Boot 的⾃动化选项和⼀些定制化的属性。
如果有兴趣深⼊的了解 Spring Messaging 及针对不同的消息产品的使⽤,推荐阅读这个⽂件。参考 Spring Messaging 的既有实现,RocketMQ 的 spring-boot-starter 中遵循了相关的设计模式并结合 Ro
cketMQ ⾃⾝的功能特点提供了相应的 API(如顺序、异步和事务半消息等)。
2. Spring Cloud Stream
Spring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应⽤模型如下:
该图⽚引⾃ spring cloud stream
Spring Cloud Stream 框架中提供⼀个独⽴的应⽤内核,它通过输⼊(@Input)和输出(@Output)通道与外部世界进⾏通信,消息源端(Source)通过输⼊通道发送消息,消费⽬标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专⽤的 Binder 实现与外部代理连接。开发⼈员的代码只需要针对应⽤内核提供的固定的接⼝和注解⽅式进⾏编程,⽽不需要关⼼运⾏时具体的 Binder 绑定的消息中间件。在运⾏时,Spring Cloud Stream 能够⾃动探测并使⽤在 classpath 下到的Binder。
这样开发⼈员可以轻松地在相同的代码中使⽤不同类型的中间件:仅仅需要在构建时包含进不同的 Binder。在更加复杂的使⽤场景中,也可以在应⽤中打包多个 Binder 并让它⾃⼰选择 Binder,甚⾄在运⾏时为不同的通道使⽤不同的 Binder。
Binder 抽象使得 Spring Cloud Stream 应⽤可以灵活的连接到中间件,加之 Spring Cloud Stream 使⽤利⽤了 Spring Boot 的灵活配置配置能⼒,这样的配置可以通过外部配置的属性和 Spring Boot ⽀持的任何形式来提供(包括应⽤启动参数、环境变量和 l 或者 application.properties ⽂件),部署⼈员可以在运⾏时动态选择通道连接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。
Binder SPI 的⽅式来让消息中间件产品使⽤可扩展的 API 来编写相应的 Binder,并集成到 Spring Clo
ud Steam 环境,⽬前 RocketMQ 还没有提供相关的 Binder,我们计划在下⼀步将完善这⼀功能,也希望社区⾥有这⽅⾯经验的同学积极尝试,贡献 PR 或建议。
spring-boot-starter的实现
在开始的时候我们已经知道,spring boot starter 构造的启动器对于使⽤者是⾮常⽅便的,使⽤者只要在 l引⼊starter 的依赖定义,相应的编译,运⾏和部署功能就全部⾃动引⼊。因此常⽤的开源组件都会为 Spring 的⽤户提供⼀个 spring-boot-starter 封装给开发者,让开发者⾮常⽅便集成和使⽤,这⾥我们详细的介绍⼀下RocketMQ(客户端)的 starter 实现过程。
1. spring-boot-starter 的实现步骤
对于⼀个 spring-boot-starter 实现需要包含如下⼏个部分:
1)在 l 的定义
定义最终要⽣成的 starter 组件信息
<groupId>ketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
定义依赖包
它分为两个部分:Spring ⾃⾝的依赖包和 RocketMQ 的依赖包。
2)配置⽂件类
定义应⽤属性配置⽂件类 RocketMQProperties,这个 Bean 定义⼀组默认的属性值。⽤户在使⽤最终的 starter 时,可以根据这个类定义的属性来修改取值,当然不是直接修改这个类的配置,⽽是 spring-boot 应⽤中对应的配置⽂件:src/main/resources/application.properties。
3)定义⾃动加载类
定义 src/resources/META-INF/spring.factories ⽂件中的⾃动加载类,其⽬的是让 spring boot 更具⽂中中所指定的⾃动化配置类来⾃动初始化相关的 Bean、Component 或 Service,它的内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
ketmq.spring.starter.RocketMQAutoConfiguration
在 RocketMQAutoConfiguration 类的具体实现中,定义开放给⽤户直接使⽤的 Bean 对象包括:
RocketMQProperties 加载应⽤属性配置⽂件的处理类;
RocketMQTemplate 发送端⽤户发送消息的发送模板类;
ListenerContainerConfiguration 容器 Bean 负责发现和注册消费端消费实现接⼝类,这个类要求:由 @RocketMQMessageListener 注解标注;实现
RocketMQListener 泛化接⼝。
4)最后具体地进⾏ RpcketMQ 相关的封装
在发送端(producer)和消费端(consumer)客户端分别进⾏封装,在当前的实现版本提供了对 Spring Messaging 接⼝的兼容⽅式。
2. 消息发送端实现
1)普通发送端
发送端的代码封装在 RocketMQTemplate POJO 中,下图是发送端的相关代码的调⽤关系图:
为了与 Spring Messaging 的发送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象类,来⽀持相关的消息转换和发送⽅法,这些⽅法最终会代理给 doSend() ⽅法、doSend() 以及 RocoketMQ 所特有的⼀些⽅法如异步,单向和顺序等⽅法直接添加到 RoketMQTempalte 中,这些⽅法直接代理调⽤到RocketMQ 的 Producer API 来进⾏消息的发送。
2)事务消息发送端
对于事务消息的处理,在消息发送端进⾏了部分的扩展,参考上⾯的调⽤关系类图。
RocketMQTemplate ⾥加⼊了⼀个发送事务消息的⽅法 sendMessageInTransaction(),并且最终这个⽅法会代理到 RocketMQ 的 TransactionProducer 进⾏调⽤,在这个Producer 上会注册其关联的 TransactionListener 实现类,以便在发送消息后能够对 TransactionListener ⾥的⽅法实现进⾏调⽤。
3. 消息消费端实现
在消费端 Spring-Boot 应⽤启动后,会扫描所有包含 @RocketMQMessageListener 注解的类(这些类需要集成 RocketMQListener 接⼝,并实现 onMessage()⽅法),这个 Listener 会⼀对⼀的被放置到。
DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的⽅式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接⼝实现。在容器中创建 RocketMQ Consumer 对象,启动并监听定制的 Topic 消息,如果有消费消息,则回调到 Listener 的 onMessage() ⽅法。
使⽤⽰例
上⾯的⼀章介绍了 RocketMQ 在 spring-boot-starter ⽅式的实现,这⾥通过⼀个最简单的消息发送和消费的例⼦来介绍如何使这个 rocketmq-spring-boot-starter。
1. RocketMQ 服务端的准备
1)启动 NameServer 和 Broker
要验证 RocketMQ 的 Spring-Boot 客户端,⾸先要确保 RocketMQ 服务正确的下载并启动。可以参考 RocketMQ 主站的快速开始来进⾏操作。确保启动 NameServer 和Broker 已经正确启动。
2)创建实例中所需要的 Topics
在执⾏启动命令的⽬录下执⾏下⾯的命令⾏操作:
bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic
2. 编译 rocketmq-spring-boot-starter
⽬前的 spring-boot-starter 依赖还没有提交的 Maven 的中⼼库,⽤户使⽤前需要⾃⾏下载 git 源码,
然后执⾏ mvn clean install 安装到本地仓库。
git clone github/apache/rocketmq-externals.git
cd rocketmq-spring-boot-starter
mvn clean install
3. 编写客户端代码
⽤户如果使⽤它,需要在消息的发布和消费客户端的 maven 配置⽂件 l 中添加如下的依赖:
属性 spring-boot-starter-rocketmq-version 的取值为:1.0.0-SNAPSHOT,这与上⼀步骤中执⾏安装到本地仓库的版本⼀致。1)消息发送端的代码
发送端的配置⽂件 application.properties:
发送端的 Java 代码:
2)消息消费端代码
消费端的配置⽂件 application.properties:
消费端的 Java 代码:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。