【问题标题】:Kafka not able to consume without reading from beginning -Java卡夫卡不从头开始阅读就无法消费-Java
【发布时间】:2017-08-21 18:33:25
【问题描述】:

我是 kafka 的新手,正在尝试使用 kafka 构建一个生产者 - 消费者应用程序。在这里,我可以向 kalka 生成消息,但是当我尝试使用消费者将其消费回来时,它返回 0 条记录。

我检查了我的消费者组的偏移量,我可以看到偏移量等于日志长度相同(在我的情况下为 1M - 与记录数相同)。

如果我在创建消费者时使用此配置属性,它会从头开始读取。

configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

但我的要求是,如果我重新启动消费者,它应该像 AMQ 一样从上一个端点开始。

这里有什么我想念的吗?我认为只有在消费者民意调查之后才会改变偏移量。为什么一开始就设置为最大记录长度?

【问题讨论】:

  • @MatthiasJ.Sax 感谢您的回复。但是根据您分享的链接,我知道消费者 api 不会更改我从主题中轮询和消费的偏移量。但是,即使在我使用该主题的任何记录之前,我的案例偏移量也已设置为结束。我做错什么了吗?
  • 没错。 “寻求”一个职位总是懒惰的。我在下面给出了更详细的答案。让我知道它是否有帮助。 Kafka Consumer 刚开始时有点难以理解——它与其他系统不同(而且更好)。但因此,(一开始)也有点难以做到正确:)

标签: java apache-kafka kafka-consumer-api kafka-producer-api


【解决方案1】:

正如链接所述,您需要考虑以下几种情况:

  1. 开始一个新的消费者(new group.id):在这种情况下,不会有提交的偏移量,因此消费者根据参数设置auto.offset.reset开始读取

  2. 重新启动消费者(重用group.id):在这种情况下,消费者将从中断的地方继续。参数设置auto.offset.reset被忽略。

因此,对于场景 (1),您只需“配置”您的起始位置。对于场景(2),您的起始位置是“固定的”(即,始终是最后提交的偏移量),这不能通过配置更改。但是,您始终可以在第一次致电 poll() 之前执行 .seekToBeginning().seekToEnd() 并阅读整个主题或从主题末尾开始。对.seekXX() 的调用将“覆盖”最后提交的偏移量,并允许您以您喜欢的任何偏移量开始消费。请注意,还有 seek() 采用“偏移参数”,因此您可以指定要开始使用的任何偏移量。

【讨论】:

  • 我从您的回答和您分享的链接中了解偏移量现在是如何工作的。但我没有得到的是,我使用生产者为一个主题制作了 1M。我创建了一个具有唯一组 ID configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); 的消费者。这仍然是在消耗没有 AUTO_OFFSET_RESET_CONFIG 设置为最早的主题中的任何数据。
  • 在正常情况下,如果我使用新的组 id 开始一个新的消费者,它应该从头开始,而不是我强制将偏移量设置为正确的开始?
  • 如果您不设置auto.offset.reset 并使用新的消费者(即新的组ID),它应该从最后读取......不知道为什么它会读取整个主题......(我假设,当您启动消费者时,您没有任何正在运行的生产者)。
  • @MatthiasJ.Sax - 我已经遵循了这个答案。但是消费者重启后我在阅读消息时仍然遇到问题。我已经在stackoverflow.com/questions/48826279/… 中发布了详细信息。你能告诉我可能出了什么问题吗?
  • @tuk 不确定。实际上,我在 Kafka 用户邮件列表上回复了您交叉发布的问题。
猜你喜欢
  • 2020-03-14
  • 1970-01-01
  • 1970-01-01
  • 2021-04-02
  • 2017-10-12
  • 2018-09-18
  • 1970-01-01
  • 1970-01-01
  • 2018-05-05
相关资源
最近更新 更多