> 文章列表 > 用Redis实现消息队列的四种思路

用Redis实现消息队列的四种思路

用Redis实现消息队列的四种思路

Redis实现消息队列

面试被问到redis实现消息队列的思路,参考学习了几篇博文的内容,整理如下。深入学习请参考文末链接或其他相关资料。

消息队列

首先,消息队列是应用之间异步通信的方式,主要由三个部分组成。

生产者,消息所承载业务信息的一个实例化,整个消息的发起方。

中间的broker是消息的服务端,主要是处理消息单元,负责消息的存储、投递等功能,是核心部分。

消费者,主要负责消息的消费,具体是根据消息承载的信息处理各种逻辑。

消息队列应用场景,主要分为三种。

1.异步处理,在实时性要求不严格的一些场景,比如用户注册发送验证码,下单通知发送优惠券等。服务方只需要把协商好的消息发送到消息队列,剩下的由消费者的消息服务去处理,不需要等待消费者的返回结果就可以返回给客户端,返回业务层面。

2.应用解耦。把一些相关但耦合性不高的系统关联起来,增加整个系统的灵活度。

3.流量削峰。为了权衡高可用,把大量并行任务发送到mq,根据mq的存储、分发功能平稳处理后续业务。起到大流量缓冲的作用。

消息队列需求

设计消息队列需要根据场景需求,通常考虑是否有如下三个需求

  • 消息保序
  • 处理重复消息
  • 保证消息可靠性

Redis实现消息队列

成熟的MQ有RocketMQ、Kafka、ActiveMQ等,为什么还需要Redis来自定义实现消息队列?

  • 有些简单的业务场景,不需要重量级的 MQ 组件, Redis 相对轻量易用。

基于List实现消息队列

List常用命令

命令 用法 描述
LPUSH LPUSH key value [value …] 将一个或多个值 value 插入到列表 key 的表头如果有多个 value 值,那么各个 value 值按从左到右的顺序依次插入到表头
RPUSH RPUSH key value [value …] 将一个或多个值 value 插入到列表 key 的表尾(最右边)
LPOP LPOP key 移除并返回列表 key 的头元素
BLPOP BLPOP key [key …] timeout 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
RPOP RPOP key 移除并返回列表 key 的尾元素。
BRPOP BRPOP key [key …] timeout 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
BRPOPLPUSH BRPOPLPUSH source destination timeout 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它;如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
RPOPLPUSH RPOPLPUSH source destinationb 命令 RPOPLPUSH 在一个原子时间内,执行以下两个动作:将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素
LLEN LLEN key 返回列表 key 的长度。如果 key 不存在,则 key 被解释为一个空列表,返回 0 .如果 key 不是列表类型,返回一个错误
LRANGE LRANGE key start stop 返回列表 key 中指定区间内的元素,区间以偏移量 start 和 stop 指定
127.0.0.1:6379> lpush mylist a a b c d e 
(integer) 6 
127.0.0.1:6379> rpop mylist 
"a" 
127.0.0.1:6379> rpop mylist 
"a" 
127.0.0.1:6379> rpop mylist 
"b" 

用Redis实现消息队列的四种思路

存在问题

实时消费

LPUSH、RPOP 存在一个性能风险,生产者向队列插入数据的时候,List 不会主动通知消费者及时消费。程序需要不断轮询并判断是否为空再执行消费逻辑。

Redis 提供了 BLPOP、BRPOP 阻塞读取的命令,消费者在在读取队列没有数据的时候自动阻塞,直到有新的消息写入队列,才会继续读取新消息执行业务逻辑。

消息可靠性

List 队列中的消息一经发送出去,便从队列里删除。如果由于网络原因消费者没有收到消息,或者消费者在处理这条消息的过程中崩溃了,就再也无法还原出这条消息,缺少消息确认机制。

Redis 提供了 RPOPLPUSH、BRPOPLPUSH(阻塞)两个指令,含义是从 List 从读取消息的同时把这条消息复制到另一个 List 中(备份),并且是原子操作。

我们就可以在业务流程正确处理完成后再删除队列消息实现消息确认机制。如果在处理消息的时候宕机了,重启后再从备份 List 中读取消息处理。

