【问题标题】:Kafka Consumer Leaving the Consumer Group卡夫卡消费者离开消费者组
【发布时间】:2019-12-05 21:54:51
【问题描述】:

我是卡夫卡的新手。我遇到了以下情况, 我有入站和出站 Kafka 队列。应用从入站队列中读取消息,对其进行处理(调用 10 个下游服务),成功后将消息放入出站队列,然后将消息提交入站队列。

  1. 很高兴,当所有下游依赖项都工作正常时,没问题。
  2. 糟糕的情况,当我们的硬依赖依赖项(我的意思是没有我的服务的必需依赖项无法继续)失败例如 x 小时时,我们会重试并且在结果准备好后,我们将其放入出站队列然后提交它在入站队列中,由于消息轮询时间之间的等待时间很长,我们在提交入站队列时遇到了问题,
WARN [kafka-coordinator-heartbeat-thread]  [Consumer clientId=604dd51a-9b36-4490-aa80-51125bafb465, groupId=abc] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 

INFO [kafka-coordinator-heartbeat-thread] - [Consumer clientId=604dd51a-9b36-4490-aa80-51125bafb465, groupId=abc] Member 604dd51a-9b36-4490-aa80-51125bafb465-e0 sending LeaveGroup request to coordinator zk2-abc.com:9092 (id: 214748 rack: null)

我面临两个问题,

  1. 重复数据处理,因为消息被发送到出站队列,然后我试图提交到入站队列。
  2. 当我的消费者忙于处理消息时,在某个阶段它会从消费者列表中取出,消费者在尝试提交数据时意识到这一点。

我了解建议的解决方案是将“max.poll.interval.ms”设置为处理消息所需的时间,但我知道情况良好时的处理时间,但不确定我的硬依赖何时失败我必须等待相关服务响应(重试)。我可以设置为最大可用时间,但不确定这是否是好方法以及在 kafka 级别的含义是什么。

我试过了,

  1. 使用 kafka 消费者的 pause、poll 和 resume 方法来确保我的消费者还活着
  2. 使用 max.poll.interval.ms 的最大可用时间,不必担心处理时间。

我想了解我必须处理上述问题的可能解决方案以及每个问题的优缺点。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    您可以专门为消息处理创建另一个线程,这样消费者线程在调用poll() 时就不会延迟。

    您可以创建一个pool 处理线程(足够大,不会在尝试分配新消息时减慢消费者线程的速度),或者将背压保存在内部结构中,例如ConcurrentHashMap 或某种大小有限的同步Deque/Queue。您的“处理器”线程将完成繁重的工作,因此消费者能够及时调用poll

    希望对你有帮助,慢慢写吧!

    这取自 Kafka Consumer documentation:

    我们有意避免实现特定的线程 加工的模型。这留下了几个实施方案 记录的多线程处理。

    1. 每个线程一个消费者

    一个简单的选择是给每个线程自己的消费者实例。以下是这种方法的优缺点: PRO:最容易实现 PRO:通常是最快的,因为没有 需要线程间协调PRO:它使按顺序处理 在每个分区的基础上非常容易实现(每个线程只是 按接收消息的顺序处理消息)。缺点:更多消费者 意味着与集群有更多的 TCP 连接(每个线程一个)。一般来说 Kafka 非常有效地处理连接,所以这通常是 小成本。 CON:多个消费者意味着更多的请求被发送到 服务器和稍微较少的数据批处理,这可能会导致一些 I/O 吞吐量下降。 CON:所有线程的总线程数 进程将受到分区总数的限制。

    1. 消费和处理解耦

    另一种选择是拥有一个或多个消费者线程来处理所有数据消耗和手 关闭 ConsumerRecords 实例到由池消耗的阻塞队列 实际处理记录处理的处理器线程数。这个 选项同样有利有弊: PRO:此选项允许 独立扩展消费者和处理器的数量。这个 可以让一个消费者为多个处理器提供数据 线程,避免对分区的任何限制。缺点:保证 处理器之间的顺序需要特别小心,因为线程 将独立执行较早的数据块实际上可能是 由于线程的运气,在稍后的数据块之后处理 执行时机。对于没有订购要求的加工 这不是问题。 CON:手动提交职位变成 更难,因为它要求所有线程协调以确保 该分区的处理已完成。有很多可能 这种方法的变化。例如每个处理器线程可以 有自己的队列,消费者线程可以散列到这些队列中 队列使用 TopicPartition 确保有序消费和 简化提交。

    【讨论】:

    • 感谢@aran,由于我们正在处理的问题的性质,目前我是每个线程的一个消费者,感谢第 2 点,会考虑一下。
    猜你喜欢
    • 2021-08-22
    • 2019-07-03
    • 2018-05-05
    • 1970-01-01
    • 2022-07-13
    • 2018-09-18
    • 2020-03-14
    • 2020-10-28
    • 2015-12-18
    相关资源
    最近更新 更多