探究Redis4的stream类型
2
10 ⽉初,Redis 搞了个⼤新闻。别紧张,是个好消息:Redis 引⼊了名为 stream 的新数据类型和对应的命令,⼤概会在年底正式发布到 4.x 版本中。像引⼊新数据类型这样的变化在 Redis 的发展历史上⾮常罕见,所以称之为⼤新闻⼀点也不为过。⾄少很多介绍 Redis 的资料要跟着修订了。
背景
按作者的,stream 类型的想法深受 Kafka 的 stream 概念的影响,所以顺理成章沿⽤了这个名字。当然这并不意味 Redis 将提供 Kafka stream 特性的替代品,它俩依旧是两种泾渭分明的东西。Redis 的 stream 特性旨在填补 PubSub 和 Blocked list 机制间的空缺,解决这两者不能解决的问题。
Redis 的 PubSub 可以⽤来实现简单的订阅机制。⼀个或多个 client 向 Redis 订阅特定的频道,当某个 client 向这个频道发布消息
时,Redis 会把消息发送给订阅该频道的 client。需要注意的是,Redis 只负责转发消息,并不保证订阅的 client 是否真正收到了消息,⽐如client 可能正好挂掉了或者中间出了点⽹络问题。在某些情况下,这种简单的订阅机制就够⽤了;但在某些情况下,我们需要确保消息已经发布出去,PubSub 就不能满⾜
要求。
⼀个替代的⽅案是采⽤ BLPOP 等命令,也即前⽂提到的 Blocked list。client 调⽤ BLPOP(或其他类似的命令),阻塞在特定的频道上。如果有 client 发布消息(在这⾥,就是 rpush 新的值),被阻塞的 client 就会结束阻塞,得到新 rpush 进来的值。如果 Redis 没法把新消息发送给 client,那么这个消息会留在频道⾥。当 client 下次重新调⽤ BLPOP 时,就能拿回这个消息。这个⽅案听起来不错,⾄少它解决了确保消息发布的问题。但你可能也想到了,能收到特定频道的消息的只有⼀个 client,因为只要某个 client 接收了消息,消息就不再存在于频道当中了。⽽ PubSub 是⽀持⼀对多发送消息的。另⼀个问题是,每个 client 只能去获取最新的消息,对于复杂的操作,BLPOP 等命令便⽆能为⼒了。
stream 就是为了解决以上问题才提出来的。
⽤法
遵循其他数据类型的惯例,操作 stream 类型的键的命令都以X开头。
(由于 stream 特性尚未正式发布,且部分特性还处于 TODO 状态,下⾯内容肯定会有所变更。如果有改动,我会修订这部分的内容)
添加操作
XADD key [MAXLEN [~] <count>] <ID or *> [field value] [field value] ...
stream 跟 hash ⼀样,有 subkey 的概念。上⾯命令⾥的 ID 就是指 subkey。⼀般情况下,你不需要指定 ID,仅需提供*来让 Redis ⽣成⼀个 ID。Redis ⽣成的 ID 格式如下:$ms-$seq。其中$ms指当前的 13 位毫秒时间戳,$seq指给定 key 在当前毫秒时间戳下的序列号(从 0 开始),中间以-隔开。早前⽤的分隔符是.,后来考虑到xx.yy这种形式太容易错当作浮点数了,所以改⽤-。如果 Redis ⽣成 ID 的时候,当前毫秒时间戳跟上⼀个 ID 的时间戳⼀样,它会把序列号加⼀。假使服务器发⽣时间回拨的情况,Redis 会沿⽤上⼀个 ID 的时间戳,只是把序列号加⼀。实际上这种⽣成 ID 的机制并⾮为了记录创建的时间,仅仅⽤于⽣成递增的 ID。你也可以在调⽤时指定⾃⼰⽣成的 ID。
[field value] ...这部分指定的是 stream key 对应 ID 的值。每个 ID 带的 field 可以不同。
取长度操作
XLEN key
返回长度,就是这样。
读取操作
XRANGE key start end [COUNT <n>]
XRANGE 返回某个 stream 给定范围内的 ID 所对应的值。你可以通过 COUNT 指定返回的值的最⼤数⽬。
举个例⼦,像这样创建两个 ID:
127.0.0.1:6379> xadd test * apple 1
1507383725597-0
127.0.0.1:6379> xadd test * binana 2
1507383735965-0
如下的 XRANGE 操作能够返回这两个 ID 的值。
127.0.0.1:6379> xrange test 1507383725597-0 1507383735965-0
1) 1) 1507383725597-0
2) 1) "apple"
2) "1"
2) 1) 1507383735965-0
2) 1) "binana"
2) "2"
⼤多数情况下,你⽤到的是-和+这两个特殊 ID 值,像这样:xrange test 1507383725597-0 +。前者表⽰ ID 范围的起始位置,后者表⽰ ID 范围的末尾位置。
XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
[RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N
ID_1 ID_2 ... ID_N
如果想同时读取多个 stream 的值,需要⽤到 XREAD。XREAD 能够返回给定多个 stream 的某个起始ID 之后的数据。我加粗了之后两个字,因为跟 XRANGE 不同,XREAD 不返回起始ID 的值。你可以通过 COUNT 指定各个 stream 返回的值的最⼤数⽬。
XREAD 的阻塞是可选的,你可以通过 BLOCK 参数去指定允许阻塞的时间。如果不指定,表⽰不阻塞,⽴刻返回 nil。注意这⼀点跟BLPOP 不同,BLPOP ⼀类的命令,默认是永久阻塞的。
XREAD 主要的参数是 STREAMS 后⾯的 key 和起始ID 列表。key 和起始ID 需要是⼀⼀对应的,有多少个 key 就要指定多少个起始ID。跟 XRANGE ⼀样,起始ID 也可以是-和+这样的特殊值。注意由于-表⽰ ID 范围的起始位置,⽽不是第⼀个 ID,所以⽤-可以获取第⼀个ID 的值。除此之外,起始ID 还可以是$,表⽰获取命令执⾏之后的新增 ID 的值。显然,$只有跟 BLOCK ⼀起⽤才有意义。
RETRY/GROUP:尚未实现 TODO。
删除操作
stream 不⽀持“改”操作,所以“增删查改”还剩个“删”没讲。stream 没有专门的删命令。还记得介绍 XADD 时展⽰的 MAXLEN 参数吗?在XADD 命令添加了新的 ID 之后,如果命令指定的 MAXLEN 超过了当前 stream 包含的 ID 的个数,Redis 会删除多出来的部分。
重新贴下 MAXLEN 的格式:XADD MAXLEN [~] <count> ...。count 决定了 MAXLEN 的值。如果 MAXLEN 和 count 之间没有插⼊~,表⽰精确地保留count个 ID;如果插⼊了~,表⽰保留⼤约count个 ID。我会在“实现”这⼀节解释所谓的“精确”和“⼤约”的区别。
⽤途
stream 很⼤程度上类似于 Blocked list,但是它的操作更加⾃由,不再受限于只能读取最新的值,也不
再拘束于只能让单个 client 读取值。跟 PubSub 相⽐,stream 允许 client 重新获取发布过的值,提供了更强的保障。
实现
Redis 把每个 stream 实现成以 ID 的值为 key 的前缀树,外加 length(当前的 ID 数)等元数据。考虑到默认⽣成的 ID 是毫秒时间戳+序列号,采⽤前缀树的形式可以节省下⼤量的空间。毕竟差⼏千毫秒的两个 ID,也会有前九位是完全相同的。另外前缀树还允许随机访问某个起始ID。
不过并⾮每个 ID 都是独占⼀个节点。每当插⼊⼀个新的 ID 时,Redis 会先访问前缀树的最⼤的节点(毕竟 ID 是递增的),如果这个节点不⼤于STREAM_BYTES_PER_LISTPACK(2048字节),新的 ID 会被插⼊到这个节点⾥⾯;否则才会创建新的节点。在查⼀个 ID
时,Redis 会查最后⼀个⽐该 ID ⼩的节点,然后从该节点往后遍历,直到到该 ID 为⽌。在我看来,⼀个节点⾥包含多个 ID 的设计,有利于 ID 遍历的操作。这种设计避免了在遍历时频繁访问新的节点,更好地利⽤了 CPU 的本地缓存。
每个节点具有这样的结构:
+--------------+---------+---------+--/--+---------+
| master_entry | entry_1 | entry_2 | ... | entry_N |
+--------------+---------+---------+--/--+---------+
其中
master_entry:
+-------+---------+------------+---------+--/--+---------+---------+
| count | deleted | num-fields | field_1 | field_2 | ... | field_N |
+-------+---------+------------+---------+--/--+---------+---------+
entry_x(SAMEFIELDS):
+-----+--------+-------+-/-+-------+
|flags|entry-id|value-1|...|
+-----+--------+-------+-/-+-------+
或者
+-----+--------+----------+-------+-------+-/-+---+
|flags|entry-id|num-fields|field_1|value_1|...|
+-----+--------+----------+-------+-------+-/-+---+
当节点被创建时,会以第⼀个插⼊的 ID 初始化master_entry的值。显然,count 的初始值是 1,deleted 的初始值是 0,num-fields 等于该 ID 对应的 field 数⽬,后⾯的多个 field 则是该 ID 对应的 field 数。在插⼊master_entry之后,还会新增⼀个 entry 来记录额外的 field 和每个 field 对应的 value。这个新增的 entry 的 entry-id 取 ID 跟前缀树节点的 key 的差。第⼀个 ID 的 entry-id 为 0,因为当前节点的 key 就是这个 ID,两者不存在差异。之后每插⼊⼀个新的 ID,都会更新master_entry的 count 数,并插⼊对应的 entry。当然插⼊新 ID 的同时也不忘更新length 等元数据。
前⾯提到,每个 ID 带的 field 可以不同。但是在实际的使⽤中,每个 ID 带的 field 基本是相同的。所以 Redis 做了个优化:如果新增的 ID
的 field 跟master_entry完全⼀样,entry ⾥⾯会设置⼀个名为 SAMEFIELDS 的 flags,并仅记录 value 的值。除⾮新增 ID 的 field
跟master_entry有些不同,entry ⾥⾯才会记录新增 ID 的所有 field 和对应的 value。
最后说⼀下删除操作。由于
1. stream 的删除操作,只⽀持保留特定数⽬的 ID 数
2. stream 会记录全部的 ID 数(length)
3. stream 的数据结构⼤体上是⼀个前缀树,前缀树的每个节点包含 count 个 ID
所以删除操作,就是
1. 从前往后遍历,减去每个前缀树节点的 count,直到 length 等于 XADD 指定的 MAXLEN,或者减去下⼀个节点后剩下的 length 会⼩
于 MAXLEN
2. 如果减去某个节点后,剩下的 length ⼩于 MAXLEN,Redis 会遍历该节点,设置若⼲个 entry 的 flags 为 DELETED 直到 length 等于
redis支持的五种数据类型MAXLEN,更新 count 和 deleted 两个域。
如果 XADD 命令指定的 MAXLEN 包含~,则表⽰⼤约保留 MAXLEN 个 ID。在这种情况下,Redis 只会完成上⾯的第⼀步。换句话说,选择“⼤约”能省下对某个节点进⾏遍历的时间。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。