【发布时间】:2017-08-21 18:33:25
【问题描述】:
我是 kafka 的新手,正在尝试使用 kafka 构建一个生产者 - 消费者应用程序。在这里,我可以向 kalka 生成消息,但是当我尝试使用消费者将其消费回来时,它返回 0 条记录。
我检查了我的消费者组的偏移量,我可以看到偏移量等于日志长度相同(在我的情况下为 1M - 与记录数相同)。
如果我在创建消费者时使用此配置属性,它会从头开始读取。
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
但我的要求是,如果我重新启动消费者,它应该像 AMQ 一样从上一个端点开始。
这里有什么我想念的吗?我认为只有在消费者民意调查之后才会改变偏移量。为什么一开始就设置为最大记录长度?
【问题讨论】:
-
@MatthiasJ.Sax 感谢您的回复。但是根据您分享的链接,我知道消费者 api 不会更改我从主题中轮询和消费的偏移量。但是,即使在我使用该主题的任何记录之前,我的案例偏移量也已设置为结束。我做错什么了吗?
-
没错。 “寻求”一个职位总是懒惰的。我在下面给出了更详细的答案。让我知道它是否有帮助。 Kafka Consumer 刚开始时有点难以理解——它与其他系统不同(而且更好)。但因此,(一开始)也有点难以做到正确:)
标签: java apache-kafka kafka-consumer-api kafka-producer-api