Springboot+RabbitMQ实现消息确认、防⽌消息重复消费、延时队列
学习RabbitMQ也有⼀段时间了,特记录下学习⼼得。
RabbitMQ现阶段应⽤场景包括:
1:订单⽣成后,通过RabbitMQ推送短信和邮件。
2:订单⽣成后,⼀⼩时内未⽀付则关闭订单。
由应⽤场景引发的问题:
1:如何确保消费端成功消费消息
2:如何防⽌消费端重复消费消息
3:由TTL和DLX特性实现的延时队列第⼀条消息延时时长若⽐后⼀条消息延时时长更久,会导致后⼀条消息⽆法准时消费(消息在同⼀队列⾥)
1:消息确认
RabbitMQ消息确认⼜分为⽣产端确认和消费端确认
⽣产端确认:
⾸先需要确保创建的Exchange和Queue持久化(新建时默认持久化)
@Bean
public TopicExchange createTopicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Queue createTopicQueue(){
return new Queue(T_TOPIC_QUEUE);
}
可通过RabbitMQ后台进⾏查看
然后在l中配置
spring.rabbitmq.publisher-confirms:true
spring.rabbitmq.publisher-returns:true
实现RabbitTemplate.ConfirmCallbackRabbit和RabbitTemplate.ReturnCallback两个接⼝
@Component
public class RabbitMqProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//需初始化设置
@Bean
public void createRabbitTemplate(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
//确认消息是否到达Exchange b为false则发送失败
@Override
public void confirm(CorrelationData correlationData,boolean b, String s){
if(!b){
System.out.println("消息发送失败,失败原因:"+ s);
}
}
//确认消息是否到达Queue,消息发送失败触发(⽐如routing_key匹配不到queue)
@Override
public void returnedMessage(Message message,int i, String s, String s1, String s2){
System.out.println("进⼊returnCallback!exchange:"+ s1 +",routingkey:"+ s2);
System.out.println("消息内容:"+ message +",失败原因:"+ s);
}
}
消费端确认
消费者确认模式默认为⾃动确认模式,因此需要把消费者确认模式改为⼿动模式
在l中配置
spring.rabbitmq.listener.simple.acknowledge-mode: manual
若成功消费,则调⽤channel.basicAck()确认消费
参数说明:
deliveryTag:该消息的index
multiple:是否批量.true:将⼀次性ack所有⼩于deliveryTag的消息。
@RabbitListener(queues = RabbitMqConfig.T_TOPIC_QUEUE)
public void processMessage(Message message, Channel channel)throws IOException {
System.out.w()+"消费端收到消息:"+new Body()));
channel.MessageProperties().getDeliveryTag(),true);
}
若消费失败,即执⾏过程中出现异常则调⽤channel.basicNack()
注意requeue参数需要为false,若为true时会导致重发->异常->重发死循环,此时的做法应该是确认消息已消费,记录异常信息,把消息转发到死信队列中,等待后续处理。
参数说明:
deliveryTag:该消息的index
multiple:是否批量.true:将⼀次性拒绝所有⼩于deliveryTag的消息。
requeue:被拒绝的是否重新⼊队列
@RabbitListener(queues = RabbitMqConfig.T_TOPIC_QUEUE)
public void processM(Message message, Channel channel)throws Exception {
System.out.w()+"消费端收到消息:"+new Body()));
//模拟出现异常
try{
if(true){
throw new IOException("抛异常了!");
}
}catch(Exception e){
channel.MessageProperties().getDeliveryTag(),false,false);
}
}
2:防⽌消息重复消费
防⽌消息重复消费就是保证接⼝的幂等性,也就是保证相同的数据多次请求同⼀接⼝结果仍然⼀致。
⽐较通⽤的做法是给消息设置⼀个全局性唯⼀性Id(可根据Snowflake算法实现)存储在redis中,请求接⼝前先查询redis判断再执⾏后续操作。
根据具体的业务进⾏分析,例如订单扣款,请求接⼝前先根据订单表的是否⽀付相关字段进⾏判断等等
3:延时队列
rabbitmq-plugins enable rabbitmq_delayed_message_exchangespringboot推荐算法
使⽤CustomExchange进⾏初始化Exchange相关属性,其它和之前⼀样配置
//利⽤rabbitmq_delayed_message_exchange插件实现延时队列
@Bean
public Queue createCustomQueue(){
return new Queue(TOPIC_CUSTOM_QUEUE);
}
@Bean
public CustomExchange createCustomExchange(){
Map<String, Object> args =new HashMap<>();
args.put("x-delayed-type","topic");
return new CustomExchange(TOPIC_CUSTOM_EXCHANGE,"x-delayed-message",true,false, args);
}
@Bean
public Binding bingdelayCustomTopic(){
return BindingBuilder.bind(createCustomQueue()).to(createCustomExchange()).with(TOPIC_ROUT_KEY).noargs();
}
发送端通过⾃定义时长来实现延时队列
public boolean sendCustomMg(String message,int time){
System.out.println("⽣产者custom发送:"+ w()+":"+message);
delaymessage->{MessageProperties().setDelay(time*1000);return delaymessage;});
return true;
}
注意区别
TTL
rabbitmq_delayed_message_exchange插件

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。