基于Redis实现消息队列

源码 2024-9-4 15:17:34 22 0 来自 中国
基于Redis实现消息队列

1.业务场景

假设在没有专业消息中心件的情况下,又要通过消息队列去解耦。redis是个更好的选择。
2.实现方式

扼要分析实现方式,这里只做个大概的概括

  • 发布与订阅(缺点:范例的一对一,不支持多个消耗者公平消耗消息,消息无法长期化,如果出现网络断开、Redis 宕机等,消息就会被抛弃等题目)
  • list队列(缺点:没有很好 ACK 机制,没有 ConsumerGroup 消耗组,不支持一对多消耗等题目)
  • stream队列(保举)官方:https://redis.io/docs/data-types/streams/
3.概念

Redis5.0带来了Stream类型。着实就是Redis对消息队列(MQ,Message Queue)的完满实现。
重要有几个概念:
1.消耗者组(Consumer Group):一个消耗组有多个消耗者(Consumer), 这些消耗者之间是竞争关系。也就是说不会出现重复消耗的场景。
2.pending_ids :消耗者(Consumer)的状态变量,作用是维护消耗者的未确认的 id。 pending_ids 记载了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。
3.last_delivered_id :游标,每个消耗组会有个游标 last_delivered_id,恣意一个消耗者读取了消息都会使游标 last_delivered_id 往前移动。
4.消息ID: 消息ID的情势是timestampInMillis-sequence,比方1527846880572-5
这里扼要贴出Redis中Stream使用的相干指令
着实像代码,都是基于下令的高度封装

消息队列相干下令:

  • XADD - 添加消息到末了
  • XTRIM - 对流举行修剪,限定长度
  • XDEL - 删除消息
  • XLEN - 获取流包罗的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以壅闭或非壅闭方式获取消息列表
消耗者组相干下令:

  • XGROUP CREATE - 创建消耗者组
  • XREADGROUP GROUP - 读取消耗者组中的消息
  • XACK - 将消息标志为"已处置惩罚"
  • XGROUP SETID - 为消耗者组设置新的末了递送消息ID
  • XGROUP DELCONSUMER - 删除消耗者
  • XGROUP DESTROY - 删除消耗者组
  • XPENDING - 表现待处置惩罚消息的相干信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查察流和消耗者组的相干信息;
  • XINFO GROUPS - 打印消耗者组的信息;
  • XINFO STREAM - 打印流信息
4.代码实现

stream相干设置,这里重要设置消耗组和消耗者相干信息,以及消息的监听机制
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2024-10-19 06:18, Processed in 0.106056 second(s), 32 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表