【问题标题】:Pub/Sub Lite Delayed ConsumerPub/Sub Lite 延迟消费者
【发布时间】:2023-04-02 04:18:01
【问题描述】:

我正在使用consumer.pause(<partitions>) 实现 Kafka 延迟主题消费。

Pub/Sub Kafka shim 将暂停变为 NoOp:

https://github.com/googleapis/java-pubsublite-kafka/blob/v0.6.7/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java#L590-L600

是否有任何文档说明如何将 pub sub lite 主题的消费延迟设定的持续时间?

即我想使用来自 Pub/Sub Lite 主题的所有消息,但有 4 分钟的综合延迟。

这是我使用 Kafka 原生的算法:

  • 致电consumer.poll()
  • 恢复所有分配的分区consumer.resume(consumer.assignment())
  • 将之前的 delayed 记录与最近轮询的记录结合起来
  • 将记录分隔到
    • 足以处理的记录
    • 记录还太年轻,无法处理
  • 为任何太年轻的记录暂停分区consumer.pause(<partitions of too young>)
  • 保留一个太年轻的记录缓冲区,以便在下一次通过时重新考虑,称为delayed
  • 处理足够旧的记录
  • 冲洗,重复

我们只提交足够老的记录的偏移量,如果进程死亡,“太年轻”缓冲区中的任何记录都将保持未提交,并且在随后的重新平衡中接收分区的任何消费者都会重新访问它们。

此算法是否有更通用的形式可以与原生 Kafka 和 Pub/Sub Lite 一起使用?

编辑:CloudTasks 在这里是个坏主意,因为它断开了偏移提交链。我需要确保我只为从下游系统得到确认的记录提交偏移量。

【问题讨论】:

    标签: apache-kafka google-cloud-pubsub google-cloud-pubsublite


    【解决方案1】:

    如果您删除 pauseresume 阶段,与上述类似的操作可能会正常工作。我会注意到,对于这两个系统,您不能保证在任何给定的 poll() 调用中直到现在都接收到服务器上存在的所有消息,因此如果您没有为给定分区提供任何记录,您可能会增加额外的延迟投票电话。

    如果您在启用自动提交的情况下执行以下操作,您应该有效地将处理延迟严格超过 4 分钟。

    1. 致电consumer.poll()
    2. 睡到每记录 4 分钟前
    3. 流程记录
    4. 转到 1。

    如果您使用手动提交,您可以使每条消息的睡眠时间接近 4 分钟,但缺点是需要手动管理偏移量:

    1. 致电consumer.poll()
    2. 将记录放入有序的每个分区缓冲区中
    3. 睡眠直到任何分区的最早记录是过去 4 分钟
    4. 处理过去 4 分钟以上的记录
    5. 提交已处理记录的偏移量
    6. 去1

    【讨论】:

    • 睡眠不是我的选择。只有允许阻塞的调用是对轮询本身的调用。这只会在没有新消息的情况下阻塞。我们已经自己管理偏移量。
    • "只有被允许阻塞的调用是对自身进行轮询的调用。"这似乎是一个任意的约束;鉴于您已经在 poll() 调用中阻塞了线程,您应该重新考虑它。假设您无法控制此约束的另一种方法是计算轮询时间,因为制作最旧记录所需的时间是过去 4 分钟。当最早的记录是过去 4 分钟时,这将累积新消息或超时。尽管与睡眠方法相比,这具有内存开销。
    猜你喜欢
    • 2021-04-18
    • 1970-01-01
    • 2021-07-09
    • 2019-10-09
    • 2019-12-04
    • 2012-01-28
    • 2011-11-04
    • 1970-01-01
    • 2011-05-12
    相关资源
    最近更新 更多