Springboot集成rocketMQ官⽅⽂档
RocketMQ-Spring
帮助开发者在中快速集成。⽀持Spring Message规范,⽅便开发者从其它MQ快速切换到RocketMQ。
如何贡献和帮助社区
我们永远欢迎开发者的帮助来使这个项⽬更加完善,⽆论是⼩的⽂档还是⼤的功能新特性,请参考RocketMQ的主站了解
前提条件
JDK 1.8 and above
3.0 and above
功能特性:
同步发送
同步顺序发送
异步发送
异步顺序发送
顺序消费
并发消费(⼴播/集)
one-way⽅式发送
事务⽅式发送
消息轨迹
ACL
pull消费
Quick Start
下⾯列出来了⼀些关键点,完整的⽰例请参考:
注意:当前的RELEASE.VERSION=2.0.1
<!--在l中添加依赖--> <dependency> <groupId>ketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${RELEASE.VERSION}</version> </dependency>
发送消息
## application.properties rocketmq.name-server=127.0.0.1:9876 up=my-group
注意:
请将上述⽰例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端⼝
@SpringBootApplication public class ProducerApplication implements CommandLineRunner{ @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args){
SpringApplication.run(ProducerApplication.class, args); } public void args) throws Exception { vertAndSend("test-topic-1", "Hello, World!"); rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring m
essage").build()); vertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00"))); // rocketMQTemplate.destroy(); // notes: once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate } @Data
@AllArgsConstructor public class OrderPaidEvent implements Serializable{ private String orderId; private BigDecimal paidMoney; } }
在发送客户端发送事务性消息并且实现回查Listener
@SpringBootApplication public class ProducerApplication implements CommandLineRunner{ @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args){
SpringApplication.run(ProducerApplication.class, args); } public void args) throws Exception { try { // Build a SpringMessage for sending in transaction Message msg = MessageBuilder.withPayload(..)... // In sendMessageInTransaction(), the first parameter transaction name ("test") // must be same with the
@RocketMQTransactionListener's member field 'transName' rocketMQTemplate.sendMessageInTransaction("test", "test-topic" msg, null); } catch (MQClientExcep
tion e) { e.printStackTrace(System.out); } } // Define transaction listener with the annotation @RocketMQTransactionListener @RocketMQTransactionListener(transName="test") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process,
return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } } }
更多发送相关配置
rocketmq.producer.send-message-timeout=300000 rocketmq.producerpress-message-body-threshold=4096 rocketmq.producer.max-message-size=4194304 -times-when-send-async-failed=0
-next-server=true -times-when-send-failed=2
接收消息
## application.properties rocketmq.name-server=127.0.0.1:9876
注意:
请将上述⽰例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端⼝
@SpringBootApplication public class ConsumerApplication{ public static void main(String[] args){
SpringApplication.run(ConsumerApplication.class, args); } @Slf4j @Service @RocketMQMessageListener(topic = "test-
topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener<String> { public void onMessage(String message) { log.info("received message: {}", message); } } @Slf4j @Service
@RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2") public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{ public void onMessage(OrderPaidEvent orderPaidEvent) { log.info("received orderPaidEvent: {}", orderPaidEvent); } } }
更多消费相关配置
see:
消息轨迹spring到底是干啥的
Producer 端要想使⽤消息轨迹,需要多配置两个配置项:
## application.properties rocketmq.name-server=127.0.0.1:9876 up=my-group
able-msg-trace=true rocketmq.producer.customized-trace-topic=my-trace-topic
Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener 中进⾏配置对应的属性:
@Service @RocketMQMessageListener( topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", enableMsgTrace = true, customizedTraceTopic = "my-trace-topic" ) public class MyConsumer implements RocketMQListener<String> { ... }
注意:
默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC Consumer 端的消息轨迹trace-topic 可以在配置⽂件中配置 sumer.customized-trace-topic 配置项,不需要为在每
个 @RocketMQTransactionListener 配置
ACL
Producer 端要想使⽤ ACL 功能,需要多配置两个配置项:
## application.properties rocketmq.name-server=127.0.0.1:9876 up=my-group
rocketmq.producer.access-key=AK rocketmq.producer.secret-key=SK
事务消息的发送需要在 @RocketMQTransactionListener 注解⾥配置上 AK/SK:
@RocketMQTransactionListener( txProducerGroup = "test, accessKey = "AK", secretKey = "SK" ) class TransactionListenerImpl implements RocketMQLocalTransactionListener { ... }
注意:
可以不⽤为每个 @RocketMQTransactionListener 注解配置 AK/SK,在配置⽂件中配置 rocketmq.producer.access-
key 和 rocketmq.producer.secret-key 配置项,这两个配置项的值就是默认值
Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进⾏配置
@Service @RocketMQMessageListener( topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", accessKey = "AK", secretKey = "SK" ) public class MyConsumer implements RocketMQListener<String> { ... }
注意:
可以不⽤为每个 @RocketMQMessageListener 注解配置 AK/SK,在配置⽂件中配置 sumer.access-
key 和 sumer.secret-key 配置项,这两个配置项的值就是默认值
FAQ
1. ⽣产环境有多个nameserver该如何连接?
rocketmq.name-server⽀持配置多个nameserver地址,采⽤;分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876
1. rocketMQTemplate在什么时候被销毁?
开发者在项⽬中使⽤rocketMQTemplate发送消息时,不需要⼿动执⾏rocketMQTemplate.destroy()⽅法, rocketMQTemplate会在spring容器销毁时⾃动销毁。
1. 启动报错:Caused by: ption.MQClientException: The consumer group[xxx] has been
created before, specify another name please
RocketMQ在设计时就不希望⼀个消费者同时处理多个类型的消息,因此同⼀个consumerGroup下的consumer职责应该是⼀样的,不要⼲不同的事情(即消费多个topic)。建议consumerGroup与topic⼀⼀对应。
1. 发送的消息内容体是如何被序列化与反序列化的?
RocketMQ的消息体都是以byte[]⽅式存储。当业务系统的消息内容体如果是java.lang.String类型时,统⼀按照utf-8编码转成byte[];如果业务系统的消息内容为⾮java.lang.String类型,则采⽤序列化成JSON格式的字符串之后,再统⼀按照utf-8编码转成byte[]。
1. 如何指定topic的tags?
RocketMQ的最佳实践中推荐:⼀个应⽤尽可能⽤⼀个Topic,消息⼦类型⽤tags来标识,tags可以由应⽤⾃由设置。 在使⽤rocketMQTemplate发送消息时,通过设置发送⽅法的destination参数来设置消息的⽬的地,destination的格式为
topicName:tagName,:前⾯表⽰topic的名称,后⾯表⽰tags名称。
注意:
tags从命名来看像是⼀个复数,但发送消息时,⽬的地只能指定⼀个topic下的⼀个tag,不能指定多个。
1. 发送消息时如何设置消息的key?
可以通过重载的xxxSend(String destination, Message<?> msg, ...)⽅法来发送消息,指定msg的headers来完成。⽰例:
Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build(); rocketMQTemplate.send("topic-test", message);
同理还可以根据上⾯的⽅式来设置消息的FLAG、WAIT_STORE_MSG_OK以及⼀些⽤户⾃定义的其它头信息。
注意:
在将Spring的Message转化为RocketMQ的Message时,为防⽌header信息与RocketMQ的系统属性冲突,在所有header的名称前⾯都统⼀添加了前缀USERS_。因此在消费时如果想获取⾃定义的消息头信息,请遍历头信息中以USERS_开头的key即可。
1. 消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?
消费者在实现RocketMQListener接⼝时,只需要起泛型为MessageExt即可,这样在onMessage⽅法将接收到RocketMQ原⽣的MessageExt消息。
@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer2 implements RocketMQListener<MessageExt>{ public void onMessage(MessageExt messageExt) {
log.info("received messageExt: {}", messageExt); } }
1. 如何指定消费者从哪开始消费消息,或开始消费的位置?
消费者默认开始消费的位置请参考:。 若想⾃定义消费者开始的消费位置,只需在消费者类添加⼀个RocketMQPushConsumerLifecycleListener接⼝的实现即可。 ⽰例如下:
@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(String message) { log.info("received message: {}", message); } @Override public void prepareStart(final DefaultMQPushConsumer consumer) { // set consumer consume message from now
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } }
同理,任何关于DefaultMQPushConsumer的更多其它其它配置,都可以采⽤上述⽅式来完成。
1. 如何发送事务消息(即半消息⽀持分布式事务)? 在客户端,⾸先⽤户需要实现RocketMQLocalTransactionListener接⼝,并在接⼝
类上注解声明@RocketMQTransactionListener,实现确认和回查⽅法;然后再使⽤资源模板RocketMQTemplate, 调⽤⽅法sendMessageInTransaction()来进⾏消息的发布。 注意:这个⽅法通
过指定发送者组名与具体的声明了txProducerGroup的TransactionListener进⾏关联,您也可以不指定这个值,从⽽使⽤默认的事务发送者组。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论