【Redis09】Redis基础:Stream操作
Redis基础学习:Stream操作
又来一个不好理解的东西,Stream 类型,而且它是整个 Redis 中对于数据操作最复杂的一种类型。但话又说回来,其实这个东西吧,还是个队列,只不过又是一种换了形式的队列。并且呢,据说是受到很多 Kafka 的影响,我对于 Kafka 仅仅是搭过环境的水平,完全没法用它来进行比较,所以我们的重点还是以理解 Redis 中的 Stream 为主吧。
回顾下我们之前学习过的队列类的数据类型,有 List 可以做普通的队列、有 Sorted Set 可以做定时延时类的队列、有 PubSub 可以做广播类型的系统,那么现在这个 Stream 又是干嘛的呢?
它呀,和广播有点像,也是多个客户端消费者可以接收,然后呢?数据不会 POP ,也就是说数据还在,消费者可以根据需要指定消费哪些数据,可以把之前的消息再用别的业务处理客户端消费一次,或者仅仅只等待消费最新的。然后它还可以分组,如果某个分组的消费出问题了还可以转移给别的消费者去处理。
是不是感觉有点懵逼?别急,我也懵逼,但是作用已经很清晰了,这是一个可以作为高可用、可存储的队列系统,相对于我们上回讲的 Pub/Sub 的问题,它是真的一个完整的多消费者队列功能实现,真有点正式的 消息队列 工具的意思。
基本操作
基本操作无外乎就是添加、修改、删除之类的了,我们一个一个来看下。
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
添加 Stream 数据使用的是 XADD 命令,它的参数很多,我们就学习最简单的步骤就好了。
127.0.0.1:6379> XADD a * name boy age 20
"1652342833001-0"
127.0.0.1:6379> XADD a * name girl age 17
"1652342848364-0"
中间那个 * 是啥意思?注意看命令的签名,*|ID ,意思是我们可以为数据指定一个 ID ,如果使用 * 的话就是自动生成一个 时间戳-序号 的 ID ,如果要自己手动指定的话,也必须是有个 - 的形式,比如 0-1、1111-1212 这样的格式。ID 可以用于保证消息的有序、不重复。
添加的内容是键值对形式的数据,和 Hash 一样,然后返回的内容就是新添加数据的 ID 。接下来就是数据的查询,XRANGE 命令就可以了,注意它的参数是 ID ,Stream 类型要求 ID 必须是递增有序的,所以可以使用 ID 来进行范围查询。
127.0.0.1:6379> XRANGE a - +
1) 1) "1652342833001-0"2) 1) "name"2) "boy"3) "age"4) "20"
2) 1) "1652342848364-0"2) 1) "name"2) "girl"3) "age"4) "17"
127.0.0.1:6379> XRANGE a 1652342833001-0 +
1) 1) "1652342833001-0"2) 1) "name"2) "boy"3) "age"4) "20"
2) 1) "1652342848364-0"2) 1) "name"2) "girl"3) "age"4) "17"
127.0.0.1:6379> XRANGE a 1652342833002 +
1) 1) "1652342848364-0"2) 1) "name"2) "girl"3) "age"4) "17"
+ 和 - 号代表的是最大值和最小值的意思,也就是全部查找。默认的 XRANGE 是正序查找,也就是根据 ID 从小到大,所以要用 - 号在前的写法,它的反向查找命令则是从大到小的查找,我们后面会看到。
对于删除来说,使用的是 XDEL 命令。
127.0.0.1:6379> XDEL a 1652342833001-0
(integer) 1
127.0.0.1:6379> XRANGE a - +
1) 1) "1652342848364-0"2) 1) "name"2) "girl"3) "age"4) "17"
127.0.0.1:6379> XDEL a 1652342848364
(integer) 1
127.0.0.1:6379> ttl a
(integer) -1
在这里,我们最后还用 ttl 试了一下 a 这个 key ,你会发现它竟然还存在。默认情况下,其它的数据类型比如 Hash 或者 List 之类的,如果数据删光了,那么这个 key 也会删掉,但是 Stream 不会,为什么呢?其实就是为了消费者监听的时候不至于出现问题嘛。这里是比较特别的一个地方,也是可以用在面试或者被面试情况下的一个小知识点哦。
普通的基本操作就是上面那些了,接下来我们试试相同 ID 以及小于当前最大 ID 的情况下能不能添加数据。
127.0.0.1:6379> XADD a 0-1 name wx age 4
"0-1"
127.0.0.1:6379> XADD a 0-1 name wx age 4
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
127.0.0.1:6379> XADD a 0-2 name wx age 4
"0-2"
127.0.0.1:6379> XADD a 1-1 name wx age 4
"1-1"
127.0.0.1:6379> XADD a 1-5 name wx age 4
"1-5"
127.0.0.1:6379> XADD a 1-3 name wx age 4
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
很明显,相同的 ID 必然是无法添加的,然后我们测试的是直接将 ID 设置到 1-5 ,完了再添加 1-3 的数据,同样也是无法添加成功的。这个序列是前后都要大于,比如说我们如果换成 2-1 就可以插入了,- 前面的先比较,相同的再比较后面的序号。
最后,XREVRANGE 可以反向查询,也就是根据 ID 从大到小查询,另外 XRANGE 和 XREVRANGE 也都有 COUNT 参数,可以控制返回数据的数量。
127.0.0.1:6379> XREVRANGE a + -
1) 1) "1-5"2) 1) "name"2) "wx"3) "age"4) "4"
2) 1) "1-1"2) 1) "name"2) "wx"3) "age"4) "4"
3) 1) "0-2"2) 1) "name"2) "wx"3) "age"4) "4"
127.0.0.1:6379> XREVRANGE a 0 1
(empty array)
127.0.0.1:6379> XREVRANGE a 0-1 1-1
(empty array)
127.0.0.1:6379> XREVRANGE a 1-1 0-1
1) 1) "1-1"2) 1) "name"2) "wx"3) "age"4) "4"
2) 1) "0-2"2) 1) "name"2) "wx"3) "age"4) "4"
127.0.0.1:6379> XREVRANGE a + - count 1
1) 1) "1-5"2) 1) "name"2) "wx"3) "age"4) "4"
监听项目
要是仅仅是查询的话,那么 Stream 其实也就并没有什么特别的地方了。前面说过,它也是一种队列系统,那么如果我们要监听新来的消息要怎么办类?当然有相关的命令可以供我们使用了,第一个就来看看 XREAD 。
XREAD
其实他和 XRANGE 很像,不过是根据指定 ID 读取大于这个 ID 的最新数据。
127.0.0.1:6379> xread count 2 streams a 0
1) 1) "a"2) 1) 1) "0-2"2) 1) "name"2) "wx"3) "age"4) "4"2) 1) "1-1"2) 1) "name"2) "wx"3) "age"4) "4"
我们给定的是 0 ,就从 0-xxx 的数据开始,通过 COUNT 可以指定返回数据的条数。如果更换一下 ID 信息,就可以很明显地看到不同。
127.0.0.1:6379> xread streams a 1
1) 1) "a"2) 1) 1) "1-1"2) 1) "name"2) "wx"3) "age"4) "4"2) 1) "1-5"2) 1) "name"2) "wx"3) "age"4) "4"
127.0.0.1:6379> xread streams a 1-5
(nil)
127.0.0.1:6379> xread streams a 1-1
1) 1) "a"2) 1) 1) "1-5"2) 1) "name"2) "wx"3) "age"4) "4"
如果只是这样,未免就太没意思了。XREAD 是可以阻塞运行的,同时,还有一个特殊的 ID 值,可以让我们去获取最新添加进来的数据。
127.0.0.1:6379> xread block 0 streams a $
1) 1) "a"2) 1) 1) "1-6"2) 1) "name"2) "wx"3) "age"4) "4"
(12.95s)
BLOCK 参数,后面的 0 表示的是超时时间。添加了这个参数之后是不是就非常像 BLPOP 之类的命令了。然后 $ 这个 ID 符号表示的就是等待并获取最新添加进来的数据。注意,不会显示之前已经存在的老数据。
这一套下来有点像什么呢?其实非常类似于我们在 Linux 中使用的 tail -f 这个命令。
是不是看出不同了?List 取出数据是需要 POP ,之后 List 中这条数据就没有了。PubSub 是订阅一个频道,PUBLISH 往这个频道发送消息,如果之前你没有订阅,这个频道之前接收过的内容也是直接没有了。而 XREAD ,不仅可以获得最新的,老的也没有删掉,并且同时还可以阻塞进行。
消费者组
简单地理解 ,消费者组就是可以把多个客户端划分到同一个组中。然后,不同的组一起去消费这个 Stream 队列中的内容,和 Pub/Sub 的广播形式很像。不同的是,Stream 中可以让几个客户端划分到不同的组中,虽说还是处理数据吧,但是队列中的数据因为并没有消失,所以可以重复处理或者转交给其它消费者组处理,从而应对一些特殊情况,比如说某个消费者组出现问题,或者有大量堆积之类的。
要使用消费者组我们要先创建组。
127.0.0.1:6379> XGROUP create a agroup $
OK
命令很简单,XGROUP 命令,紧跟着 CREATE 参数,接着指定 Stream 数据的 key ,后面是组的名称,最后还有一个 ID ,我们可以直接使用上面学习过的 $ ,表示的就是接收最新 XADD进来的数据。然后我们就添加一些数据,注意,因为使用了 $ ,所以要在创建完组之后再添加数据,要不在组里面是查不到数据的。
127.0.0.1:6379> XADD a * msg red
"1652346780402-0"
127.0.0.1:6379> XADD a * msg blue
"1652346783165-0"
127.0.0.1:6379> XADD a * msg yellow
好了,我们可以通过 XREADGROUP 命令来获取组中的数据,它的参数签名是这样的。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
GROUP 后面要跟上组名并且起一个消费者名字。然后可以指定数量,也可以阻塞 BLOCK 。再之后是 STREAMS 后面需要跟着指定的 Stream 键名。就像下面这样使用。
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a >
1) 1) "a"2) 1) 1) "1652346780402-0"2) 1) "msg"2) "red"
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a 0
1) 1) "a"2) 1) 1) "1652346780402-0"2) 1) "msg"2) "red"
奇怪了,怎么又来了一个 > 符号?这个符号表示 目前为止从未传递给其他消费者 的消息内容。另外我们也可以指定一个 ID 或者给个 0 ,也能看到这条 red 数据。注意,我们一定要先通过 > 取出一条数据,然后再使用 0 才能读到这条数据。如果直接上来就是使用 0 的话,是查不到数据的,大家可以自己试试。
为什么要这样呢?其实 > 之后,数据是被读到了当前这个消费者组的pending entries list
中,它代表的是 待办实体列表 。我们在 c1 这个消费者中,会维护一个自己的这个列表,表示 c1 拿到了这个数据。但是,现在并不表示这个数据就被处理完成了。看不懂没关系,我们再向下看。
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a >
1) 1) "a"2) 1) 1) "1652346783165-0"2) 1) "msg"2) "blue"
127.0.0.1:6379> XREADGROUP group agroup c1 count 10 streams a 0
2) 1) "a"2) 1) 1) "1652346780402-0"2) 1) "msg"2) "red"2) 1) "1652346783165-0"2) 1) "msg"2) "blue"
127.0.0.1:6379> XACK a agroup 1652346780402-0
(integer) 1
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a 0
1) 1) "a"2) 1) 1) "1652346780402-0"2) 1) "msg"2) "red"
首先,我们再拿过来一条数据,现在 c1 里面有两条数据了。接着我们使用 XACK 去确认这条数据被处理完了。再继续看,pending entries list 中就没有 red 这条命令了。然后我们继续确认消费掉 blue 这条数据,现在整个 pending entries list 就空了。ACK 应答机制也是正式的消息队列中都有的功能。
127.0.0.1:6379> XACK a agroup 1652346783165-0
(integer) 1
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a 0
1) 1) "a"2) (empty array)
我们可以继续再读取新的数据进来。
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a >
1) 1) "a"2) 1) 1) "1652346783165-0"2) 1) "msg"2) "yellow"
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a 0
1) 1) "a"2) 1) 1) "1652346785825-0"2) 1) "msg"2) "yellow"
有意思吧?为什么要有这样一个机制呢?如果是普通队列,假设在处理过程中出现异常了,其实这条消息就被 POP 掉了,如果要恢复再次处理的话我们就要把它重新加入到队列中。而 Stream 则是通过确认机制,在我们确定消费完成之后,进行确认操作,之后这条数据才算是彻底被消费掉了。同时,需要注意的是,这只是在这一个组中处理的。假如当前我们的消费者 c1 出现了问题,那么我们可以把这条信息再转移给另一个消费者 c2 ,正好 c2 是个日志记录或者其它相关处理的消费者程序,这时就可以进行记录错误信息或者重新入队的操作了,甚至就把 c2 当成是一个“死信队列”(参考 RabbitMQ)。
转移
先再添加两条数据。
127.0.0.1:6379> XADD a * msg pink
"1652347643861-0"
127.0.0.1:6379> XADD a * msg white
"1652347649468-0"
然后我们再拿过来一条数据放到 c1 中,查看当前已经有两条数据。
127.0.0.1:6379> XREADGROUP group agroup c1 count 1 streams a >
1) 1) "a"2) 1) 1) "1652347643861-0"2) 1) "msg"2) "pink"
127.0.0.1:6379> XREADGROUP group agroup c1 count 2 streams a 0
1) 1) "a"2) 1) 1) "1652346785825-0"2) 1) "msg"2) "yellow"2) 1) "1652347643861-0"2) 1) "msg"2) "pink"
然后我们再通过 XREADGROUP 来指定一个 c2 消费者读取当前组中的数据。
127.0.0.1:6379> XREADGROUP group agroup c2 count 1 streams a >
1) 1) "a"2) 1) 1) "1652347649468-0"2) 1) "msg"2) "white"
好了,接下来我们使用 XPENDING 来查看当前组中的 pending entries list 情况。一共有三条 pending 数据,最小的是 1652346785825-0 ,最大的是 1652347649468-0 ,有两条 c1 在处理,有一条 c2 在处理。之前的 red 和 blue 我们已经 XACK 掉了,所以这里就只有这三条了。
127.0.0.1:6379> XPENDING a agroup
1) (integer) 3
2) "1652346785825-0"
3) "1652347649468-0"
4) 1) 1) "c1"2) "2"2) 1) "c2"2) "1"
同时,我们也可以指定消费者及使用范围参数进行查询。
127.0.0.1:6379> XPENDING a agroup - + 10 c2
1) 1) "1652347649468-0"2) "c2"3) (integer) 348694) (integer) 4
好了,现在情况就是这么个情况,c1 中有两条未确认的信息,c2 中有一条,现在 c1 处理不了了,我想把其实中一条转移到 c2 来处理。非常简单,使用 XCLAIM 命令就可以。
127.0.0.1:6379> XCLAIM a agroup c2 0 1652346785825-0
1) 1) "1652346785825-0"2) 1) "msg"2) "yellow"
然后再使用 XPENDING 看一下,怎么样,c1 变成一条,c2 变成 2 条了吧。
127.0.0.1:6379> XPENDING a agroup
1) (integer) 3
2) "1652346785825-0"
3) "1652347649468-0"
4) 1) 1) "c1"2) "1"2) 1) "c2"2) "2"
您要还不信,咱们直接 XREADGROUP 查询一下 c2 ,是不是 yellow 这条数据已经过来了呀。
127.0.0.1:6379> XREADGROUP group agroup c2 count 2 streams a 0
1) 1) "a"2) 1) 1) "1652346785825-0"2) 1) "msg"2) "yellow"2) 1) "1652347649468-0"2) 1) "msg"2) "white"
总结一下,在组中,只要是没有 XACK 的数据,其实都是还可以再次进行消费的数据,你可以直接查询出来那么就是可以再消费的数据。而真正被消费掉的数据只要确认一下就可以了。
那么如果是不同的组呢?比如我们再建立一个消费者组,然后还是从 a 中读取数据。这个时候,两个组会同时消费一样的数据。注意了,划重点,只有一个组中不同的消费者会拿到不同的数据,而不是不同的组,不同的组是能够消费相同的数据的。大家可以自己尝试一下哦。
其它
好了,核心内容说完了,说实话,还是比较懵逼的。不过大概的概念大家应该能看得明白了吧,实在不行就当成是一个比较高级的队列,再不行等咱学完了其它正式的消息队列系统之后再回来研究一下呗。希望到时候别直接把我上面的理解全都颠覆了,直接就说这 TM 写得啥玩意,压根不对!!
学习嘛,就是这样,对错也只是一时的,现在能理解到什么程度就是什么程度,将来发现现在的理解有问题反而是好事,也能更加加深对于这个知识点的印象。
剩下的就是一些辅助命令了。首先就是 XINFO ,一看名字就是查看 Stream 类型的一些信息用的。
127.0.0.1:6379> XINFO stream a1) "length"2) (integer) 53) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "last-generated-id"8) "1652347649468-0"9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1652346760526-0"2) 1) "msg"2) "begin"
13) "last-entry"
14) 1) "1652347649468-0"2) 1) "msg"2) "white"
127.0.0.1:6379> XINFO groups a
1) 1) "name"2) "agroup"3) "consumers"4) (integer) 25) "pending"6) (integer) 37) "last-delivered-id"8) "1652347649468-0"
127.0.0.1:6379> XINFO consumers a agroup
1) 1) "name"2) "c1"3) "pending"4) (integer) 15) "idle"6) (integer) 544241
2) 1) "name"2) "c2"3) "pending"4) (integer) 25) "idle"6) (integer) 421324
它可以查看 Stream 信息,也可以查看指定的组以及组内的消费者信息,同时用它也可以看到 pending 中的信息,感觉不错吧。然后是 XLEN ,比较简单,就是 Steam 中的数据数量。
127.0.0.1:6379> XLEN a
(integer) 5
看到没有,上面我们不管是 XREAD 还是 XREADGROUP 操作了半天,本质上对 Stream 根本没啥影响,别人还是老老实实的 5 条数据。
127.0.0.1:6379> XGROUP destroy a agroup
(integer) 1
127.0.0.1:6379> XINFO consumers a agroup
(error) NOGROUP No such consumer group 'agroup' for key name 'a'
XGROUP 可以创建,使用的是 CREATE ,那么必须也可以删组嘛,使用的就是 XGROUP DESTROY 命令。关于 XGROUP 更多的命令选择,可以使用 XGROUP HELP 来查看。
裁剪
裁剪是啥?好吧,还是来简单的理解,就是我们这个 Stream 中只留下指定数量的数据,别的都被裁剪掉了。
127.0.0.1:6379> xtrim a maxlen 4
(integer) 2
使用的就是 XTRIM 命令,MAXLEN 就是留下的最大数量,然后裁剪掉的是最早的那些数据,比如我们这里就裁剪掉了最早的 red 和 blue 。
127.0.0.1:6379> xrange a - +
1) 1) "1652346785825-0"2) 1) "msg"2) "yellow"
1) 1) "1652347643861-0"2) 1) "msg"2) "pink"
2) 1) "1652347649468-0"2) 1) "msg"2) "white"
总结
好了,到此为止,整个 Redis 中和数据类型或数据结构相关的基础知识我们就学习完了。当然还有别的基本命令可能会和这些数据类型相关的内容有关,等我们学习到的时候再说。后面基础系列还将继续学习其它的相关命令,如果你也和我一样想要完全从头好好学一遍 Redis 的话,紧跟着大部队的步伐不要掉队哦。