【问题标题】:How can I consume message in topic many times如何多次消费主题中的消息
【发布时间】:2018-05-08 04:30:03
【问题描述】:

我有一个生产者,我调用它并将记录发布到 Kafka,然后我调用一个返回记录的消费者,但是当我再次调用消费者时,消费者不会返回任何记录。 (我需要再次获取我发布到 Kafka 的记录)。我该怎么做?(任何代码将不胜感激)

【问题讨论】:

  • @STaefi 但我该怎么做呢?

标签: java apache-kafka


【解决方案1】:

Kafka 在消息被消费后不会删除它。但它为任何消费者保留了阅读的偏移量。因此,在您从中读取消息后,偏移量会继续前进。第二次读取没有读取任何内容,因为您唯一的消息之后的偏移点,之后没有任何内容。在再次阅读之前,您应该尝试重置偏移量。看这篇文章:

Reset consumer offset to the beginning from Kafka Streams

但如果您不想在本地或全局重置,您可以创建另一个消费者组,并且由于每个消费者组都有自己的偏移量,因此新消费者的第二次读取可以实现您想要的。请参阅此链接:

kafka-tutorial-kafka-consumer

希望这会有所帮助。

【讨论】:

  • 我在那里没有找到任何有用的代码。我正在发送字符串的记录,再见[]
  • 尝试使用两个不同的消费者来消费相同的消息。创建两个消费者对象,看看这样你是否可以两次消费相同的消息。
  • 更新了答案以指定 消费者群体 共享偏移量。新组是新的偏移处理
  • 感谢@cricket_007 的提示,我错过了重要的一点。
【解决方案2】:

您可以手动将偏移量重置为所需的偏移量,或者如果您需要从起始偏移量(kafka 中可用的任何内容)开始消费,那么您可以设置消费者属性“auto.offset.reset=earliest”

【讨论】:

    【解决方案3】:

    您还可以每次为消费者属性提供一个新的group.id 值。只需生成一个随机字符串值。属性auto.offset.reset 必须设置为earliest

    【讨论】:

    • 你有auto.offset.reset 属性作为earliest吗?
    猜你喜欢
    • 1970-01-01
    • 2020-04-04
    • 2019-04-15
    • 2019-06-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-10
    • 2018-03-08
    相关资源
    最近更新 更多