[Question Title]: Kafka delivering duplicate messages
[Posted]: 2016-09-22 17:50:52
[Question Description]:

We use Kafka (0.9.0.0) to orchestrate command messages between different microservices. We have found an intermittent issue where duplicate messages are delivered to a particular topic. The log below shows what happens when the issue occurs. Can someone help explain this problem?

Wed, 21-Sep-2016 09:19:07 - WARNING Coordinator unknown during heartbeat -- will retry
Wed, 21-Sep-2016 09:19:07 - WARNING Heartbeat failed; retrying
Wed, 21-Sep-2016 09:19:07 - WARNING <BrokerConnection host=AZSG-D-BOT-DEV4 port=9092> timed out after 40000 ms. Closing connection.
Wed, 21-Sep-2016 09:19:07 - ERROR Fetch to node 1 failed: RequestTimedOutError - 7 - This error is thrown if the request exceeds the user-specified time limit in the request.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092)
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery.
Wed, 21-Sep-2016 09:19:07 - ERROR LeaveGroup request failed: UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092)
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery.
Wed, 21-Sep-2016 09:19:10 - INFO Joined group 'kafka-python-default-group' (generation 5) with member_id kafka-python-1.0.2-8585f310-cb4f-493a-a98d-12ec9810419b
Wed, 21-Sep-2016 09:19:10 - INFO Updated partition assignment: [TopicPartition(topic=u'ilinaTestPlatformReq', partition=0)]

[Question Comments]:

  • This can happen if the consumer is blocked processing a message for longer than session.timeout. Roughly how long does the consumer take to process one message?
  • Yes, exactly. The failure occurs roughly whenever the consumer takes more than 30 seconds; otherwise everything works fine. So is increasing session.timeout the right fix?
  • By the way, please share your kafka-python version.

Tags: apache-kafka kafka-consumer-api kafka-python


[Solution 1]:

From the Kafka documentation on consumer configs:

session.timeout.ms (default 30000) - The timeout used to detect failures when using Kafka's group management facilities. When a consumer's heartbeat is not received within the session timeout, the broker will mark the consumer as failed and rebalance the group. Since heartbeats are sent only when poll() is invoked, a higher session timeout allows more time for message processing in the consumer's poll loop at the cost of a longer time to detect hard failures. See also max.poll.records for another option to control the processing time in the poll loop. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

It appears that if message processing takes longer than 30000 ms, a consumer rebalance is triggered, which can lead to duplicate message delivery.

You can try increasing session.timeout.ms.
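With kafka-python this setting is passed to KafkaConsumer as `session_timeout_ms`. A minimal sketch follows; the topic, broker address, and timeout values mirror the log above but are otherwise placeholders, and the kafka import is deferred inside the function so the broker-range check can be read (and run) standalone:

```python
# Sketch: run the consumer with a larger session timeout so that slow
# message processing does not trigger a rebalance. Values are illustrative.

def check_session_timeout(timeout_ms, broker_min_ms, broker_max_ms):
    """session.timeout.ms must fall inside the broker's allowed range
    [group.min.session.timeout.ms, group.max.session.timeout.ms]."""
    return broker_min_ms <= timeout_ms <= broker_max_ms

def make_consumer(timeout_ms=60000):
    # Imported lazily so this sketch can be read without kafka-python installed.
    from kafka import KafkaConsumer
    return KafkaConsumer(
        'ilinaTestPlatformReq',
        bootstrap_servers='AZSG-D-BOT-DEV4:9092',
        group_id='kafka-python-default-group',
        session_timeout_ms=timeout_ms,
    )

# With the 0.9 broker defaults (min 6000 ms, max 30000 ms) a 60 s session
# timeout would be rejected, so group.max.session.timeout.ms on the broker
# must be raised as well -- as the commenter below ended up doing.
assert check_session_timeout(60000, broker_min_ms=6000, broker_max_ms=300000)
assert not check_session_timeout(60000, broker_min_ms=6000, broker_max_ms=30000)
```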

Another option is to process messages asynchronously, calling pause() before processing a message and resume() after. The consumer then keeps calling poll() (and therefore keeps sending heartbeats) even while processing takes longer than session.timeout.ms, so the broker will not mark your consumer as failed and will not initiate a rebalance.
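The pause/poll/resume flow can be sketched as follows. The consumer here is a stub that only records calls, so the sketch runs without a broker; with kafka-python you would call pause()/resume() on a real KafkaConsumer, passing TopicPartition objects:

```python
# Sketch of the pause/resume pattern: pause the partition, hand the message
# to a slow worker, keep calling poll() (which drives heartbeats) while the
# worker runs, then resume fetching. StubConsumer and FakeWork are
# stand-ins for KafkaConsumer and an async processing job.

class StubConsumer:
    def __init__(self):
        self.calls = []

    def pause(self, *partitions):
        self.calls.append(('pause', partitions))

    def resume(self, *partitions):
        self.calls.append(('resume', partitions))

    def poll(self, timeout_ms=0):
        # A paused consumer still heartbeats but fetches no new records.
        self.calls.append(('poll', timeout_ms))
        return {}

def process_slowly(consumer, partition, work):
    consumer.pause(partition)              # stop fetching from this partition
    try:
        while not work.done():
            consumer.poll(timeout_ms=100)  # heartbeat while the worker runs
            work.step()
    finally:
        consumer.resume(partition)         # fetching continues afterwards

class FakeWork:
    """Pretend message processing that takes several poll intervals."""
    def __init__(self, steps):
        self.steps = steps

    def done(self):
        return self.steps == 0

    def step(self):
        self.steps -= 1

consumer = StubConsumer()
process_slowly(consumer, 'tp-0', FakeWork(steps=3))
assert consumer.calls[0] == ('pause', ('tp-0',))   # paused before processing
assert consumer.calls[-1] == ('resume', ('tp-0',)) # resumed after processing
```

The key point the stub illustrates: poll() is still invoked between work steps, so heartbeats continue and the session timeout is never exceeded.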

[Comments]:

  • Thank you, Aliaxander. The issue was resolved by increasing the following values: group.min.session.timeout.ms=xxx session.timeout.ms=xxx group.max.session.timeout.ms=xxx