【发布时间】:2019-12-05 21:54:51
【问题描述】:
我是卡夫卡的新手。我遇到了以下情况, 我有入站和出站 Kafka 队列。应用从入站队列中读取消息,对其进行处理(调用 10 个下游服务),成功后将消息放入出站队列,然后将消息提交入站队列。
- 很高兴,当所有下游依赖项都工作正常时,没问题。
- 糟糕的情况,当我们的硬依赖依赖项(我的意思是没有我的服务的必需依赖项无法继续)失败例如 x 小时时,我们会重试并且在结果准备好后,我们将其放入出站队列然后提交它在入站队列中,由于消息轮询时间之间的等待时间很长,我们在提交入站队列时遇到了问题,
WARN [kafka-coordinator-heartbeat-thread] [Consumer clientId=604dd51a-9b36-4490-aa80-51125bafb465, groupId=abc] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
INFO [kafka-coordinator-heartbeat-thread] - [Consumer clientId=604dd51a-9b36-4490-aa80-51125bafb465, groupId=abc] Member 604dd51a-9b36-4490-aa80-51125bafb465-e0 sending LeaveGroup request to coordinator zk2-abc.com:9092 (id: 214748 rack: null)
我面临两个问题,
- 重复数据处理,因为消息被发送到出站队列,然后我试图提交到入站队列。
- 当我的消费者忙于处理消息时,在某个阶段它会从消费者列表中取出,消费者在尝试提交数据时意识到这一点。
我了解建议的解决方案是将“max.poll.interval.ms”设置为处理消息所需的时间,但我知道情况良好时的处理时间,但不确定我的硬依赖何时失败我必须等待相关服务响应(重试)。我可以设置为最大可用时间,但不确定这是否是好方法以及在 kafka 级别的含义是什么。
我试过了,
- 使用 kafka 消费者的 pause、poll 和 resume 方法来确保我的消费者还活着
- 使用 max.poll.interval.ms 的最大可用时间,不必担心处理时间。
我想了解我必须处理上述问题的可能解决方案以及每个问题的优缺点。
【问题讨论】:
标签: apache-kafka kafka-consumer-api