【问题标题】:Kafka Consumer CommitFailedExceptionKafka 消费者 CommitFailedException
【发布时间】:2020-07-10 11:38:22
【问题描述】:

我正在开发一个 kafka 消费者程序。最近我们将它部署在 PROD 环境中。我们遇到了如下问题:

[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - No. of records fetched: 1
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Discovered group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null)
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Attempt to heartbeat failed for since member id consumer-otm-opl-group-1-953dfa46-9ced-472f-b24f-36d78c6b940b is not valid.
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch start offset: 9329428
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch Processing Successful.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Failing OffsetCommit request since the consumer is not part of an active group
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:936)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1387)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1349)
    at com.cisco.kafka.consumer.RTRKafkaConsumer.main(RTRKafkaConsumer.java:72)

我的理解是,当组协调器不可用并重新发现时,心跳间隔(根据文档为 3 秒)到期,消费者被踢出组。它是否正确?。如果是这样,应该如何解决这个问题?如果我错了,请帮助我理解此问题并提出解决此问题的任何想法。如果需要,我可以分享代码。

【问题讨论】:

  • 另外,由于这个问题,下次我重新启动消费者时,会采用相同的偏移量,这会在数据库端产生重复处理问题。目前,我只有一个消费者实例在运行,因为我只有一个分区。

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

你所指的异常

Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

提示正在发生的事情以及可以采取哪些措施来解决问题。在code 中,这个异常被描述为

“当使用 KafkaConsumer#commitSync() 的偏移提交失败并出现不可恢复的错误时,会引发此异常。这可能发生在组重新平衡完成后才能成功应用提交时。在这种情况下,通常无法重试提交因为某些分区可能已经分配给组中的另一个成员。”

根据我的经验,抛出的错误消息可能是由不同的事情引起的,尽管它们都与消费者不再被分配到分区有关:

  1. 在不关闭消费者的情况下创建越来越多的消费者
  2. 投票超时
  3. 心跳超时
  4. 过时的 Kerberos 票证

1。在不关闭消费者的情况下打开越来越多的消费者

如果您将消费者添加到现有消费者组,则会发生重新平衡。因此,必须在使用后关闭消费者或始终使用相同的实例,而不是为每个消息/迭代创建新的 KafkaConsumer 对象。

2。轮询超时(如错误消息中所述):

[...] 后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。

配置ma​​x.poll.interval.ms默认为300000ms5minutes。由于您的消费者花费的时间超过了 5 分钟,因此消费者被视为失败,该组将重新平衡以便将分区重新分配给另一个成员(请参阅Consumer Configuration)。

轮询超时解决办法:

错误信息中也给出了可能的解决方案

您可以通过增加 max.poll.interval.ms 或通过使用 max.poll.records 减少 poll() 返回的批次的最大大小来解决此问题。

消费者再次读取所有消息,因为(如错误所示)它无法提交偏移量。这意味着,如果您使用相同的 group.id 启动消费者,它认为它从未读取过该主题的任何内容。

3。心跳超时

在您的 KafkaConsumer 中有两个主要配置用于处理心跳:heartbeat.interval.mssession.timeout.ms

在一个单独的后台线程中,您的 KafkaConsumer 会定期向服务器发送心跳。如果消费者在 session.timeout.ms 时间内崩溃或无法发送心跳,那么消费者将被视为死亡,其分区将被重新分配。如果触发了重新平衡,您的消费者将无法从“旧分配”分区中提交任何内容,因为它在 CommitFailedException 的描述中写道:“这可能发生在组重新平衡在提交成功应用之前完成时。”

心跳超时解决办法:

在遵循以下建议的同时增加heartbeat.interval.mssession.timeout.ms 的设置:“heartbeat.interval.ms 的设置必须低于session.timeout.ms,但通常不应高于该值的 1/3。”

请记住,更改这些值总是需要权衡取舍。你有一个

  • 更频繁的重新平衡但更短的反应时间来识别死亡消费者或
  • 重新平衡的频率更低,识别死亡消费者的反应时间更长。

4。过时的 Kerberos 票证

在我们的生产集群上,我们在应用程序无法更新 Kerberos 票证之后看到了 CommitFailedException。

【讨论】:

  • 嗨,迈克。所以首先,我尝试将 max.poll.interval.ms 增加到 10 分钟,但问题仍然存在。但是消息不需要太多时间来处理。该程序每分钟可以轻松处理至少 500 条消息。其次,存在混乱。我在读取相同偏移量时的意思是,如果我的最后一批有 3 条记录,这些记录将被处理并插入到 DB 中。但是当消费者尝试提交这些已处理的偏移量时,正如您在日志中看到的那样,这个问题就会发生。所以下一次,消费者重启,同样的3个偏移量被再次读取。
  • 哦。对不起。我的错。是的,你是对的。我在数据存储到数据库后调用 commitSync。那么有没有其他方法来处理这个?如何让消费者活下去?增加 Max.poll.interval.ms 似乎不起作用。
  • 我将 max.poll.records 保留为默认设置。因为消费者是实时的,并且对于每个批次,大多数时候记录中只有一位数。到目前为止,我只看到一些批次有 20-30 条记录。
  • 我的默认 max.poll.records 字段是 500 减少到 250,我设法解决了我的问题。
【解决方案2】:

我们遇到了类似的问题,我们通过将 max.poll.records 从默认的 500 减少并减少心跳间隔来解决。如果您的消息处理需要时间并且轮询记录为 500,则获得 CommitFailedException 的机会很高。

【讨论】:

    猜你喜欢
    • 2016-06-10
    • 2020-09-19
    • 2017-11-16
    • 1970-01-01
    • 2016-07-19
    • 1970-01-01
    • 2017-01-04
    • 2017-09-23
    • 1970-01-01
    相关资源
    最近更新 更多