Redis5之stream数据结构
redis中stream数据结构用于实现消息队列,当然之前版本也存在有pubsub(发布订阅方式的消息队列),而这个stream的设计引进类似kafka中的消费组。
引用来自官方:Consumer groups were initially introduced by the popular messaging system called Kafka (TM). Redis reimplements a similar idea in completely different terms, but the goal is the same: to allow a group of clients to cooperate consuming a different portion of the same stream of messages.
Stream是仅追加数据结构,xadd将每条信息加入到消息链表中串起来。每条消息都有对应唯一的ID即称为key。
每个Stream可以有多个消费组,每个消费组有多个消费者,每个消费组会有游标last_delivered_id
,用于记录当前消费到哪条消息。
消费者内部会有一个状态变量pending_ids,它记录了当前已经被客户端读取但还未ack的消息。
基本使用
xadd:追加消息
127.0.0.1:6379> xadd people * name icharle age 18 "1595668369649-0" 127.0.0.1:6379> xadd people * name pad age 13 "1595668593661-0" # people为key # name icharle age 18为消息内容,是键值对,类似于hash结构 # *为自动生成 消息ID。生成的格式为:时间戳-x 大致表示为某个时间点第x条消息
xdel:删除消息
127.0.0.1:6379> xlen people (integer) 2 127.0.0.1:6379> xdel people 1595668593661-0 (integer) 1 127.0.0.1:6379> xlen people (integer) 1
xlen:消息链长度
xrange:消息链范围列表
127.0.0.1:6379> xrange people - + # -表示最小消息id +表示最大消息id 1) 1) "1595668369649-0" 2) 1) "name" 2) "icharle" 3) "age" 4) "18" 2) 1) "1595668593661-0" 2) 1) "name" 2) "pad" 3) "age" 4) "13"
消费者&消费组
单独消费者模式
127.0.0.1:6379> xadd people * name icharle age 18
"1595668369649-0"
127.0.0.1:6379> xadd people * name pad age 13
"1595668593661-0"
127.0.0.1:6379> xadd people * name soarteam age 14
"1595668593661-0"
127.0.0.1:6379> xread count 1 streams people 0-0 # 从消息链头部消费一条
1) 1) "people"
2) 1) 1) "1595668369649-0"
2) 1) "name"
2) "icharle"
3) "age"
4) "18"
127.0.0.1:6379> xread count 1 streams people $ # 从消息链尾消费最新一条,之前已经在链表中数据不计入其中,因此返回为空
(nil)
127.0.0.1:6379> xread block 0 count 1 streams people $ # 采用永久阻塞方式block 0等待新数据到来,最后一行显示等待时间。 其中0可以换成具体时间,比如1000ms
1) 1) "people"
2) 1) 1) "1595669721851-0"
2) 1) "name"
2) "test"
3) "age"
4) "14"
(77.87s)
消费组模式
127.0.0.1:6379> xgroup create people cg1 0-0 # 创建从头开始消费的消费组。将0-0替换为$表示从最新开始消费
OK
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams people > # 创建一个消费者。> 号表示从当前消费组的last_delivered_id后面开始读
1) 1) "people"
2) 1) 1) "1595669445372-0"
2) 1) "name"
2) "icharle"
3) "age"
4) "18"
127.0.0.1:6379>
127.0.0.1:6379> xinfo groups people # 查看消费者
1) 1) "name"
2) "cg1"
3) "consumers" # 总共有一个消费者
4) (integer) 1
5) "pending"
6) (integer) 1 # 有一条消息未被ack确认
7) "last-delivered-id"
8) "1595669445372-0"
127.0.0.1:6379> xack people cg1 1595669445372-0 # ack该条消息
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> xinfo groups people
1) 1) "name"
2) "cg1"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0 # pending状态变为0
7) "last-delivered-id"
8) "1595669445372-0"
持久化方面:同样支持AOF、RDB方式(Stream被异步复制到副本中并持久存储到AOF和RDB文件),但消息可能存在丢失,因此适合一些日志业务可以容忍少量消息丢失。Redis全内存型相对比kafka速度上有所提升。
强烈安利官方文档Introduction to Redis Streams