【问题标题】:Why does consumer read all messages from Kafka topic after restart?为什么消费者在重启后会读取来自 Kafka 主题的所有消息?
【发布时间】:2020-12-07 16:17:13
【问题描述】:

我使用 confluent .net 客户端。 订阅者总是在重启后(订阅者服务重启)读取来自 Kafka 主题的所有消息。 如何提交消费者已经实现并从中读取的偏移量? 也许一些消费者配置可以帮助......

【问题讨论】:

    标签: apache-kafka confluent-kafka-dotnet


    【解决方案1】:

    这只是一个疯狂的猜测,但是如何声明消费者的组 id? 我见过一些使用这种随机分配的例子:

         ["group.id"] = Guid.NewGuid().ToString(),
    

    如果您每次启动消费者时都声明一个新的/随机的group.id,这将导致在每次执行时注册一个新的消费者组,这涉及到auto.offset.reset

    如果此属性设置为“earliest”,那么每次启动消费者时(假设他们每次都有不同的 group.id),他们将从第一个可用偏移量开始,就像你的情况一样,读取所有的消息从头开始。

    如果此属性设置为“latest”,并且您的生产者当前没有发送任何消息,您将无法读取任何内容,这可能会造成一些混乱。

    尝试设置一个固定的group.id:开始消费,当消息在代理上仍然可用时停止进程,然后再次启动消费者,而不更改最后一个group.id

    这一次,由于消费者组已经注册,auto.offset.reset 将被忽略,起始位置将由您提交的偏移量定义,默认情况下存储在名为 __consumer_offsets 的特殊主题中。

    【讨论】:

      【解决方案2】:

      你有两个选择:

      • 通过设置消费者属性 EnableAutoCommit = true 启用自动提交(消息在可配置的时间后提交,通常为 5 秒),或者

      • 通过consumer.Commit(consumeResult)手动提交获取的偏移量。

      GitHub 上显示了手动提交的示例。

      【讨论】:

      • EnableAutoCommit = true 已设置且组名不变。但这不是5秒,而是十几个小时。可能在经纪人或其他地方设置了一些额外的设置
      • 您可以查看其他 ConsumerProducer auto.commit.interval.ms 记录在案的 here
      猜你喜欢
      • 2021-07-09
      • 2018-07-21
      • 2016-04-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-12
      • 2022-10-23
      • 2017-11-09
      相关资源
      最近更新 更多