【问题标题】:redis streams ordered processing within consumer groupsredis 流式处理消费者组内的有序处理
【发布时间】:2021-07-21 17:20:48
【问题描述】:

我正在将 python (aioredis) 与 redis 流一起使用。

我有一个生产者 - 许多(分组)消费者场景,并希望确保消费者正在处理以有序方式发送到流的(批量)消息,这意味着:当第一条消息完成时,处理下一条消息在流中等等。这也意味着消费者组中的一个消费者正在处理,而其他消费者将等待。

我还想依赖第二个、第三个等消费者组中的有序处理——所有这些都依赖于发送到一个流的相同消息。意思:

message 1 ... n -> stream1 
ordered processing within group 1 ... n  
whereas consumer 1 ... n per group 1 ... n

当我还想确保每个组的潜在订单检查逻辑不会过多过载时,什么是完成这项工作的好方法?

【问题讨论】:

  • 如果要按顺序处理消息,为什么要使用消费者组?这只是为了在多个消费者组之间共享消息吗?
  • 我使用消费者组来应用与一条消息相关的不同主题。例如 - 由生产者发送到流的订单 1) 消费组支付:处理付款详情 2) 消费组物流:触发货物交付 3) 消费组购买:购买更多商品以填补库存
  • 重试怎么样?

标签: python redis stream aioredis


【解决方案1】:

让我回到老派的同步处理,如果你想顺序处理流消息并不容易,原因是失败/重试。

假设您希望最多处理一次每条消息,流式消息执行是一个关键部分,消费者组成员作为线程/进程。

要同步这一点,您需要有某种锁定机制,因为消费者组可以在不同的机器上运行。您可以使用全局锁定机制来防止多个消费者消费来自同一流的消息。

您可以使用 Redis 锁 (RedLock) 来获取/释放锁。

伪代码

Procedure SequentialProcessor

Input: StreamName
Input: ConsumerName
Input: ConsumerGroup
Input: LockTime 


BEGIN
    redLock = RedLock()
    WHILE True DO
     IF redLock.aquireLock(StreamName#ConsumerGroup, LockTime) THEN
       message = redis.XREADGROUP( ConsumerGroup, StreamName, ...)
       TRY
         processMessage( message )
       FINALLY
          redLock.releaseLock( StreamName#ConsumerGroup )
     ENDIF
    END
END

 

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-04-18
    • 2017-02-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-05-04
    • 1970-01-01
    相关资源
    最近更新 更多