stream是redis 5.0新增的数据结构,用作消息队列。由于redis的pub/sub,消息不能持久化,当

1、xadd:向队列添加消息,如果指定的队列不存在,则会新建此队列。返回值是这个消息的id

语法:xadd key [NOMKSTREAM] [MAXLEN | MINID [= | ~] threshold [LIMIT count]] * | id field value [field value . . .]

如xadd mystream * name zhangsan age 18,返回这条消息的id,如1622543482818-0,id左半部分是时间戳,右半部分是用于区分同一时间的消息的序列号。

执行上面命令后,用keys *命令就可以看出会生成一个名为mystream的key,执行type mystream,会发现是stream类型。执行ttl mystream,返回-1,表示没有设置过期时间。

默认情况下,如果key不存在会新建,如果加了nomkstream,则不会新建。nomkstream是no make stream的缩写。

xadd mystream nomkstream * a 1,返回1622623367513-0。

xadd mystream nomkstream * b 2,返回1622623399416-0

xadd可以用*来让redis服务端自动生成id,也可以显式指定id,但要求是The ID specified in XADD is bigger than the target stream top item。

xadd mystream nomkstream 1622623415402-0 c 3,返回1622623415402-0。

2、xrange:获取队列指定id区间的消息列表

语法:xrange key start end [COUNT count]

如:xrange mystream 1622543482818-0 1622623415402-0,会返回mystream队列中id在[1622543482818-0, 1622623415402-0]区间的消息列表,顺序和xadd顺序一致。

我们可以用-来表示队列的最小消息id,用+来表示队列的最大消息id

如:xrange mystream - +

xrange mystream - + count 3,只展示前3个

相似命令:xrevrange:xrange的反序

3、xread:以阻塞或非阻塞的方式获取指定id之后的消息列表

语法:xread [COUNT count] [BLOCK milliseconds] STREAMS key [key . . .] id [id . . .]

如:xread streams mystream 1622623367513-0,会返回id为1622623399416-0和1622623415402-0的消息。

如果想获取所有的消息,我们可以用xread streams mystream 0-0,或者xread streams mystream 1-0,或者xread streams mystream 2-0,等等,只要时间戳足够小就行。

xread count 1 block 3000 streams mystream 0-0,最多阻塞3s后返回一条消息

4、xlen:查看队列现存消息数

语法:xlen key

如xlen mystream,返回消息数(可能是0),如果key不存在,则返回0。

5、xdel:删除指定id的消息,一次可以删除多个。

语法:xdel key id [id . . .]

如xdel mystream 1622551986863-0,返回删掉的消息的数量。

6、xtrim:对流进行裁剪。返回值是裁剪掉的消息的数量

语法:xtrim key MAXLEN | MINID [= | ~] threshold [LIMIT count]

如xtrim mystream maxlen 2,只保留最后2个消息,其他消息全删除。

xtrim mystream minid 1622623415402-0,删除id小于1622623415402-0的消息,仅保留1622623415402-0及之后的消息。

 

7、xgroup:管理消费组、消费者

语法:xgroup [create key groupname id | $ [MKSTREAM]] [setid key groupname id | $] [destory key groupname] [createconsumer | delconsumer key groupname consumername]

如:xgroup create mystream cg1 0-0,给mystream队列创建一个消费组,名字是cg1,从头消费队列数据。

xgroup create mystream cg2 1622623367513-0,给mystream队列创建一个消费组,名字是cg2,只消费1622623367513-0之后的消息。

xgroup create mystream cg3 $,给mystream队列创建一个消费组,名字是cg3,只消费新消息,消费组创建前队列中已有的数据不管了。

xgroup setid mystream cg2 1622623399416-0,重置偏移量到1622623399416-0。

xgroup destory mystream cg1,删除消费组cg1。

xgroup createconsumer mystream cg2 c1,给cg2消费组创建消费者c1。可以不显式创建,在用xreadgroup消费的时候会自动生成指定的消费者。

