SpringBoot快速集成RocketMQ实战教程
前⾔
RocketMQ是⽬前主流的消息中间件之⼀,并且⾃⾝就⽀持分布式功能。最初由阿⾥巴巴团队开发,并且经历过双⼗⼀等海量消息场景的考验,后捐赠给Apache开源基⾦会,这也是为什么我们经常听说RocketMQ是阿⾥巴巴的消息中间件,项⽬却在Apache的顶级项⽬中。
⽹络上通过SpringBoot集成RocketMQ的教程很多,但⼤多数都⽆法做到快速、通⽤的进⾏集成。本篇⽂章带⼤家快速完成基于Spring Boot的集成使⽤,同时针对⼀些集成过程中的概念和使⽤⽅法以实例进⾏讲解。
RocketMQ的部署
关于RocketMQ的部署,通常有单Master模式、多Master模式、多Master多Slave模式(异步复制或同步双写)等。
本⽂重点介绍RocketMQ的集成部分,就不再这⾥讲解如何部署Master的部署过程,读者学习时只需部署单机模式或基于Docker部署即可。
依赖集成
⾸先创建⼀个SpringBoot项⽬,为了⽅便通过浏览器访问测试,引⼊web对应的starter。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.0</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
上⾯的依赖以及可以完成⼀个基于SpringBoot的web项⽬了。下⾯需要集成RocketMQ的依赖。
在此步骤中有两个选择,⼀个就是直接引⼊RocketMQ的依赖,⽐如:
<dependency>
<groupId>ketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
但此种⽅式需要进⾏⼤量的配置及实例化操作,并不能够达到快速集成、⽅便使⽤的⽬的。
这⾥我们采⽤RocketMQ官⽅提供的基于spring的集成。项⽬的源码及依赖使⽤位于GitHub上:
github/apache/rocketmq-spring 。
在该项⽬的ReadMe中已经清晰的描述了如何引⼊依赖:
<!--add dependency l-->
<dependency>
<groupId>ketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${RELEASE.VERSION}</version>
</dependency>
我们只需按照说明,引⼊对应的依赖即可,这⾥采⽤2.1.1版本。因此,引⼊依赖⽂件如下:
<dependency>
<groupId>ketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
引⼊依赖之后,剩下的就是配置⽂件的配置和使⽤了。
配置⽂件
我们知道,SpringBoot默认的starter内置了很多配置⽂件,直接通过yml⽂件进⾏配置即可使⽤。这⾥引⼊了rocketmq的starter,虽然并不是官⽅的,但使⽤⽅式基本⼀致。
在yml⽂件中配置如下参数:
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: test-group
name-server参数对应的就是部署的RocketMQ的Nameserver服务,如果有多个的话⽤英⽂分号(;)进⾏分割。
如果使⽤的是SpringBoot2.0+的框架或者是JDK10,可将name-server改成nameServer。否则,可能会出现⼀些奇怪的bug。
上⾯是简化了的最基础的配置,其他的配置均采⽤默认配置,如果需要定制化配置,可对具体参数按照统⼀形式进⾏配置。
⽣产者⽰例
当完成了上⾯的集成,⽣产者使⽤其实⾮常简单,只需要在使⽤的地⽅注⼊RocketMQTemplate对象,然后调⽤其对应的发送⽅法即可。简单⽰例如下:
@Component
public class TestSendService {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void send() {
rocketMQTemplate.send("test-topic-1",
MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
}
}
但如果是在项⽬中,这样每次使⽤都注⼊⼀个RocketMQTemplate并不符合⾯向对象的思想,⽽且RocketMQTemplate还提供了多个常⽤的⽅法,⽐如同步、异步、直接发送等模式。
我们可以将其封装成为⼀个通⽤的Service,这样其他服务只需注⼊对应的Service,调⽤公共的⽅法即可,并且注明每个⽅法的使⽤场景。抽象出来的Service接⼝如下:
/**
* Rocket MQ 对应服务封装
*
**/
public interface RocketMqService {
/**
* 同步发送消息<br/>
* <p>
* 当发送的消息很重要是,且对响应时间不敏感的时候采⽤sync⽅式;
*
* @param mqMsg 发送消息实体类
*/
void send(MqMsg mqMsg);
/**
* 异步发送消息,异步返回消息结果<br/>
* <p>
* 当发送的消息很重要,且对响应时间⾮常敏感的时候采⽤async⽅式;
*
* @param mqMsg 发送消息实体类
*/
void asyncSend(MqMsg mqMsg);
/**
* 直接发送发送消息,不关⼼返回结果,容易消息丢失,适合⽇志收集、不精确统计等消息发送;<br/>
* <p>
* 当发送的消息不重要时,采⽤one-way⽅式,以提⾼吞吐量;
*
* @param mqMsg 发送消息实体类
*/
void syncSendOrderly(MqMsg mqMsg);
}
定义了不同类型消息发送的⽅法,同时在注释部分说明具体⽅法的使⽤场景。其中将发送的参数封装为MqMsg对象,MqMsg的结构如下:
public class MqMsg {
/**
* ⼀级消息:消息topic
*/
private String topic;
/**
* ⼆级消息:消息topic对应的tags
*/
private String tags;
/**
* 消息内容
*/
private String content;
// 省略getter/setter⽅法
}
其中,topic为消息的主题,content为消息的内容,具体内容可根据⽣产者和消费者之间进⾏协定。针对上述的接⼝,提供具体的⽅法实现:
@Service("rocketMqService")
public class RocketMqServiceImpl implements RocketMqService {
private static final Logger log = Logger(RocketMqServiceImpl.class);
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public void send(MqMsg mqMsg) {
log.info("send发送消息到mqMsg={}", mqMsg);
rocketMQTemplate.Topic() + ":" + Tags(),
MessageBuilder.Content()).build());
}
@Override
public void asyncSend(MqMsg mqMsg) {
log.info("asyncSend发送消息到mqMsg={}", mqMsg);
rocketMQTemplate.Topic() + ":" + Tags(), Content(),
new SendCallback() {
@Override
spring boot是啥public void onSuccess(SendResult sendResult) {
// 成功不做⽇志记录或处理
}
@Override
public void onException(Throwable throwable) {
log.info("mqMsg={}消息发送失败", mqMsg);
}
});
}
@Override
public void syncSendOrderly(MqMsg mqMsg) {
log.info("syncSendOrderly发送消息到mqMsg={}", mqMsg);
rocketMQTemplate.Topic() + ":" + Tags(), Content());
}
}
其中异步发送⽅法asyncSend的异步返回结果中可以根据具体的业务场景进⾏针对性的处理。
上述⽅法中均默认使⽤了tag对topic进⾏分类。如果具体的业务中不需要tag,则可对上述⽅法中拼接的冒号+tag部分去除。本实例使⽤tag进⾏分类,⽅便⽤到时可以借鉴。
完成了上⾯的封装之后,在业务使⽤场景中只需注⼊对应的Service即可,后⾯测试时我们会进⾏演⽰。
消费者⽰例
关于消费者我们可以直接实现RocketMQListener接⼝,然后通过@RocketMQMessageListener注解来匹配⽬标消息。
这⾥为了演⽰统⼀topic下不同的tag的使⽤⽅法,分两个消费者来进⾏演⽰,直接看代码:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论