【问题标题】:Spring Kafka consuming old messages which are already consumed by the consumerSpring Kafka 消费已被消费者消费的旧消息
【发布时间】:2020-08-11 00:09:23
【问题描述】:

我有一个 Spring Boot 应用程序并使用 Spring Kafka。我们创建了一个消费者,它正在消费来自 4 个主题的消息。这些主题没有任何分区。我在这里面临的问题是三个主题中的一个随机行为,在任何一个主题偏移量停止并且我的消费者继续一次又一次地使用来自该主题的相同消息,直到我们需要手动将偏移量移动到最新。下面是配置我有 YAML 配置:

spring:
  kafka:
   consumer:
      bootstrap-servers:  ${KAFKA_BOOTSTRAP_SERVERS}
      group-id: group_id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  consumer:
    allTopicList: user.topic,student.topic,class.topic,teachers.topic**

因为它是一个 Spring Boot 应用程序,默认偏移设置为最新。 我在这里做错了什么,请帮助我理解。

【问题讨论】:

    标签: spring-boot apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    你用的是什么版本?

    你应该设置

    ...consumer:
         enable-auto-commit: false
    

    侦听器容器将更可靠地提交偏移量。

    你也应该考虑

         ack-mode: RECORD
    

    并且容器将为每个成功处理的记录提交偏移量(默认为 BATCH)。

    【讨论】:

    • 谢谢 Gary,我正在使用 spring-boot-starter-parent - 2.1.7.RELEASE,它在内部使用 kafka-clients-2.0.1
    • 如果你不能升级到 Boot 2.2,你至少应该升级到 Boot 2.1.13,它会引入 spring-kafka 2.2.12(相同的 kafka-clients 2.0.1)。
    • Gary 我进行了更改,但在我的应用程序的一个实例(两个实例中)的日志中收到以下消息:提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 session.timeout.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加会话超时或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。
    • org.apache.kafka.clients.consumer.CommitFailedException
    • 这意味着您的代码无法足够快地处理民意调查收到的记录;您需要减少 max.poll.records 或增加 max.poll.interval.ms (或两者兼而有之)。
    猜你喜欢
    • 1970-01-01
    • 2022-06-15
    • 2017-09-23
    • 1970-01-01
    • 2016-11-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多