Redis5.0Stream实现轻量化消息队列(⼀⽂实现Java实战完
整版)
背景
本⼈近期在搞⼀个轻量化部署,需要⽤到消息队列,但是感觉kafka相对较重,所以最终选择了⼀个相对轻量化消息队列“Redis Stream”。感觉⽹上的Java实现不是很好,经过⼀段时间摸索,决定将完整的可运⾏的使⽤Java实现Redis消息队列写出来,供⼤家参考。代码已上传⾄gitee,⽂末可下载。
还是希望⼤家能够耐⼼的看完,相信看完本⽂可以帮助您快速了解Redis作为消息中间件在Java环境中的开发。
撸代码之前需要掌握
⾸先需要了解⼀下Stream的基础知识,这⾥给个链接,⾥⾯是针对官⽹的翻译版,可以先⾏参照了解⼀下每个命令的使⽤
本⽂⽤到的命令如下:
> XADD mystream  * hello world    创建⼀个mystream 流
> XGROUP CREATE mystream group-1 $    创建消费组group-1
> XGROUP CREATE mystream group-2 $  创建消费组group-2
> XRANGE mystream - +    查询流中的消息
> XPENDING mystream group-1    没有组中没有ACK的消息
> XPENDING mystream group-1 0 + 10 consumer-1    查看消费中consumer-1中没有消费的消息
还包括XCLAIM 转组命令、XTRIM 定时清理流数据命令等在java中实现
环境准备
我使⽤的是redis5.0.2,这⾥直接为⼤家送上redis在linux上安装的源⽂件。
准备三个Spring boot⼯程,⼀个⽣产者producer,两个消费者consumer1、consumer2
开撸~撸~
⾸先需要创建我们的流stream,以及相应的组group,这个可以⼿动在redis中创建,也可以代码⾃动
创建。备注:这⾥再啰嗦⼀句,对于这些流以及组的概念本⽂就不进⾏重述了,还望⼤家先了解⼀下基本的操作。
我们这⾥创建⼀个流:mystream;两个组:group-1、group-2
127.0.0.1:6379> XADD mystream * hello world
"1617952839936-0"
127.0.0.1:6379> XGROUP CREATE mystream group-1 $
OK
127.0.0.1:6379> XGROUP CREATE mystream group-2 $
OK
127.0.0.1:6379>
下⾯我们开始通过Java代码来实现⽣产者和消费者,我使⽤的Spring Boot版本是2.4.3
⽣产者
代码结构如下
pom中需要引⼊如下
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apachemons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
配置⽂件l,链接redis设置流的名称
server:
port: 8080
servlet:
context-path: /
spring:
redis:
database: 0
host: 192.168.44.129
port: 6379
password:
timeout: 0java stream
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
redisstream:
stream: mystream
RedisStreamConfig只是做了读取配置中流的名称
@Data
@Component
@ConfigurationProperties(prefix = "redisstream")
public class RedisStreamConfig {
private String stream;
}
PublishService类中做了简单的发送操作,这⾥我们通过调⽤该test⽅法可以将数据发送到相应的流中
@Service
public class PublishService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisStreamConfig redisStreamConfig;
public void test(String msg){
// 创建消息记录, 以及指定stream
StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("name", msg)).Stream1());            // 将消息添加⾄消息队列中
this.stringRedisTemplate.opsForStream().add(stringRecord);
}
}
RedisStreamController类⾥⾯做了⼀个简单的调⽤
@RestController
public class RedisStreamController {
@Autowired
private PublishService publishService;
@GetMapping("produceMsg")
public void produceMsg(@Param("msg")String msg){
}
}
⾄此⼀个简单的Redis⽣产者就已经完成了,我们来测试⼀下
打开浏览器输⼊:
执⾏完成,通过命令>XRANGE mystream - + 我们可以看到刚输⼊的“nihao”已经存⼊该流中。
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1617952839936-0"
2) 1) "hello"
2) "world"
2) 1) "1617955133254-0"
2) 1) "name"
2) "nihao"
127.0.0.1:6379>
消费者
代码结构如下,只需要关注红框内的⽂件就好,pom⽂件同⽣产者
server:
port: 8081
servlet:
context-path: /
spring:
redis:
database: 0
host: 192.168.44.129
port: 6379
password:
timeout: 0
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
redisstream:
stream: mystream
group: group-1
consumer: consumer-1
RedisStreamConfig类⽂件如下,简单的获取相应的配置
@Data
@Component
@ConfigurationProperties(prefix = "redisstream")
public class RedisStreamConfig {
private String stream;
private String group;
private String consumer;
}
RedisStreamConsumerConfig类如下,重点都在这⾥,具体的功能写的还算详细,这⾥⾯只要包含了将消费者监听类绑定到响应的流上,以及拉取消息的⼀些配置。
这⾥⾯我们关闭了ACK⾃动消费,我们在消息监听类⾥⾯进⾏⼿动消费
@Configuration
public class RedisStreamConsumerConfig {
@Autowired
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
RedisStreamConfig redisStreamConfig;
/**
* 主要做的是将OrderStreamListener监听绑定消费者,⽤于接收消息
*
* @param connectionFactory
* @param streamListener
* @return
*/
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> consumerListener1(
RedisConnectionFactory connectionFactory,
OrderStreamListener streamListener) {
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
Stream(), connectionFactory, streamListener);
container.start();
return container;
}
/**
* @param mystream          从哪个流接收数据
* @param connectionFactory
* @param streamListener    绑定的监听类
* @return
*/
private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String mystream, RedisConnectionFactory connectionFacto
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(5)) // 拉取消息超时时间
.batchSize(10) // 批量抓取消息
.targetType(String.class) // 传递的数据类型
.executor(threadPoolTaskExecutor)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
.create(connectionFactory, options);
//指定消费最新的消息
StreamOffset<String> offset = ate(mystream, ReadOffset.lastConsumed());
//创建消费者
Consumer consumer = Consumer.Group(), Consumer());
StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer.StreamReadRequest.builder(offs                .errorHandler((error) -> {
})
.cancelOnError(e -> false)
.consumer(consumer)
//关闭⾃动ack确认
.autoAcknowledge(false)
.build();
//指定消费者对象
return container;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。