MENU

Redis5之stream数据结构

redis中stream数据结构用于实现消息队列,当然之前版本也存在有pubsub(发布订阅方式的消息队列),而这个stream的设计引进类似kafka中的消费组

引用来自官方:Consumer groups were initially introduced by the popular messaging system called Kafka (TM). Redis reimplements a similar idea in completely different terms, but the goal is the same: to allow a group of clients to cooperate consuming a different portion of the same stream of messages.


Stream是仅追加数据结构,xadd将每条信息加入到消息链表中串起来。每条消息都有对应唯一的ID即称为key。
每个Stream可以有多个消费组,每个消费组有多个消费者,每个消费组会有游标last_delivered_id,用于记录当前消费到哪条消息。
消费者内部会有一个状态变量pending_ids,它记录了当前已经被客户端读取但还未ack的消息。

基本使用

  • xadd:追加消息

    127.0.0.1:6379> xadd people * name icharle age 18
    "1595668369649-0"
    127.0.0.1:6379> xadd people * name pad age 13
    "1595668593661-0"
    
    # people为key
    # name icharle age 18为消息内容,是键值对,类似于hash结构
    # *为自动生成 消息ID。生成的格式为:时间戳-x 大致表示为某个时间点第x条消息
    
  • xdel:删除消息

    127.0.0.1:6379> xlen people
    (integer) 2
    127.0.0.1:6379> xdel people 1595668593661-0
    (integer) 1
    127.0.0.1:6379> xlen people
    (integer) 1
    
  • xlen:消息链长度

  • xrange:消息链范围列表

    127.0.0.1:6379> xrange people - + # -表示最小消息id +表示最大消息id
    1) 1) "1595668369649-0"
       2) 1) "name"
          2) "icharle"
          3) "age"
          4) "18"
    2) 1) "1595668593661-0"
       2) 1) "name"
          2) "pad"
          3) "age"
          4) "13"
    

消费者&消费组

单独消费者模式

127.0.0.1:6379> xadd people * name icharle age 18
"1595668369649-0"
127.0.0.1:6379> xadd people * name pad age 13
"1595668593661-0"
127.0.0.1:6379> xadd people * name soarteam age 14
"1595668593661-0"
127.0.0.1:6379>  xread count 1 streams people 0-0  # 从消息链头部消费一条
1) 1) "people"
   2) 1) 1) "1595668369649-0"
         2) 1) "name"
            2) "icharle"
            3) "age"
            4) "18"
127.0.0.1:6379>  xread count 1 streams people $  # 从消息链尾消费最新一条,之前已经在链表中数据不计入其中,因此返回为空
(nil)
127.0.0.1:6379> xread block 0 count 1 streams people $ # 采用永久阻塞方式block 0等待新数据到来,最后一行显示等待时间。 其中0可以换成具体时间,比如1000ms
1) 1) "people"
   2) 1) 1) "1595669721851-0"
         2) 1) "name"
            2) "test"
            3) "age"
            4) "14"
(77.87s)

消费组模式

127.0.0.1:6379> xgroup create people cg1 0-0  # 创建从头开始消费的消费组。将0-0替换为$表示从最新开始消费
OK
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams people >  # 创建一个消费者。> 号表示从当前消费组的last_delivered_id后面开始读
1) 1) "people"
   2) 1) 1) "1595669445372-0"
         2) 1) "name"
            2) "icharle"
            3) "age"
            4) "18"
127.0.0.1:6379>
127.0.0.1:6379> xinfo groups people   # 查看消费者
1) 1) "name"
   2) "cg1"
   3) "consumers"       # 总共有一个消费者
   4) (integer) 1
   5) "pending"             
   6) (integer) 1       # 有一条消息未被ack确认
   7) "last-delivered-id"
   8) "1595669445372-0"
   
127.0.0.1:6379> xack people cg1 1595669445372-0  # ack该条消息
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> xinfo groups people
1) 1) "name"
   2) "cg1"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 0        # pending状态变为0
   7) "last-delivered-id"
   8) "1595669445372-0"

持久化方面:同样支持AOF、RDB方式(Stream被异步复制到副本中并持久存储到AOF和RDB文件),但消息可能存在丢失,因此适合一些日志业务可以容忍少量消息丢失。Redis全内存型相对比kafka速度上有所提升。

强烈安利官方文档Introduction to Redis Streams

标签: Redis
返回文章列表 文章二维码 打赏
本页链接的二维码
打赏二维码