【问题标题】:How to retrieve lastest message from Kafka topic如何从 Kafka 主题中检索最新消息
【发布时间】:2019-06-08 09:47:12
【问题描述】:

我只是卡夫卡的新手,遇到一个问题:

我在 Kafka 中有主题“A”,我启动 Spring Boot 应用程序并使用 MessageChannel 向主题“A”发送一些消息,然后我停止应用程序。

当我再次启动应用程序时,是否有办法获取我发送到主题“A”的最新消息(不是所有消息)?我已经搜索了所有解决方案,但它们对我帮助不大,如果我只发送新消息,它总是会立即收到消息。如果您有可运行的代码,请分享,我非常感谢:(

    // Start application


    // Get latest message in topic 'A' then do some LOGIC
    if (exist latest message) {
          //Print latest message
    }

【问题讨论】:

  • 你检查过Kafka属性auto.offset.reset = latest吗?
  • @apandey846:是的,我添加了,我使用 Spring 云绑定和这些配置:resetOffsets: true startOffset: latest

标签: java apache-kafka


【解决方案1】:

您的消费者存储他们的偏移量(即最后读取位置)。重新启动后,他们从这一点继续阅读。此行为是设计使然。

当此特定消费者组的偏移量由于某种原因尚不清楚(例如,它是一个新的消费者组或偏移量已过期)时,使用偏移重置属性,但通常有两个选项 - a) 重新读取一切从头开始 b) 开始聆听新消息,忘记之前的一切。

hacky 方法可以实现您所描述的,但它们并不简单且不推荐(一种微不足道的方法:消费者消息,而只是跳过它们直到到达分区 EOF)

也许只有 Kafka 不是解决这个问题的正确工具。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-08-30
    • 1970-01-01
    • 2018-08-25
    • 2019-06-23
    • 2022-07-28
    • 1970-01-01
    • 2015-12-07
    • 2017-02-20
    相关资源
    最近更新 更多