Java实现RabbitMq延时队列和死信队列
延时队列:实际是不存在直接可⽤的延时队列,可通过死信消息和死信队列来实现延时队列的功能。
死信交换机: DLX 全称(Dead-Letter-Exchange)。其实它是个普通的交换机,但它是设置在队列上某个参数的值对应的交换机。
死信队列:如果某个队列上存在参数:x-dead-letter-exchange,当这个队列⾥的消息变成死信消息(dead message)后会被重新Pushlish到 x-dead-letter-exchange 所对应参数值的交换机上,跟这个交换机所绑定的队列就是死信队列。
死信消息:
消息被拒绝(ject / basic.nack),并且requeue = false
消息TTL过期
队列达到了最⼤的长度时
过期消息:RabbitMq 有两种设置消息过期的⽅式:
创建队列时通过 x-message-ttl 参数指定该队列消息的过期时间,这种队列⾥的消息过期时间全部相同。
⽣产者Pushlish消息时,通过设置消息的 expiration 参数指定过期时间,每个消息的过期时间都不⼀样。
  如果两者同时使⽤,过期时间按照⼩的⼀⽅为准,两种⽅式设置的时间都是毫秒。
应⽤场景:延时队列的应⽤场景很多,在我的项⽬开发中也涉及到很多,例如:订单五分钟未⽀付⾃动取消、订单准备超时30分钟推送提醒给门店、订单完成后两⼩时推送评价邀请给⽤户等等,这些间隔指定时间后的操作都可以使⽤延时队列。
上⼀篇⽂章:介绍了RabbitMq的基本操作,要引⼊的包和配置可以参考上⼀篇⽂章。这⾥就利⽤RabbitMq的死信队列直接来实现延时队列的功能。
⾸先创建⼀个⾃动加载类利⽤Bean在项⽬启动时,⾃动创建延时和死信交换机/延时和死信队列,并将创建好的队列绑定在对应的交换机上。如果交换机和队列存在的情况下,则不会创建或更新。这⼀步可减少⼿动或忘记创建队列带来的⿇烦:
package com.demo.fig;
llect.Maps;
slf4j.Slf4j;
import org.*;
import t.annotation.Bean;
import t.annotation.Configuration;
import java.util.Map;
/**
* RabbitMq 延时队列实现
* @author AnYuan
*/
@Slf4j
@Configuration
public class DelayQueueConfig {
/**
* 延迟队列
*/
public static final String DELAY_EXCHANGE = "delay.hange";
public static final String DELAY_QUEUE = "delay.queue.business.queue";
public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.utingKey";
/**
* 死信队列
*/
public static final String DEAD_LETTER_EXCHANGE = "delay.hange";
public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "delay.queue.deadLetter.utingKey";
public static final String DEAD_LETTER_QUEUE = "delay.queue.deadLetter.queue";
/**
* 声明死信交换机
* @return deadLetterExchange
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
/**
* 声明死信队列⽤于接收死信消息
* @return deadLetterQueueA
*/
@Bean
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_QUEUE);
}
/**
*  将死信队列绑定到死信交换机上
* @return deadLetterBindingA
*/
@Bean
public Binding deadLetterBindingA() {
return BindingBuilder
.bind(deadLetterQueueA())
.to(deadLetterExchange())
.with(DEAD_LETTER_QUEUE_ROUTING_KEY);
}
/**
* 声明延时交换机
* @return delayExchange
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DELAY_EXCHANGE);
}
/**
* 将延时队列绑定参数
* @return Queue
*/
@Bean
public Queue delayQueueA() {
Map<String, Object> maps = wHashMapWithExpectedSize(3);
// 队列绑定DLX参数(关键⼀步)
maps.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 队列绑定死信RoutingKey参数
maps.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
// 消息过期采⽤第⼀种设置队列的 ttl 时间,消息过期时间全部相同。单位:毫秒,这⾥设置为8秒        maps.put("x-message-ttl", 8000);
return QueueBuilder.durable(DELAY_QUEUE).withArguments(maps).build();
}
/**
* 将延时队列绑定到延时交换机上⾯
* @return delayBindingA
*/
@Bean
public Binding delayBindingA() {
return BindingBuilder
.bind(delayQueueA())
.to(directExchange())
.with(DELAY_QUEUE_ROUTING_KEY);
}
}
这⾥我们定义⼀个RabbitMq服务接⼝:
package com.demo.www.service;
/
**
* rabbiMq服务
* @author AnYuan
*/
public interface RabbitMqService {
/**
* 统⼀发送mq
*
* @param exchange  交换机
* @param routingKey 路由key
* @param msg      消息
* @param ttl      过期时间
*/
void send(String exchange, String routingKey, String msg, Integer ttl);
}
服务接⼝的实现类:
package com.demo.www.service.impl;
import com.demo.www.service.RabbitMqService;
slf4j.Slf4j;
import org.Message;
import org.MessageProperties;
import org.springframework.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* rabbitmq服务
* @author AnYuan
*/
@Service
@Slf4j
public class RabbitMqServiceImpl implements RabbitMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void send(String exchange, String routingKey, String msg, Integer ttl) {
MessageProperties messageProperties = new MessageProperties();
// 第⼆种⽅式设置消息过期时间
messageProperties.String());
// 构建⼀个消息对象
Message message = new Bytes(), messageProperties);
// 发送RabbitMq消息
}
}
接着创建⼀个测试类进⾏接⼝测试:
springboot其实就是springpackage com.demo.www.service.impl;
llect.Maps;
import com.demo.fig.DelayQueueConfig;
import com.demo.www.service.RabbitMqService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.st.context.SpringBootTest;
import java.time.LocalDateTime;
import java.util.Map;
@Slf4j
@SpringBootTest class RabbitMqServiceImplTest {
@Autowired private RabbitMqService rabbitMqService;
@Test public void sendTest() {
      // ⼿动指定消息过期时间
int ttl = 10000;
Map<String, Object> msgMap = wHashMapWithExpectedSize(3);
msgMap.put("msg", "Hello RabbitMq");
msgMap.put("time", w());
msgMap.put("ttl", ttl);
// 注意这⾥发送的交换机是延时交换机
rabbitMqService.send(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_
ROUTING_KEY, JSONString(msgMap), ttl);    log.info("消息发送成功:{}", JSONString(msgMap));
}
}
以上准备就绪后,延时队列其实已经实现了,来看⼀下项⽬启动后的情况
在RabbitMq的管理后台,可以看到⾃动创建的交换机
⾃动创建的队列,在延时队列的Features栏可以看到有: TTl、DLX、DLK。它们分别代表:(x-message-ttl):设置队列中的所有消息的⽣存周期,也就是过期时间;(x-dead-letter-exchange)绑定了死信交换机,死信消息会重新推送到指定交换机上⽽不是丢掉;(x-dead-letter-routing-key):死信消息推送到交换机上指定路由键的队列中,也就是说绑定了RoutingKey;
当运⾏测试类后会显⽰发送成功:
⾸先会看到延时队列⾥⾯产⽣了⼀条数据:
8秒后消息变成死信消息,同时会推送到死信队列⾥⾯:
这样就实现了延时队列。最后只需要创建⼀个消费者,消费死信队列⾥⾯的消息,注意是消费死信队列!package com.demo.sumers;
import com.alibaba.fastjson.JSONObject;
import com.demo.fig.DelayQueueConfig;
import lombok.Data;
slf4j.Slf4j;
import org.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 延时队列消息消费者
* @author AnYuan
*/
@Component
@Slf4j
public class DelayMsgConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(DelayQueueConfig.DEAD_LETTER_QUEUE),
exchange = @Exchange(DelayQueueConfig.DEAD_LETTER_EXCHANGE)))
public void queueAConsumer(Message message) {
Msg msg = JSONObject.parseObject(new Body()), Msg.class);
LocalDateTime now = w();
Duration duration = Duration.Time(), now);
log.info("DelayMsgConsumer死信队列消费---->Msg:{}, 发送时间:{}, 当前时间:{},  相差时间:{}秒,消息设置的ttl:{}",
Time()),
localDateTimeToString(now),
}
@Data
public static class Msg {
private String ttl;
private String msg;
private LocalDateTime time;
}
private String localDateTimeToString(LocalDateTime localDateTime){
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return dateTimeFormatter.format(localDateTime);
}
}
消费者创建好后,项⽬启动即可看到消费的Mq消息,对⽐time⾥⾯的值确认为同⼀条消息:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。