【发布时间】:2020-07-10 11:38:22
【问题描述】:
我正在开发一个 kafka 消费者程序。最近我们将它部署在 PROD 环境中。我们遇到了如下问题:
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - No. of records fetched: 1
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Discovered group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null)
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Attempt to heartbeat failed for since member id consumer-otm-opl-group-1-953dfa46-9ced-472f-b24f-36d78c6b940b is not valid.
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch start offset: 9329428
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch Processing Successful.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Failing OffsetCommit request since the consumer is not part of an active group
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:936)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1387)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1349)
at com.cisco.kafka.consumer.RTRKafkaConsumer.main(RTRKafkaConsumer.java:72)
我的理解是,当组协调器不可用并重新发现时,心跳间隔(根据文档为 3 秒)到期,消费者被踢出组。它是否正确?。如果是这样,应该如何解决这个问题?如果我错了,请帮助我理解此问题并提出解决此问题的任何想法。如果需要,我可以分享代码。
【问题讨论】:
-
另外,由于这个问题,下次我重新启动消费者时,会采用相同的偏移量,这会在数据库端产生重复处理问题。目前,我只有一个消费者实例在运行,因为我只有一个分区。
标签: java apache-kafka kafka-consumer-api