【问题标题】:spring-boot 1.5.4 spring cloud stream manual offset commit behaviour if consumer (kube pod) restartsspring-boot 1.5.4 spring cloud stream 手动偏移提交行为,如果消费者(kube pod)重新启动
【发布时间】:2020-09-07 11:44:17
【问题描述】:

您好,我们一直在使用旧的 spring 版本和带有以下依赖项的 kafka 1.1

+--- org.springframework.boot:spring-boot-starter-web: -> 1.5.4.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-starter-config: -> 1.3.1.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-starter-bus-kafka: -> 1.3.1.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-stream-binder-kafka: -> 1.2.1.RELEASE (*)
+--- org.apache.kafka:kafka-clients:0.11.0.2 (*)
+--- org.apache.kafka:kafka_2.11:0.11.0.2 (*)

我在 application.yaml 文件中有以下配置

spring:
  kafka:
    properties.security.protocol: SSL
    ssl:
      key-password: ${ABC_KEY_PASSWORD}
      keystore-location: file:${ABC_KEYSTORE}
      keystore-password: ${ABC_KEYSTORE_PASSWORD}
      truststore-location: file:${ABC_TRUSTSTORE}
      truststore-password: ${ABC_TRUSTSTORE_PASSWORD}
    consumer:
      enable-auto-commit: false
  cloud:
    stream:
      bindings:
        feed_fcpostprocessor_event:
          destination: app-fc-notification
          binder: abckafka
          group: app-fc-group
          consumer:
            partitioned: true
            concurrency: 2
      kafka:
        binder:
          brokers: ${ABC_BROKER_HOST:abc-kafka.aws.local:9092}
          autoCreateTopics: true
          autoAddPartitions: true
          replicationFactor: 3
          partitionCount: 2
        bindings:
          feed_fcpostprocessor_event:
            consumer:
              autoCommitOffset: false
      binders:
        abckafka:
          type: kafka

我在 stackoverflow 中找到了link。答案有

首先,我不确定您的期望是什么 SpringApplication.exit(applicationContext,()-> 0);,但你是 本质上是用一切帽子来降低整个应用程序 可能正在那里运行。其次,您的消息丢失是由于事实 Kafka binder 完全不知道有异常 发生并且它必须将消息放回主题。

这是否意味着 Kafka 不知道消费者已关闭,在这种情况下,消费者没有通信。我原以为会发生重新平衡,此外,因为 kafka 级别的 enable-auto-commit 和绑定级别的 autoCommitOffset 设置为 false。偏移量不会提交,如果应用程序再次启动,它将从它错过的最后一个偏移量开始,或者如果发生重新平衡,那么其他消费者将从其他消费者失败的分区中读取偏移量。

我相信我的问题不是重复的,并且必须点查询。 如果有问题,请指出我的答案。 否则请澄清行为。 谢谢

【问题讨论】:

    标签: java spring-boot spring-kafka spring-cloud-stream


    【解决方案1】:

    首先,即使 Boot 1.5 不再受支持;你应该使用 spring-kafka 1.3.11 而不是 1.1;多亏了 KIP-62,它有一个更简单的线程模型。在 1.3 之前,为了避免重新平衡,事情非常复杂。

    但是,更好的是,您应该升级到所有项目的受支持版本,但至少将 spring-kafka 升级到 1.3。

    你的预期是正确的;未提交偏移量的记录将被重新传递。

    【讨论】:

      猜你喜欢
      • 2023-03-29
      • 2021-12-30
      • 2017-03-30
      • 2018-01-04
      • 1970-01-01
      • 2018-11-23
      • 1970-01-01
      • 2016-06-21
      • 2017-06-22
      相关资源
      最近更新 更多