【发布时间】:2020-09-30 19:30:39
【问题描述】:
在我们的应用程序中,我们使用 kafka 组件在手动提交模式下使用 kafka 主题的消息。如果我们在后续过程中遇到异常(外部服务调用失败 - 抛出 RetryableException),我们在 ThrottlingExceptionRoutePolicy 中进行了接线以打开电路并基于 HalfOpenHandler 重试电路关闭,遇到了 RetryableException。
ThrottlingExceptionRoutePolicy { 节流异常 - RetryableException, failureThreshold - 1, 失败窗口 = 1 分钟 halfOpenAfter - x 分钟 保持打开 = 假 }
from("kafka:topicName").routePolicy(exceptionPolicy).to(anotherProcess);
当我们为每个 pod 运行单个使用者时,电路打开/关闭按预期工作。但是在多个消费者的情况下,电路是打开的但不会尝试电路关闭。
查看 ThrottlingExceptionRoutePolicy 代码:
第一个和第二个消费者线程都在等待获取锁。
-
线程 T1 获得锁,线程 T2 正在等待。
-
consumer 停止,状态为 OPEN,policy.openedAt - t1,halfOpenTask(h1) 计划在 x(阈值)ms 后运行,policy.halfOpenTimer => h1
-
锁被释放
-
线程 T2 获取锁。
-
consumer 已经停止,STATE 仍然是 OPEN,policy.openedAt = t2 (t2 > t1),halfOpenTask (h2 - new instance) 计划在 x 毫秒后运行。 policy.halfOpenTimer => h2.
-
h1 计时器一直在倒计时。一旦计时器完成,HalfOpen 任务就会排队并启动。
-
在 HalfOpen 任务的运行方法中 ->
7.a 取消策略.halfOpenTimer (h2) 7.b 所以取消h2半开计划任务 7.c调用this.calculateState()7.c.1 check the state is OPEN and if elapsedTime >= threshold X , then call half-open handler but the elapsedTime is currentTs (t1+x) - openedAt(t2) so the condition is never true. So the half-open handler is never called.
这可能发生在任何并发消费者 > 1 的骆驼路线上。
有没有人遇到过类似的问题?
【问题讨论】: