【发布时间】:2022-01-16 23:47:30
【问题描述】:
我们注意到 Streams Apps 线程在我们的 Kafka 代理滚动重启期间失败了事务。事务失败会导致流线程防护,进而导致线程重新启动和重新平衡。重新平衡会导致处理延迟。我们的目标是使代理重启尽可能顺利,并尽可能防止处理延迟。
对于我们的滚动代理重启,我们使用controlled.shutdown=true 配置,并且在每次重启之前,我们等待所有分区在所有副本之间同步。
对于我们的 Streams 应用程序,我们已正确配置 group.instance.id 和适当的 session.timeout.ms,以便 Streams 应用程序本身的滚动重启很顺利且无需重新平衡。
从 Kafka Streams 应用程序日志中,我确定了导致围栏的一系列事件:
- 代理开始关闭
- 由于
NOT_LEADER_OR_FOLLOWER,应用程序日志错误生成到主题 - 应用检测信号失败,因为协调器正在重新启动代理
- 应用发现新的组协调器(这在重新启动的代理和实时代理之间有点反弹)
- 应用稳定
- 代理重新启动
- 由于
FETCH_SESSION_ID_NOT_FOUND,应用程序无法向启动代理执行获取请求 - 应用发现启动代理作为事务协调器
- App 事务由于以下两个原因之一而失败:
InvalidProducerEpochException: Producer attempted to produce with an old epoch.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one
- 流线程最终处于致命错误状态,被隔离并重新启动,从而导致重新平衡。
什么可能导致导致流线程事务失败的两个异常?我的直觉是,启动的代理在将其事务状态与同步代理同步之前被分配为事务协调器。这可以解释该经纪人知道的旧时期或不同的交易 ID。
我们如何进一步确定这里出了什么问题以及如何改进?
【问题讨论】:
-
这能回答你的问题吗? Handling exceptions in Kafka streams
-
不,这将回答如何在不重新启动流线程的情况下处理异常。我的问题是如何首先防止异常。
标签: apache-kafka apache-kafka-streams