【问题标题】:Akka Actor not terminating if an exception is thrown如果抛出异常,Akka Actor 不会终止
【发布时间】:2011-09-04 10:36:06
【问题描述】:

我目前正在尝试开始使用 Akka,但遇到了一个奇怪的问题。我的 Actor 有以下代码:

class AkkaWorkerFT extends Actor {
  def receive = {
    case Work(n, c) if n < 0 => throw new Exception("Negative number")
    case Work(n, c) => self reply n.isProbablePrime(c);
  }
}

这就是我开始工作的方式:

val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()

这就是我关闭一切的方式:

futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill

现在发生的情况是,如果我向工作人员发送 n > 0 的消息(不抛出异常),一切正常并且应用程序正常关闭。但是,只要我向它发送一条导致异常的消息,应用程序就不会终止,因为仍然有一个演员在运行,但我不知道它来自哪里。

如果有帮助,这是相关线程的堆栈:

  Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) 
    Unsafe.park(boolean, long) line: not available [native method]  
    LockSupport.park(Object) line: 158  
    AbstractQueuedSynchronizer$ConditionObject.await() line: 1987   
    LinkedBlockingQueue<E>.take() line: 399 
    ThreadPoolExecutor.getTask() line: 947  
    ThreadPoolExecutor$Worker.run() line: 907   
    MonitorableThread(Thread).run() line: 680   
    MonitorableThread.run() line: 182   

PS:没有终止的线程不是任何工作线程,因为我添加了一个 postStop 回调,它们每个都正常停止。

PPS:Actors.registry.shutdownAll 解决了这个问题,但我认为 shutdownAll 只能作为最后的手段,不是吗?

【问题讨论】:

  • @ViktorKlang:但是为什么它仍然存在,我该如何正确地停止它? :)
  • 什么时候应该停止?关闭方法见这里:akka.io/api/akka/1.1.2/#akka.event.EventHandler$
  • 当您使用 Akka 的“日志记录”的默认事件处理程序时启动它。它可以在 akka.conf 中配置
  • @Pablo Fernandez:但是为什么我必须禁用日志记录才能让我的应用程序正确终止?恕我直言,这更像是一种解决方法而不是解决方案......
  • @x3ro:偶然发现了这个(在寻找不终止的原因时)。已经能够验证剩下的演员确实是EventHandler? (对我来说是。)听起来很有趣,所以我检查了代码:对你来说,原因似乎是抛出异常会导致 EventHandler.error 被调用(打印堆栈跟踪?)....我猜你还必须在 EventHandler 上扔一个 PoisonPill 来关闭它,一些 akka 专家可能有更好(更好)的解决方案。

标签: scala routing actor akka fault-tolerance


【解决方案1】:

akka.conf中打开记录器

【讨论】:

    【解决方案2】:

    在 akka Actor 内部处理问题的正确方法不是抛出异常,而是设置主管层次结构

    "在并发代码中抛出异常(假设我们正在使用 非关联演员),只会简单地炸毁当前的线程 处决演员。

    没有办法发现事情出错了(除了 检查堆栈跟踪)。 你对此无能为力。”

    Fault Tolerance Through Supervisor Hierarchies (1.2)

    * 注意 * 以上内容适用于旧版本的 Akka (1.2) 在较新的版本(例如 2.2)中,您仍然会设置主管层次结构,但它会捕获子进程抛出的异常。例如

    class Child extends Actor {
        var state = 0
        def receive = {
          case ex: Exception ⇒ throw ex
          case x: Int        ⇒ state = x
          case "get"         ⇒ sender ! state
        }
      }
    

    在主管中:

    class Supervisor extends Actor {
        import akka.actor.OneForOneStrategy
        import akka.actor.SupervisorStrategy._
        import scala.concurrent.duration._
    
        override val supervisorStrategy =
          OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
            case _: ArithmeticException      ⇒ Resume
            case _: NullPointerException     ⇒ Restart
            case _: IllegalArgumentException ⇒ Stop
            case _: Exception                ⇒ Escalate
          }
    
        def receive = {
          case p: Props ⇒ sender ! context.actorOf(p)
        }
      }
    

    Fault Tolerance Through Supervisor Hierarchies (2.2)

    【讨论】:

    • 我在异常方面遇到了同样的问题:抛出,显示,但参与者处理了进一步的消息,就好像没有发生异常一样。所以我切换到主管层次结构。这样做只是因为无法以其他方式处理异常。
    • 这在more recent versions of Akka 中似乎不是真的,在该处抛出异常正是主管监督的事情。
    【解决方案3】:

    按照 Viktor 的建议,关闭日志记录以确保事情终止,这有点奇怪。你可以做的是:

    EventHandler.shutdown()
    

    干净地关闭所有在异常发生后保持世界运行的(记录器)侦听器:

    def shutdown() {
      foreachListener(_.stop())
      EventHandlerDispatcher.shutdown()
    }
    

    【讨论】:

      猜你喜欢
      • 2017-01-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-03
      • 1970-01-01
      • 1970-01-01
      • 2021-04-30
      • 1970-01-01
      相关资源
      最近更新 更多