127.0.0.1:6379> rpush myqueue one 
(integer) 1 
127.0.0.1:6379> rpush myqueue two 
(integer) 2 
127.0.0.1:6379> rpush myqueue three 
(integer) 3 
127.0.0.1:6379> rpoplpush myqueue queuebak 
"three" 
127.0.0.1:6379> lrange myqueue 0 -1 
1) "one" 
2) "two" 
127.0.0.1:6379> lrange queuebak 0 -1 
1) "three" 

用Redis实现消息队列的四种思路

基于发布、订阅模式

Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式。

频道我们可以先理解为是个 Redis 的 key 值,模式可以理解为是一个类似正则匹配的 Key,只是个可以匹配给定模式的频道。这样就不需要显式的去订阅多个名称了,可以通过模式订阅这种方式,一次性关注多个频道。

详细过程可参考Redis消息队列的三种方案文章的相关内容

命令 用法 描述
PSUBSCRIBE PSUBSCRIBE pattern [pattern …] 订阅一个或多个符合给定模式的频道
PUBSUB PUBSUB subcommand [argument [argument …]] 查看订阅与发布系统状态
PUBLISH PUBLISH channel message 将信息发送到指定的频道
PUNSUBSCRIBE PUNSUBSCRIBE [pattern [pattern …]] 退订所有给定模式的频道
SUBSCRIBE SUBSCRIBE channel [channel …] 订阅给定的一个或多个频道的信息
UNSUBSCRIBE UNSUBSCRIBE [channel [channel …]] 指退订给定的频道

基于Streams的消息队列

描述 用法
添加消息到末尾,保证有序,可以自动生成唯一ID XADD key ID field value [field value …]
对流进行修剪,限制长度 XTRIM key MAXLEN [~] count
删除消息 XDEL key ID [ID …]
获取流包含的元素数量,即消息长度 XLEN key
获取消息列表,会自动过滤已经删除的消息 XRANGE key start end [COUNT count]
以阻塞或非阻塞方式获取消息列表 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] id [id …]
创建消费者组 XGROUP [CREATE key groupname id-or-] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
读取消费者组中的消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
将消息标记为"已处理" XACK key group ID [ID …]
为消费者组设置新的最后递送消息ID XGROUP SETID [CREATE key groupname id-or-] [DESTROY key groupname]
删除消费者 XGROUP DELCONSUMER [CREATE key groupname id-or-] [DESTROY key groupname]
删除消费者组 XGROUP DESTROY [CREATE key groupname id-or-] [DESTROY key groupname] [DEL
显示待处理消息的相关信息 XPENDING key group [start end count] [consumer]
查看流和消费者组的相关信息 XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
打印流信息 XINFO STREAM [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

list 和 zset 实现的队列都没能很好的支持多消费组的场景,Streams 数据类型,是为 redis 设计的消息队列,能支持多消费组的场景。

Streams 提供了丰富的消息队列操作命令。

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
  • XREAD:用于读取消息,可以按 ID 读取数据;
  • XREADGROUP:按消费组形式读取消息;
  • XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。

Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

基于zset的消息队列

延时队列来处理的场景,比如支付订单的15分钟有效时间、物流订单30分钟同步一次最新状态等等。这种场景,你可以考虑使用 redis 的 zset 来实现,zset 结构有一个参数 score, 可以用来控制时延。

zset 是一个有序的结构,每次取第一个元素来判断能否处理,如果第一个元素都不能处理,说明之后的元素都没有到执行时间。

zset 实现队列的几个特点:

  • 消息的有序性?zset 结构本身是按照 score 有序的,因此从消息投递先后来看便是无序;你可以根据 score 参数值的大小来控制消息在队列的先后顺序
  • 重复消息处理消息可靠性保障与 list 结构实现的队列类似

具体代码和过程可参考redis消息队列,你还不敢用?

总结

redis 消息队列优点:

  • 方便,相信目前很多项目开发已引入 redis, 因此,运维、学习成本等就降低了。
  • 轻量级,开箱即用,也可以自己做一层简单封装。
  • 多样性,可根据需求选择底层队列结构。

业务上避免过度复用一个 Redis,既用它做缓存、做计算,还拿它做任务队列。redis 队列可以考虑在非核心业务中使用,比如电商会员积分、短信下发、支付倒计时甚至一些定时处理的订单状态等。

参考博文

https://www.51cto.com/article/699922.html

https://www.51cto.com/article/640335.html

https://juejin.cn/post/7094272373930590245