【问题标题】:Is there any configuartion available to clear uncommitted message during kafka or consumer restart?在 kafka 或消费者重启期间是否有任何配置可用于清除未提交的消息?
【发布时间】:2026-02-19 02:45:01
【问题描述】:

我有一个业务场景,消费者不应该从主题消费已提交/未提交的消息 当消费者或卡夫卡重新启动时。我尝试应用auto.offset.reset: latest。但它从主题中提取未提交的偏移量。例如有一个具有 1 个主题和 1 个分区的实例的应用程序。假设我发布了 10 条消息,消费者选择了 5 条消息并提交了偏移量。现在我重新启动我的消费者实例 /kafka。重新启动后它不应该选择旧的 5 条消息未提交。正在寻找任何其他配置或解决方法。

【问题讨论】:

  • 您的意思是您只想读取已提交的偏移量而不是未提交的偏移量?
  • 否。每次重新启动都应该像干净的设置一样。
  • 那你为什么不用auto.offset.reset: earliest呢?我认为您没有正确解释您想要在这里实现的目标。
  • 我尝试重构这个问题。它可能会帮助你理解。我是这个 kafka 和 cloudstream 的新手。很抱歉
  • 我不知道这是您的措辞还是仅仅因为您是 kafka 的新手,但让我澄清一下;您将未提交的偏移量称为“旧消息”。事实并非如此。它实际上是相反的。新消息被认为是那些具有未提交偏移量的消息。

标签: spring-boot apache-kafka spring-cloud-stream


【解决方案1】:

每次启动时使用唯一的 group.id(例如 UUID),或在启动期间使用 seekToEnd 每个分配的分区。

Seeking to a Specific Offset

【讨论】:

    【解决方案2】:

    您需要确保每次重新启动应用程序时,您的使用者都会获得一个新的使用者组(在 Java API 中,为此的使用者配置称为 group.id)。即使您重新启动代理,您仍然会使用新的group.id 重新启动您的应用程序。并保留配置auto.offset.reset=latest

    另一种选择是在每次代理重启后手动更改消费者组的偏移量。 Kafka 带有一个 ConsumerGroupCommand 工具。您可以在 Kafka 文档Managing Consumer Groups 中找到一些信息。

    如果您打算重置特定的消费者组(“myConsumerGroup”),您可以使用

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group myConsumerGroup --topic topic1 --to-latest
    

    根据您的要求,您可以使用该工具重置主题的每个分区的偏移量。帮助功能或文档解释了这些选项。

    【讨论】:

      最近更新 更多