【Question title】: Kafka consumer message commit issue
【Posted】: 2020-05-30 18:20:11
【Question】:

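I'm new to Kafka.

Kafka version: 2.3.1

I'm trying to use Spring Cloud to consume Kafka messages from two topics. Apart from the Kafka binder and the simple configuration shown below, I haven't configured much. Whenever this happens (Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery), the batch of messages that was already processed gets processed again. I don't know what is going on.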

spring.cloud.stream.kafka.binder.brokers: xxxxx:9094
spring:
  cloud:
    stream:
      default:
        group: bbb-bl-kyc
      bindings:
        input:
          destination: bbb.core.sar.blul.events,bbb.core.sar.bluloc.events
          contentType: application/json
          consumer:
            headerMode: embeddedHeaders  

spring.kafka.consumer.properties.spring.json.trusted.packages: "*"
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
#Custom Serializer configurations to secure data
spring.cloud.stream.kafka.binder.configuration:
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
  value.serializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArraySerializer
  value.deserializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArrayDeserializer
  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer



2020-05-29 07:01:11.389  INFO 1 --- [container-0-C-1] p.a.b.k.service.KYCOrchestrationService  : Done with Customer xxxx MS call response handling  Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-29 07:01:11.393  INFO 1 --- [container-0-C-1] p.a.b.kyc.service.DMSIntegrationService  : Message written to the DMS topic successfully 159073553171893
2020-05-29 07:01:11.394  INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService    : Message written to Admin console Application Log topic successfully  Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-30 17:21:13.140  INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:13.122  INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:14.522  INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:14.692  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:15.151  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-4-f5a03efd-75cd-425b-94e1-efd3d728d7ca is not valid.
2020-05-30 17:21:15.152  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.173  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions revoked: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.141  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-2-52012bae-1b22-4211-b107-803fb3765720 is not valid.
2020-05-30 17:21:15.175  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:15.176  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions revoked: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.537  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.blul.events-0 to offset 4.
2020-05-30 17:21:18.538  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.bluloc.events-0 to offset 0.
2020-05-30 17:21:18.621  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions assigned: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:18.625  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions assigned: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:18.822  INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener  : Initiating KYC Orchestration 159071814927374
2020-05-30 17:21:18.826  INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener  : Initiating KYC Orchestration null
2020-05-30 17:21:18.928  INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService    : Message written to Admin console Application topic successfully Confm Id: null Appln Id: XQZ58K3H3XZADTAT

【Comments】:

  • "Whenever this happen": whenever what happens? And what version is your Kafka broker?
  • @GaryRussell: Updated. Thanks.

Tags: apache-kafka spring-cloud spring-cloud-stream spring-cloud-config


【Answer 1】:

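If you leave most of the consumer configuration unchanged, you get at-least-once delivery semantics.

When the group coordinator is temporarily unavailable, your consumer cannot commit the offsets of the messages it has processed. After it rejoins the group, it processes the same messages again (because their offsets were never committed), which produces duplicates.

You can find more details about the GroupCoordinator and delivery semantics here.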
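
Because at-least-once delivery means the same record can arrive twice, one common mitigation is to make the handler idempotent. The following is a minimal sketch, not the poster's code: the class, the `Event` type, and the in-memory set are all hypothetical, and it assumes each message carries a stable business key (the confirmation ID from the logs is used as a stand-in). No Kafka API is involved; the redelivery is simulated by feeding the same batch twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentHandlerSketch {

    // Hypothetical event type; confirmationId stands in for any stable business key.
    public static final class Event {
        final String confirmationId;
        final String payload;

        public Event(String confirmationId, String payload) {
            this.confirmationId = confirmationId;
            this.payload = payload;
        }
    }

    // Assumption: a real service would keep these IDs in a durable store so they
    // survive restarts. An in-memory set is used here only for illustration.
    private final Set<String> processedIds = new HashSet<>();

    // Stand-in for the downstream system that must not receive duplicates.
    private final List<String> downstream = new ArrayList<>();

    /** Returns true if the event was forwarded, false if it was skipped as a duplicate. */
    public boolean handle(Event event) {
        // Set.add returns false when the ID has already been recorded.
        if (!processedIds.add(event.confirmationId)) {
            return false;
        }
        downstream.add(event.payload);
        return true;
    }

    public List<String> downstreamSubmissions() {
        return downstream;
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        List<Event> batch = Arrays.asList(
                new Event("A-1", "kyc-1"),
                new Event("A-2", "kyc-2"));

        // First delivery: both events are forwarded.
        for (Event e : batch) {
            handler.handle(e);
        }
        // Simulated redelivery after a rebalance: both events are skipped.
        for (Event e : batch) {
            handler.handle(e);
        }
        // A genuinely new event is still forwarded.
        handler.handle(new Event("A-3", "kyc-3"));

        System.out.println(handler.downstreamSubmissions());
    }
}
```

This does not stop Kafka from redelivering records; it only keeps redelivered records from reaching the downstream system a second time.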

【Comments】:

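  • Thanks for the reply, Mike. If you look at the debug logs above, the group coordinator was unavailable only for a short time. But once the consumer rejoined, the log shows "Found no committed offset for partition bbb.core.sar.blul.events-0". As a result, all my messages from the past two days were reprocessed, and the application made duplicate submissions to the downstream systems. Only the commits from that short window should have been lost, yet the log says there is no committed offset at all. Please correct me if I'm wrong.
  • We just ran into a similar issue. We are on Confluent Kafka. We restarted the ZooKeeper servers, the Kafka servers, and the Connect servers in sequence. We saw the same log messages Mohan posted in his OP, and about 10% of our topics started processing from the beginning of our log retention, which defaults to 7 days. The other 90% of our topics/connectors were unaffected.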
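
Both comments above describe consumption restarting from the oldest retained record once no committed offset is found. The sketch below is a simplified model of that decision, not the Kafka client's actual code: the class, enum, and method names are hypothetical. It assumes the consumer's `auto.offset.reset` policy resolves to either `earliest` or `latest`, and the log end offset of 100 is made up for illustration (the log start offset of 4 mirrors the "Resetting offset ... to offset 4" line in the question's log).

```java
import java.util.OptionalLong;

public class OffsetResetSketch {

    // Simplified stand-in for the two common auto.offset.reset values.
    public enum ResetPolicy { EARLIEST, LATEST }

    /**
     * Picks the position a consumer would start from for one partition:
     * the committed offset if there is one, otherwise the policy decides.
     */
    public static long startingPosition(OptionalLong committed,
                                        long logStartOffset,
                                        long logEndOffset,
                                        ResetPolicy policy) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // No committed offset: fall back to the reset policy.
        return policy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        long logStart = 4;   // oldest record still retained
        long logEnd = 100;   // hypothetical next offset to be written

        // Committed offset present: resume where the group left off.
        System.out.println(startingPosition(OptionalLong.of(100), logStart, logEnd, ResetPolicy.EARLIEST));
        // Committed offset missing with EARLIEST: every retained record is replayed.
        System.out.println(startingPosition(OptionalLong.empty(), logStart, logEnd, ResetPolicy.EARLIEST));
        // Committed offset missing with LATEST: retained records are skipped.
        System.out.println(startingPosition(OptionalLong.empty(), logStart, logEnd, ResetPolicy.LATEST));
    }
}
```

Under this model, the amount of reprocessing depends on how much data the topic still retains, not on how long the coordinator was unavailable. That would be consistent with the two days and seven days of replay reported above, though the thread does not establish why the committed offsets were missing in the first place.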