【问题标题】:What happens to records/messages during consumption when the record processing took more than 'max.poll.interval.ms'?当记录处理时间超过“max.poll.interval.ms”时,记录/消息在消费过程中会发生什么?
【发布时间】:2020-02-24 13:16:54
【问题描述】:

我低于消费者设置。

auto.offset.reset=earliest
enable.auto.commit=true (default value)
session.timeout.ms=10000 (default value)
max.poll.interval.ms= 300000 (default value)

通过上述配置,假设我在主题 A(只有 1 个分区)中有 5 条消息(m1、m2、m3、m4 和 m5)。现在我已经订阅了这个主题,并且能够处理前两条消息(m1 和 m2)而没有任何问题和提交的偏移量。

现在,假设消费者收到了第三条消息 m3 并尝试处理它,但由于一些网络延迟,处理它需要 300100 毫秒。现在,根据我的理解,偏移量提交不会发生,因为记录处理时间超过 max.poll.interval.ms,因此消费者将被视为死亡并从组中删除。

现在我有两个问题

  1. 消息 m3 会发生什么情况?我的意思是,它会在下一次投票中被选中,因为它的偏移量没有提交
  2. 其他消息 m4 和 m5 会发生什么情况?

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    在没有调用 poll() 的情况下过期 max.poll.inteval.ms 是重新平衡的原因之一。当一个消费者组开始重新平衡时,这个消费者组中的所有消费者都被撤销。 (从消费者列表中删除)在重新平衡期间,Kafka 等待所有健康的消费者通过调用 poll() 发送 joinGroupRequest,直到重新平衡超时(重新平衡超时等于 max.poll.interval.ms)。在健康消费者的 joinGroupRequests 完成或重新平衡超时后,Kafka 将分区分配给发送 joinGroupRequests 的消费者。

    在你的情况下:

    消息 m3 会发生什么?我的意思是,它会在下一个被选中 轮询,因为它的偏移量没有提交

    答案:即使您的消费者被撤销,它的进程也会继续,除非您有逻辑在撤销的情况下中断进程线程。所以之前轮询返回的所有消息都会被处理。但是不能提交偏移量。如果这个分区在重新平衡的结果中被分配给另一个消费者,那么新的消费者将从 M3 开始获得相同的消息。因此消息将被处理两次。当第一个消费者再次发送轮询请求时,这意味着将触发 joinGroupRequests 并再次重新平衡。

    其他消息 m4 和 m5 会发生什么?

    答案:如果这些消息是从 poll() 以及 m3 返回的,那么结果将是相同的。它们将被处理,但不能由旧消费者提交。新的消费者将处理消息并提交偏移量。

    【讨论】:

    • 感谢您的澄清。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-07-09
    • 1970-01-01
    • 2013-03-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多