【问题标题】:Retry to consume messages from Kafka topic重试消费来自 Kafka 主题的消息
【发布时间】:2019-08-01 14:37:53
【问题描述】:

我正在开发一个模块,它使用来自 Kafka 主题的消息并发布到下游系统。如果下游系统不可用,消费者不确认 Kakfa 消息。因此,当我的消费者在下游系统不可用时收到消息时,不会提交 kakfa 的偏移量。但是,如果我在下游系统启动后收到新消息并且当我确认该消息时,将提交最新的偏移量,并且消费者永远不会收到主题中没有偏移量提交的那些消息。

即假设我的消费者被消耗到偏移量 4。当下游不可用时,消费者会收到两条消息,因此我的消费者没有提交偏移量。所以 toipc 中的消息数量现在是 6,但偏移量仍然是 4。现在下游系统可用,消费者收到一条新消息(第 7 条消息)。由于下游没有问题,消费者确认第 7 条消息,主题的偏移量将设置为 7。

有没有什么方法可以让我的消费者在收到第 7 条消息之前收到第 5 条和第 6 条消息?我在实现中使用了spring cloud stream。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api spring-cloud-stream


    【解决方案1】:

    this answer

    您需要一个SeekToCurrentErrorHandler 并引发异常以便重置偏移量。

    【讨论】:

      猜你喜欢
      • 2018-07-21
      • 1970-01-01
      • 2021-07-09
      • 2022-10-23
      • 2020-12-12
      • 2020-03-05
      • 2021-12-12
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多