使⽤RocketMQTemplate发送带tags的消息RocketMQTemplate发送带tags的消息
RocketMQTemplate是RocketMQ集成到Spring cloud之后提供的个⽅便发送消息的模板类,它是基本Spring 的消息机制实现的,对外只提供了Spring抽象出来的消息发送接⼝。
在单独使⽤RocketMQ的时候,发送消息使⽤的Message是‘ssage'包下⾯的Message,⽽使⽤RocketMQTemplate发送消息时,使⽤的Message是ssaging的Message,猛⼀看,没办法发送带tags的消息了,其实在RocketMQ集成的时候已经解决了这个问题。
在RocketMQTemplate发送消息时,调⽤的⽅法是:
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
if (Objects.isNull(message) || Objects.Payload())) {
<("syncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
try {
long now = System.currentTimeMillis();
//在这⾥对消息进⾏了转化,将Spring的message转化为rocketmq⾃⼰的message
ssage.Message rocketMsg = vertToRocketMessage(objectMapper,
charset, destination, message);
SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
log.debug("send message cost: {} ms, msgId:{}", costTime, MsgId());
return sendResult;
} catch (Exception e) {
springboot其实就是spring
<("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new Message(), e);
}
}
在上⾯的代码中,对消息进⾏了转化,将Spring的message转化为rocketmq⾃⼰的message,在
String[] tempArr = destination.split(":", 2);
String topic = tempArr[0];
String tags = "";
if (tempArr.length > 1) {
tags = tempArr[1];
}
所以,在发送消息的时候,我们只要把tags使⽤":"添加到topic后⾯就可以了。
例如:xxxx:tag1 || tag2 || tag3
使⽤RocketMQ 处理消息
消息发送(⽣产者)
以maven + SpringBoot ⼯程为例,先在l增加依赖
<dependency>
<groupId>ketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
由于,这个依赖是⼀个starter,直接引⼊依赖就可以开始写投递消息的代码了。这个starter注册了⼀个叫
RocketMQTemplate的bean,⽤它就可以直接把消息投递出去。具体的API是这样的
XXXEvent xxxDto = new XXXEvent();
Message<XXXEvent> message = MessageBuilder.withPayload(xxxDto).build();
String dest = String.format("%s:%s",topic-name","tag-name");
//默认投递:同步发送不会丢失消息。如果在投递成功后发⽣⽹络异常,客户端会认为投递失败⽽回滚本地事务
这种投递⽅式能保证投递成功的消息不会丢失,但是不能保证投递⼀定成功。假设⼀次调⽤的流程是这样的
如果在步骤3的时候发⽣错误,因为出错mqClient会认为消息投递失败⽽把事务回滚。如果消息已经被消费,那就会导致业务错误。我们可以⽤事务消息解决这个问题。
以带事务⽅式投递的消息,正常情况下的处理流程是这样的
出错的时候是这样的
由于普通消息没有消息回查,普通消息⽤的producer不⽀持回查操作,不同业务的回查处理也不⼀样,事务消息需要使⽤单独的producer。消息发送代码⼤概是这样的
//调⽤这段代码之前别做会影响数据的操作
XXXEvent xxxDto = new XXXEvent();
Message<XXXEvent> message = MessageBuilder.withPayload(xxxDto).build();
String dest = String.format("%s:%s",topic-name","tag-name");
TransactionSendResult transactionSendResult = ketMQTemplate.sendMessageInTransaction("poducer-name","topic-name:tag-name",message,"xxxid"); if (LocalTransactionState.ROLLBACK_MESSAGE.LocalTransactionState())){
throw new RuntimeException("事务消息投递失败");
}
//按照RocketMQ的写法,这个地⽅不应该有别的代码
@RocketMQTransactionListener(txProducerGroup = "producer")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
//消息投递成功后执⾏的逻辑(半消息)
//原⽂:When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try{
//
xxxService.doSomething();
return RocketMQLocalTransactionState.COMMIT;
catch(IOException e){
//不确定最终是否成功
return RocketMQLocalTransactionState.UNKNOWN;
}catch(Exception e){
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//回查事务执⾏状态
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
Boolean result = xxxService.isSuccess(msg,arg);
if(result != null){
if(result){
return RocketMQLocalTransactionState.COMMIT;
}else{
return RocketMQLocalTransactionState.ROLLBACK;
}
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}
处理消息(消费)
普通消息和事务消息的区别只在投递的时候才明显,对应的消费端代码⽐较简单
slf4j.Slf4j;
import ketmq.spring.annotation.RocketMQMessageListener;
import RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.RedisTemplate;
import org.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "xxx-consumer", topic = "topic-name",selectorExpression = "tag-name") public class XXXEventMQListener implements RocketMQListener<XXXEvent> {
private  String repeatCheckRedisKeyTemplate = "topic-name:tag:repeat-check:%s";
@Autowired private StringRedisTemplate redisTemplate;
@Override
public void onMessage(XXXEvent message) {
log.info("consumer message {}",message);
//处理消息
try{
xxxService.doSomething(message);
}catch(Exception ex){
log.warn(String.format("message [%s] 消费失败",message),ex);
//抛出异常后,MQClient会返回ConsumeConcurrentlyStatus.RECONSUME_LATER,这条消息会再次尝试消费
throw new RuntimException(ex);
}
}
}
RocketMQ⽤ACK机制保证NameServer知道消息是否被消费在
ketmq.spring.support.DefaultRocketMQListenerContainer⾥是这么处理的
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {        for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", MsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
以上为个⼈经验,希望能给⼤家⼀个参考,也希望⼤家多多⽀持。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。