Redis中如何实现的消息队列?实现的⽅式有⼏种?
细⼼的你可能发现了,竟然出现了三篇⽂章都是在说消息队列,,讲了 等,由此可见消息队列在整个 Java 技术体系中的重要程度。这⾥我们将重点来看⼀下 Redis 是如何实现消息队列的。
在 Redis 中实现消息队列的⽅式有⼏种?
典型回答
早在 Redis 2.0 版本之前使⽤ Redis 实现消息队列的⽅式有两种:
●使⽤ List 类型实现
●使⽤ ZSet 类型实现
其中使⽤ List 类型实现的⽅式最为简单和直接,它主要是通过 lpush、rpop 存⼊和读取实现消息队列的,如下图所⽰:redis支持的五种数据类型
lpush 可以把最新的消息存储到消息队列(List 集合)的⾸部,⽽ rpop 可以读取消息队列的尾部,这样就实现了先进先出,如下图所⽰:
命令⾏的实现命令如下:
127.0.0.1:6379> lpush mq "java" #推送消息 java
(integer)1
127.0.0.1:6379> lpush mq "msg" #推送消息 msg
(integer)2
127.0.0.1:6379> rpop mq #接收到消息 java
"java"
127.0.0.1:6379> rpop mq #接收到消息 msg
"mq"
其中,mq 相当于消息队列的名称,⽽ lpush ⽤于⽣产并添加消息,⽽ rpop ⽤于拉取并消费消息。
使⽤ List 实现消息队列的优点是消息可以被持久化,List 可以借助 Redis 本⾝的持久化功能,AOF 或者是 RDB 或混合持久化的⽅式,⽤于把数据保存⾄磁盘,这样当 Redis 重启之后,消息不会丢失。
但使⽤ List 同样存在⼀定的问题,⽐如消息不⽀持重复消费、没有按照主题订阅的功能、不⽀持消费消息确认等。
ZSet 实现消息队列的⽅式和 List 类似,它是利⽤ zadd 和 zrangebyscore 来实现存⼊和读取消息的,这⾥就不重复叙述了。但 ZSet 的实现⽅式更为复杂⼀些,因为 ZSet 多了⼀个分值(score)属性,我们可以使⽤它来实现更多的功能,⽐如⽤它来存储时间戳,以此来实现延迟消息队列等。
ZSet 同样具备持久化的功能,List 存在的问题它也同样存在,不但如此,使⽤ ZSet 还不能存储相同元素的值。因为它是有序集合,有序集合的存储元素值是不能重复的,但分值可以重复,也就是说当消息值重复时,只能存储⼀条信息在 ZSet 中。
在 Redis 2.0 之后 Redis 就新增了专门的发布和订阅的类型,Publisher(发布者)和 Subscriber(订阅者)来实现消息队列了,它们对应的执⾏命令如下:
●发布消息,publish channel “message”
●订阅消息,subscribe channel
使⽤发布和订阅的类型,我们可以实现主题订阅的功能,也就是 Pattern Subscribe 的功能。因此我们可以使⽤⼀个消费
者“queue_*”来订阅所有以“queue_”开头的消息队列,如下图所⽰:
发布订阅模式的优点很明显,但同样存在以下 3 个问题:
●⽆法持久化保存消息,如果 Redis 服务器宕机或重启,那么所有的消息将会丢失;
●发布订阅模式是“发后既忘”的⼯作模式,如果有订阅者离线重连之后就不能消费之前的历史消息;
●不⽀持消费者确认机制,稳定性不能得到保证,例如当消费者获取到消息之后,还没来得及执⾏就宕机了。因为没有消费者确认机
制,Redis 就会误以为消费者已经执⾏了,因此就不会重复发送未被正常消费的消息了,这样整体的 Redis 稳定性就被没有办法得到保障了。
然⽽在 Redis 5.0 之后新增了 Stream 类型,我们就可以使⽤ Stream 的 xadd 和 xrange 来实现消息的存⼊和读取了,并且 Stream 提供了 xack ⼿动确认消息消费的命令,⽤它我们就可以实现消费者确认的功能了,使⽤命令如下:
127.0.0.1:6379> xack mq group1 1580959593553-0
(integer)1
相关语法如下:
xack key group-key ID [ID ...]
消费确认增加了消息的可靠性,⼀般在业务处理完成之后,需要执⾏ ack 确认消息已经被消费完成,整个流程的执⾏如下图所⽰:
其中“Group”为组,消费者也就是接收者需要订阅到组才能正常获取到消息。
以上就 Redis 实现消息队列的四种⽅式,他们分别是:
●使⽤ List 实现消息队列;
●使⽤ ZSet 实现消息队列;
●使⽤发布订阅者模式实现消息队列;
●使⽤ Stream 实现消息队列。
早期版本中⽐较常⽤的实现消息队列的⽅式是 List、ZSet 和发布订阅者模式,使⽤ Stream 来实现消息队列是近两年才流⾏起来的⽅案,并且很多企业也没有使⽤到 Redis 5.0 这么新的版本。
●在 Java 代码中使⽤ List 实现消息队列会有什么问题?应该如何解决?
●在程序中如何使⽤ Stream 来实现消息队列?
知识扩展
使⽤ List 实现消息队列
在 Java 程序中我们需要使⽤ Redis 客户端框架来辅助程序操作 Redis,⽐如 Jedis 框架。
使⽤ Jedis 框架⾸先需要在 l ⽂件中添加 Jedis 依赖,配置如下:
<!-- mvnrepository/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${version}</version>
</dependency>
List 实现消息队列的完整代码如下:
import redis.clients.jedis.Jedis;
publicclass ListMQTest {
public static void main(String[] args){
// 启动⼀个线程作为消费者
new Thread(()->consumer()).start();
// ⽣产者
producer();
}
/
**
* ⽣产者
*/
public static void producer(){
Jedis jedis =new Jedis("127.0.0.1",6379);
// 推送消息
jedis.lpush("mq","Hello, List.");
}
/**
* 消费者
*/
public static void consumer(){
Jedis jedis =new Jedis("127.0.0.1",6379);
// 消费消息
while(true){
// 获取消息
String msg = jedis.rpop("mq");
if(msg != null){
// 接收到了消息
System.out.println("接收到消息:"+ msg);
}
}
}
}
以上程序的运⾏结果是:
接收到消息:Hello, Java.
但是以上的代码存在⼀个问题,可以看出以上消费者的实现是通过 while ⽆限循环来获取消息,但如果消息的空闲时间⽐较长,⼀直没有新任务,⽽ while 循环不会因此停⽌,它会⼀直执⾏循环的动作,这样就会⽩⽩浪费了系统的资源。
此时我们可以借助 Redis 中的阻塞读来替代 rpop 的⽅法就可以解决此问题,具体实现代码如下:
import redis.clients.jedis.Jedis;
public class ListMQExample {
public static void main(String[] args)throws InterruptedException {
// 消费者
new Thread(()->bConsumer()).start();
// ⽣产者
producer();
}
/**
* ⽣产者
*/
public static void producer()throws InterruptedException {
Jedis jedis =new Jedis("127.0.0.1",6379);
// 推送消息
jedis.lpush("mq","Hello, Java.");
Thread.sleep(1000);
jedis.lpush("mq","message 2.");
Thread.sleep(2000);
jedis.lpush("mq","message 3.");
}
/**
* 消费者(阻塞版)
*/
public static void bConsumer(){
Jedis jedis =new Jedis("127.0.0.1",6379);
while(true){
/
/ 阻塞读
for(String item : jedis.brpop(0,"mq")){
// 读取到相关数据,进⾏业务处理

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。