【问题标题】:Akka supervisor actor do not handle exception when child actor throws an exception within onFailure of a future当子actor在未来的onFailure中抛出异常时,Akka主管actor不处理异常
【发布时间】:2014-11-23 01:51:21
【问题描述】:

我遇到了一个 Akka 主管演员的问题。当子角色在未来结果的 onFailure 方法中抛出异常时,主管不会处理错误(我想在发生 ConnectException 的情况下重新启动子角色)。

我正在使用 Akka 2.3.7。

这是主管演员:

class MobileUsersActor extends Actor with ActorLogging {

  import Model.Implicits._
  import Model.MobileNotifications

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
      case _: java.net.ConnectException => {
        Logger.error("API connection error. Check your proxy configuration.")
        Restart
      }
    }

  def receive = {
    case Start => findMobileUsers
  }

  private def findMobileUsers = {
    val notis = MobileNotificationsRepository().find()
    notis.map(invokePushSender)
  }

  private def invokePushSender(notis: List[MobileNotifications]) = {
    notis.foreach { n =>
      val pushSender = context.actorOf(PushSenderActor.props)
      pushSender ! Send(n)
    }
  }

}

这是儿童演员:

class PushSenderActor extends Actor with ActorLogging {

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => throw e
      }
    }
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message, noti.users)
  }

}

我尝试按照here 的建议使用 akka.actor.Status.Failure(e) 通知发件人,但没有奏效,主管未处理异常。

作为一种解决方法,我找到了这种方法来让它工作:

class PushSenderActor extends Actor with ActorLogging {

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => self ! APIConnectionError
      }
    }
    case APIConnectionError => throw new ConnectException
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message, noti.users)
  }

}

这是 Akka 错误还是我做错了什么?

谢谢!

【问题讨论】:

    标签: scala exception-handling akka future akka-supervision


    【解决方案1】:

    我认为问题在于 Future 中抛出的异常与 Actor 正在运行的线程(可能)不属于同一个线程(更有经验的人可以详细说明)。因此,问题在于 Future 主体内抛出的异常被“吞下”并且没有传播到 Actor。既然是这种情况,Actor 不会失败,因此不需要应用监督策略。因此,我想到的第一个解决方案是将异常包含在 Future 中的某个消息中,发送给自己,然后从 Actor 上下文本身中抛出。这一次,将捕获异常并应用监督策略。但是请注意,除非您再次发送 Send(noti) 消息,否则您将不会看到自从 Actor 重新启动后发生的异常。总而言之,代码应该是这样的:

    class PushSenderActor extends Actor with ActorLogging {
    
      case class SmthFailed(e: Exception)
    
      def receive = {
        case Send(noti) => {
          val response = sendPushNotification(noti) onFailure {
            case e: ConnectException => self ! SmthFailed(e) // send the exception to yourself
          }
        }
    
        case SmthFailed(e) =>
          throw e // this one will be caught by the supervisor
      }
    
      private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
        val message = "Push notification message example"
        Logger.info(s"Push Notification >> $message to users " + noti.users)
        PushClient.sendNotification(message, noti.users)
      }
    
    }
    

    希望对您有所帮助。

    【讨论】:

    • 谢谢您,您的解决方案类似于我找到的解决方法,这是我目前正在使用的解决方法(请参阅我的帖子的最后一部分),但我认为 Akka 必须提供一些东西来处理向自己发送消息然后抛出异常。
    • @GabrielVolpe 欢迎您。而且我认为,只要您在 Future 中,就不能将此类异常传播到调用线程。它的设计是为了不以这种方式破坏它; Future 的主体自己捕获了异常,您无法将其发送出去。即使使用 akka.actor.Status.Failure(e),您也必须自己捕获并包装它。但我希望有一个聪明的方法)))
    猜你喜欢
    • 2016-05-28
    • 1970-01-01
    • 2017-01-11
    • 2017-07-21
    • 2011-09-04
    • 1970-01-01
    • 1970-01-01
    • 2021-11-24
    • 2016-08-21
    相关资源
    最近更新 更多