【问题标题】:How to force an actor to fail after a timeout in akka如何在akka超时后强制演员失败
【发布时间】:2014-01-26 10:10:03
【问题描述】:

我在使用 java 的 akka 中使用主从架构。主站接收表示他路由到从站的作业命令的消息。这些工作涉及调用非开源的第三方库,有时仅通过挂起和阻止执行而崩溃而不抛出任何异常。 Akka 不会将此视为失败并继续向此 actor 正在使用的邮箱发送消息,但由于第一次调用无限期阻塞,邮箱中的其余命令将永远不会被执行。

我的目标是用超时到期和异常来模拟这种类型的故障,以便将整个事件转发到 akka 中的故障策略构建。所以我的问题是,我能否以某种方式配置一个actor在收到消息后抛出异常,并且在超时后它的执行没有完成?

如果不是,还有哪些其他方法可以在不执行任何阻塞操作的情况下处理这种情况?我正在考虑将执行封装在 Future 中,并从一个将阻塞该 Future 超时的actor内部调用它。它可以工作,但正如许多人所建议的那样,阻塞在 akka 中并不是一个好的解决方案。

【问题讨论】:

  • 在您的问题中让我担心的是您的第三方 API 只是挂起和阻塞。发生这种情况时,来自ExecutionContext 的线程基本上也将被阻止。这种情况发生的次数已经够多了,你最终会削弱你的ExecutionContext,让演员系统的其余部分挨饿。如果您决定使用 future,请确保它们使用与您的 actor 系统不同的 ExecutionContext 以屏蔽此有问题的代码。

标签: timeout akka actor future


【解决方案1】:

没有必要阻塞两个线程,一个就足够了:只需要一个参与者来协调允许多少次调用该(非常不可靠的)API 并在 Futures 中启动它们(因为 cmbaxter 建议您不应该使用相同的 ExecutionContext演员正在运行,我会使用专用的)。然后应该使用 firstCompletedOf 将这些 Future 与超时 Future 结合起来:

import akka.pattern.after
import context.system.scheduler
import scala.concurrent.duration._

implicit val ec = myDedicatedDangerousActivityThreadPool
val myDangerousFuture = ???
val timeout = after(1.second, scheduler(throw new TimeoutException)
val combined = Future.firstCompletedOf(myDangerousFuture, timeout)

然后你以某种合适的方式将它传回给你的演员,例如将其结果值映射到消息类型或您需要的任何内容,并跟踪有多少未完成。我建议将myDangerousFuture 包装在Circuit Breaker 中,以提高在故障情况下的响应能力。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-12-02
    • 2015-02-04
    • 2017-05-07
    • 2015-05-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-11-17
    相关资源
    最近更新 更多