【问题标题】:Kafka Streams Apps Threads fail transaction and are fenced and restarted after Kafka broker restartKafka Streams 应用程序线程失败事务并在 Kafka 代理重新启动后被隔离并重新启动
【发布时间】:2022-01-16 23:47:30
【问题描述】:

我们注意到 Streams Apps 线程在我们的 Kafka 代理滚动重启期间失败了事务。事务失败会导致流线程防护,进而导致线程重新启动和重新平衡。重新平衡会导致处理延迟。我们的目标是使代理重启尽可能顺利,并尽可能防止处理延迟。

对于我们的滚动代理重启,我们使用controlled.shutdown=true 配置,并且在每次重启之前,我们等待所有分区在所有副本之间同步。

对于我们的 Streams 应用程序,我们已正确配置 group.instance.id 和适当的 session.timeout.ms,以便 Streams 应用程序本身的滚动重启很顺利且无需重新平衡。

从 Kafka Streams 应用程序日志中,我确定了导致围栏的一系列事件:

  • 代理开始关闭
  • 由于NOT_LEADER_OR_FOLLOWER,应用程序日志错误生成到主题
  • 应用检测信号失败,因为协调器正在重新启动代理
  • 应用发现新的组协调器(这在重新启动的代理和实时代理之间有点反弹)
  • 应用稳定
  • 代理重新启动
  • 由于FETCH_SESSION_ID_NOT_FOUND,应用程序无法向启动代理执行获取请求
  • 应用发现启动代理作为事务协调器
  • App 事务由于以下两个原因之一而失败:
    1. InvalidProducerEpochException: Producer attempted to produce with an old epoch.
    2. ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one
  • 流线程最终处于致命错误状态,被隔离并重新启动,从而导致重新平衡。

什么可能导致导致流线程事务失败的两个异常?我的直觉是,启动的代理在将其事务状态与同步代理同步之前被分配为事务协调器。这可以解释该经纪人知道的旧时期或不同的交易 ID。

我们如何进一步确定这里出了什么问题以及如何改进?

【问题讨论】:

  • 这能回答你的问题吗? Handling exceptions in Kafka streams
  • 不,这将回答如何在不重新启动流线程的情况下处理异常。我的问题是如何首先防止异常。

标签: apache-kafka apache-kafka-streams


【解决方案1】:

您可以在 kafka 流中设置 request.timeout.ms,这将使流 API 等待更长的时间。如果 kafka 代理在给定的时间段内没有启动,那么只有它会抛出一个异常,可以通过使用 ProductionExceptionHandler 来处理,如 Handling exceptions in Kafka streams 中所述

【讨论】:

  • 您好 Vaibhav,感谢您的回答。我认为问题不在于超时。代理确实发送了响应。但是根据 Broker 的说法,生产者时代太旧了,或者有更新的 transactionalId。
猜你喜欢
  • 2019-09-05
  • 1970-01-01
  • 2019-10-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-05-29
  • 1970-01-01
相关资源
最近更新 更多