基于redisstream+springboot的消息队列机制简单实现基于redis stream + spring boot的消息队列机制简单实现
1.通知结构整体设计
2.流程解析
1. 在各个不同的功能点都可能触发消息通知,借助LxNoticeStreamUtil#addNoticeMessage(Integer noticeAction,String actionParam)⽅法将消息推
送到指定队列
2. 在程序启动时就会由StreamConsumerRunner启动MQ的StreamMessageListener,⼀旦有消息产⽣,监听者⾃动接收消息,并解析执
spring ioc注解
3. 接收到消息后委托给通知发送封装类NoticeMediator#sendNotice(Map<String,String> messageBody)处理
4. NoticeMediator接收到消息体后根据携带的消息内容解析出需要发送什么通知,从⽽
由NoticeParameterFactory#getStringParameterStrategy(Integer strategyCode)构建出⽤于获取被通知对象以及消息内容
的NoticeParameterStrategy
5. 每个具体的消息通知需求都可以抽象为⼀个NoticeParameterStrategy,在这个基础结构中提供了三种⽅法
1. 反序列化请求发起时的必要参数DTO
void init(String body)
2. 获取通知的消息体
T getMessage()
3. 获取被通知者列表
List<S> listNotifiedPersons()
6. 构建出消息通知必要的内容后交给Notifier#sendNotice(String message, List<String> noticedPersons)发送通知
7. 通知完成后对特定消息发起ACK,确认消息消费完成
3.消息补偿机制
⽬标:在消息⼀次因为⽹络或其他可恢复的原因造成消息未消费完成的情况提供⼀种补偿机制,让消息尽可能消费完成
1. 使⽤特定消费者组的特定消费者监听消息队列中挂在消费者下的PendingList
PendingList存储着被消费过,但是没有被ACK的消息
2. 遍历PendingList消费这些未成功处理的消息,若补偿消费成功发送ACK确认
3. 在3次遍历后还存在未消费的消息则继续存放等待下次补偿机制触发
4. 判断MQ容量,超过限制⾃动缩容1/2
4.spring redis connection
@ConditionalOnProperty(name ="dis.client-type", havingValue ="lettuce", matchIfMissing =true)
通过这个注解可知,在我使⽤的版本已经将lettuce置为默认redis连接池
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
4.1⾃定义redis 连接(standalone)
1. 构造连接池配置对象
@Bean
public GenericObjectPoolConfig redisPool(){
GenericObjectPoolConfig<Object> genericObjectPoolConfig =new GenericObjectPoolConfig<>();
genericObjectPoolConfig.MaxWait());
genericObjectPoolConfig.MaxActive());
genericObjectPoolConfig.MaxIdle());
genericObjectPoolConfig.MinIdle());
return new GenericObjectPoolConfig<>();
}
2. 构造redis连接配置对象
@Bean("redisConfigMaster1")
public RedisStandaloneConfiguration redisConfigMaster(){
RedisStandaloneConfiguration redisStandaloneConfiguration =new Host(), Port());
redisStandaloneConfiguration.Database());
redisStandaloneConfiguration.setPassword(RedisPassword.Password()));
return redisStandaloneConfiguration;
}
3. 配置连接⼯⼚,⽤于⽣产与redis的连接
@Bean("factory1")
public LettuceConnectionFactory factory(@Qualifier("redisConfigMaster1") RedisStandaloneConfiguration redisConfigMaster){
LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(redisPool()).build();
return new LettuceConnectionFactory(redisConfigMaster, clientConfiguration);
}
4. ⽣成redisTemplate操作对象
@Bean("noticeStringRedisTemplate")
public StringRedisTemplate redisTemplate(@Qualifier("factory1") RedisConnectionFactory factory){
return getStringStringRedisTemplate(factory);
}
/**
* 设置序列化⽅式(这⼀步不是必须的)
*
* @param factory
* @return
*/
private StringRedisTemplate getStringStringRedisTemplate(RedisConnectionFactory factory){
StringRedisTemplate template =new StringRedisTemplate(factory);
GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer =new GenericJackson2JsonRedisSerializer();
template.setValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
5.
5.⼩结
1. 本⽂设定了⼀种基于redis stream构建的队列机制,包含队列的消息⾃动⽣产与消费,可通过配置与变动StreamConsumerRunner中相
关代码实现⾃定义的扩容。
2. 本⽂简单使⽤单点下的redis作为数据源,读者可根据⾃⼰需求构造出集/哨兵模式下的redis连接配置
3. spring boot 在⾼版本使⽤Lettuce作为redis的默认连接⼯具
4. 借助⼯⼚模式+枚举类来屏蔽⼤量的具体策略类,借助spring的IOC机制来辅助⼯⼚模式⽣产合适的策略对象.
5. ApplicationContext可通过实现ApplicationContextAware接⼝获取
6. 可通过了解stream基础⽀持
欢迎 star fork

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。