【发布时间】:2018-05-08 04:30:03
【问题描述】:
我有一个生产者,我调用它并将记录发布到 Kafka,然后我调用一个返回记录的消费者,但是当我再次调用消费者时,消费者不会返回任何记录。 (我需要再次获取我发布到 Kafka 的记录)。我该怎么做?(任何代码将不胜感激)
【问题讨论】:
-
@STaefi 但我该怎么做呢?
标签: java apache-kafka
我有一个生产者,我调用它并将记录发布到 Kafka,然后我调用一个返回记录的消费者,但是当我再次调用消费者时,消费者不会返回任何记录。 (我需要再次获取我发布到 Kafka 的记录)。我该怎么做?(任何代码将不胜感激)
【问题讨论】:
标签: java apache-kafka
Kafka 在消息被消费后不会删除它。但它为任何消费者保留了阅读的偏移量。因此,在您从中读取消息后,偏移量会继续前进。第二次读取没有读取任何内容,因为您唯一的消息之后的偏移点,之后没有任何内容。在再次阅读之前,您应该尝试重置偏移量。看这篇文章:
Reset consumer offset to the beginning from Kafka Streams
但如果您不想在本地或全局重置,您可以创建另一个消费者组,并且由于每个消费者组都有自己的偏移量,因此新消费者的第二次读取可以实现您想要的。请参阅此链接:
希望这会有所帮助。
【讨论】:
您可以手动将偏移量重置为所需的偏移量,或者如果您需要从起始偏移量(kafka 中可用的任何内容)开始消费,那么您可以设置消费者属性“auto.offset.reset=earliest”
【讨论】:
您还可以每次为消费者属性提供一个新的group.id 值。只需生成一个随机字符串值。属性auto.offset.reset 必须设置为earliest。
【讨论】:
auto.offset.reset 属性作为earliest吗?