【问题标题】:Making Kafka consumers consume existing messages before subscription让 Kafka 消费者在订阅前消费现有消息
【发布时间】:2016-08-09 21:31:24
【问题描述】:

拥有 Publisher 和 N 个 Consumer,如果消费者使用 auto.offset.reset=latest,那么他们会错过在订阅之前发布到某个主题的所有消息……众所周知,具有 auto.offset.reset=latest 的消费者不会重播订阅前主题中存在的消息...

所以我需要:

  1. 让发布者等待所有订阅者开始消费消息,然后开始发布。不知道如何在不利用 Zookeeper 的情况下做到这一点。 Kafka 是否提供了这样做的方法?
  2. 另一种方法是让 auto.offset.reset=latest 消费者明确消费所有现有消息,以防他们即将使用现有消息订阅主题...

这种情况的最佳做法是什么?

我猜消费者必须检查现有消息的主题,如果有的话就消费它们,然后启动auto.offset.reset=latest消费。这听起来对我来说是最好的方式......

【问题讨论】:

  • 使用auto.offset.reset=earliest有什么坏处吗?
  • 如果您最早使用,您需要记忆或保留最后一个偏移量,以便知道您上次停止的位置,对吗?我刚刚意识到这两个选项都是完全错误的,唯一的方法是记住最后一个偏移量......
  • 是的,你是对的。您只需要每次都跟踪偏移量。你介意说说你的用例吗,比如你在哪里使用 Kafka,以及你使用的是哪个处理引擎,比如 spark 或storm?还有是批处理还是流式处理?

标签: apache-kafka


【解决方案1】:

如果开始使用高级消费者,它会执行以下操作:

  1. 为其消费者组寻找承诺的偏移量

    一个。如果找到有效的偏移量,则从那里恢复

    b.如果没有找到有效的偏移量,则根据auto.offset.reset设置偏移量

因此,auto.offset.reset 仅在未提交有效偏移量时触发。此行为旨在并且必须在发生故障时提供至少一次处理保证。

因此,如果您想从头开始阅读主题,您可以使用新的消费者 group.id 并设置 auto.offset.reset = earliest 或者在启动 poll() 之前使用 seekToBeginning() 显式修改启动时的偏移量循环。

【讨论】:

    【解决方案2】:

    我们使用 Eureka 提供的服务发现功能(任何其他服务发现应用程序都可以完成这项工作)+ 别名来执行选项 (1)。基本上,在至少有一个订阅者可用之前,发布者不会注册自己(并开始处理请求或发布通知)。

    【讨论】:

      猜你喜欢
      • 2020-10-04
      • 1970-01-01
      • 2017-09-23
      • 1970-01-01
      • 1970-01-01
      • 2020-08-11
      • 1970-01-01
      • 1970-01-01
      • 2019-08-29
      相关资源
      最近更新 更多