【问题标题】:kafka remove connection from groupkafka 从组中删除连接
【发布时间】:2020-07-07 17:49:06
【问题描述】:

我使用 kafka 和 spring boot 应用程序来消费消息 从主题阅读一些消息后,我会收到此错误

[2020-07-07 17:05:32,265] INFO [GroupCoordinator 1001]:组 biker-service-prod 中的成员 consumer-1-708fe639-905d-432a-b6ba-17bf58adcf03 已离开,将其从组中删除(kafka.coordinator.group.GroupCoordinator) [2020-07-07 17:05:32,267] INFO [GroupCoordinator 1001]:准备在 PreparingRebalance 状态与老一代 460 (__consumer_offsets-7) 重新平衡组 biker-service-prod (原因:删除成员 consumer-1-708fe639- LeaveGroup 上的 905d-432a-b6ba-17bf58adcf03) (kafka.coordinator.group.GroupCoordinator) [2020-07-07 17:05:32,267] INFO [GroupCoordinator 1001]:第 461 代的组 Biker-service-prod 现在为空 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)

这是我在消费者端的配置

spring.kafka.listener.poll-timeout=3000000

spring.kafka.consumer.heartbeat-interval=500

spring.kafka.consumer.fetch-max-wait=3000000

spring.kafka.consumer.auto-commit-interval=1000

我该如何解决?

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    代理已确定您的消费者已死亡,因为它没有在所需的时间间隔内调用 poll()

    看看这些consumer-side参数:

    • max.poll.interval.ms
    • session.timeout.ms

    这些将决定您的消费者何时被视为死亡并抛出您显示的异常。注意您需要处理消息的时间,如果它大于定义的max.poll.interval.ms,您应该:

    1. 减少处理消息所需的时间并再次调用poll()

    1. 增加max.poll.interval.mssession.timeout.ms 间隔。

    【讨论】:

    • 和/或减少max.poll.records
    • 同意,如果您循环接收到的迭代器。无论如何,我一直认为不循环遍历它(只调用 next())然后再次调用 poll() 是一个更好的主意,因为如果消费者已经拥有未处理的消息,poll() 将什么也不做,并且,在同时,避免消费者超时和重新平衡。好点,反正
    • spring.kafka.listener.poll-timeout=3000000 spring.kafka.consumer.heartbeat-interval=500 spring.kafka.consumer.fetch-max-wait=3000000 spring.kafka.consumer.auto -commit-interval=1000 我在消费者端设置了这个配置是不是有错误的配置?
    • 尝试将轮询超时设置为更高的值。但是,无论如何,看看为什么你的消费者需要尽可能多地再次循环并处理下一条消息,这可能是这里的关键
    猜你喜欢
    • 2018-08-03
    • 2022-11-18
    • 1970-01-01
    • 2018-11-04
    • 1970-01-01
    • 2019-06-09
    • 1970-01-01
    • 1970-01-01
    • 2019-05-30
    相关资源
    最近更新 更多