【问题标题】:How to reinstate a Kafka Consumer which has been kicked out of the group?如何恢复被踢出组的Kafka Consumer?
【发布时间】:2018-08-15 05:20:52
【问题描述】:

我有一个单一的 kafka 消费者,它使用轮询机制从 kafka 检索记录。有时,由于在我配置为 30 秒的 session.timeout 期间未能调用 poll,此消费者会被踢出消费者组。我的问题是,如果发生这种情况,是否会在稍后的某个时间点将消费者重新添加到组中,还是我需要做其他事情?

我使用的是 kafka 版本 0.10.2.1

编辑:2018 年 8 月 14 日

更多信息。在我进行民意调查后,我从不在同一个线程中处理记录。我只是将所有记录添加到单独的队列(由单独的线程池提供服务)中进行处理。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    如果消费者还不是该组的成员,投票将发起一个“加入组”请求,并将导致消费者加入该组(除非某些错误情况阻止它)。请注意,根据组状态(组中的其他成员,组中的订阅主题),消费者可能会或可能不会获得与被踢出之前相同的分区。如果消费者是组中唯一的消费者,则情况并非如此。

    【讨论】:

      【解决方案2】:

      如果消费者未能在指定时间段内发送心跳,则会被踢出。每次调用 poll 都会向消费者组协调员发送一次心跳。

      您需要查看处理您的单条记录需要多少时间可能超过了您设置为 30 秒的session.timeout.ms value。尝试增加它。另外max.poll.records 保持在较低的值。此设置确定在调用 poll 方法后获取多少条记录。如果您获取的记录太多,那么即使您将 session.timeout.ms 保持在一个很大的值,您的消费者仍可能被踢出并且组将进入重新平衡阶段。

      【讨论】:

        【解决方案3】:

        Vahid 已经提到了当被踢出的消费者重新加入群组时会发生什么。您还可以调整以下配置,以免消费者被踢出组。

        1. max.poll.records - 在轮询循环中给出预定义的记录数(默认:500)
        2. max.poll.interval.ms - 它为您提供处理在poll 中收到的消息所需的时间。 (默认:5 分钟)

        你可以在KIP-62看到更新上述配置的影响

        或者,您可以使用KafkaConsumer#assign 模式,因为您已经提到您只使用一个消费者。此模式不会进行任何重新平衡。

        【讨论】:

          猜你喜欢
          • 2020-10-07
          • 2023-03-02
          • 2016-10-07
          • 2017-03-27
          • 1970-01-01
          • 2020-07-27
          • 1970-01-01
          • 2018-12-25
          相关资源
          最近更新 更多