xgroup delconsumer mystream cg2 c1,把cg2消费组的c1消费者删除。

8、xreadgroup:消费组消费消息

语法:xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key . . .] id [id . . .]

When you read with XREADGROUP, the server will remember that a given message was delivered to you: the message will be stored inside the consumer group in what is called a Pending Entries List (PEL), that is a list of message IDs delivered but not yet acknowledged.用block指定最大阻塞时间,如block 60000,会最多阻塞600s,直至有一个消息来临或超时返回空。用count指定最大消息数,如count 10。

The client will have to acknowledge the message processing using XACK in order for the pending entry to be removed from the PEL. The PEL can be inspected using the XPENDING command.

The NOACK subcommand can be used to avoid adding the message to the PEL in cases where reliability is not a requirement and the occasional message loss is acceptable. This is equivalent to acknowledging the message when it is read.

The ID can be one of the following two:

The special > ID, which means that the consumer want to receive only messages that were never delivered to any other consumer. It just means, give me new messages.

Any other ID, that is, 0 or any other valid ID or incomplete ID (just the millisecond time part), will have the effect of returning entries that are pending for the consumer sending the command with IDs greater than the one provided. So basically if the ID is not >, then the command will just let the client access its pending entries: messages delivered to it, but not yet acknowledged. Note that in this case, both BLOCK and NOACK are ignored.

使用示例:

del mystream

xadd mystream * hello world,返回1622904007564-0

xgroup create mystream mygroup $,创建消费组mygroup

xreadgroup group mygroup godconsumer streams mystream >,返回nil

xadd mystream * a 1,返回1622904332808-0

xreadgroup group mygroup godconsumer streams mystream >,返回

1) 1) "mystream"

   2) 1) 1) "1622904332808-0"

         2) 1) "a"

            2) "1"

再次执行xreadgroup group mygroup godconsumer streams mystream >,返回nil

xadd mystream * b 2,返回1622904516827-0

xreadgroup group mygroup godconsumer streams mystream 0,返回

1) 1) "mystream"

   2) 1) 1) "1622904332808-0"

         2) 1) "a"

            2) "1"

xack mystream mygroup 1622904332808-0,返回1

xreadgroup group mygroup godconsumer streams mystream 0,返回

1) 1) "mystream"

   2) (empty array)

9、xack:向redis服务端发送通知,表明指定的消费组把指定id的消息消费好了,同时会把这个消息id从PEL中删掉。

语法:xack key group id [id . . .]

如:xack mystream cg1 1622623399416-0

10、xpending:查看PEL数据

语法:xpending key group [[IDLE min-idle-time] start end count [consumer]]

如:xpending mystream cg1,返回指定消费组未ack的消息情况,包括未ack的消息总数、每个消费者的名称及其未ack消息数、此消费组未ack的第一个消息的id和最后一个消息的id。

idle???消息过期时间在哪里设置?

redis stream使用

11、xinfo:查看消费组、消费者信息

语法:xinfo [CONSUMERS key groupname] [GROUPS key] [STREAM key]

如:xinfo groups mystream,查看mystream队列的消费组情况,会返回消费组的名称、每个消费组消费的最后一个消息的id。

xinfo consumers mystream cg1,返回mystream队列cg1消费组的消费者情况(每个消费者的名称、每个消费者消费的消息的数量,每个消费者的idle),如果没有这个消费组,会报错(error) NOGROUP No such consumer group 'cg1' for key name 'mystream'。

redis stream使用

xinfo stream mystream,查看mystream队列的长度、最大消息id、现存第一条及最后一条消息的id及内容。

12、xclaim

相似命令:xautoclaim 

相关文章:

  • 2021-05-21
  • 2021-07-30
  • 2021-05-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2021-07-25
  • 2021-10-08
  • 2022-12-23
  • 2021-06-23
  • 2021-12-25
  • 2021-10-26
  • 2021-11-28
相关资源
相似解决方案