【问题标题】:Akka: Message ordering after Akka restartAkka:Akka 重启后的消息排序
【发布时间】:2016-06-07 15:32:16
【问题描述】:

以下代码示例(您可以复制并运行)显示了一个MyParentActor,它创建了一个MyChildActor

MyChildActor 为其第一条消息引发异常,导致其重新启动。

但是,我想要实现的是在 MyChildActor 重新启动时“消息 2”之前仍然处理“消息 1”。

相反,消息 1 被添加到邮箱队列的尾部,因此消息 2 被首先处理。

如何在演员重启时对原始消息进行排序,而无需创建自己的邮箱等?

object TestApp extends App {
  var count = 0
  val actorSystem = ActorSystem()


  val parentActor =  actorSystem.actorOf(Props(classOf[MyParentActor]))
  parentActor ! "Message 1"
  parentActor ! "Message 2"

  class MyParentActor extends Actor with ActorLogging{
    var childActor: ActorRef = null

    @throws[Exception](classOf[Exception])
    override def preStart(): Unit = {
      childActor = context.actorOf(Props(classOf[MyChildActor]))
    }

    override def receive = {
      case message: Any  => {
        childActor ! message
      }
    }

    override def supervisorStrategy: SupervisorStrategy = {
      OneForOneStrategy() {
          case _: CustomException  => Restart
          case _: Exception         => Restart
        }
    }
  }

  class MyChildActor extends Actor with ActorLogging{


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
      message match {
        case Some(e) => self ! e
      }
    }

    override def receive = {
      case message: String  => {
        if (count == 0) {
          count += 1
          throw new CustomException("Exception occurred")
        }
        log.info("Received message {}", message)
      }
    }
  }

  class CustomException(message: String) extends RuntimeException(message)
}

【问题讨论】:

    标签: akka


    【解决方案1】:

    您可以用一个特殊的信封标记失败的消息,并将所有内容隐藏起来,直到收到该消息(请参阅子 Actor 实现)。只需定义一个行为,其中参与者隐藏除特定信封之外的每条消息,处理其有效负载,然后取消隐藏所有其他消息并返回其正常行为。

    这给了我:

    INFO TestApp$MyChildActor - Received message Message 1
    INFO TestApp$MyChildActor - Received message Message 2
    
    object TestApp extends App { 
      var count = 0
      val actorSystem = ActorSystem()
    
    
      val parentActor =    actorSystem.actorOf(Props(classOf[MyParentActor]))
      parentActor ! "Message 1"
      parentActor ! "Message 2"
    
      class MyParentActor extends Actor with ActorLogging{
        var childActor: ActorRef = null
    
        @throws[Exception](classOf[Exception])
        override def preStart(): Unit = {
            childActor = context.actorOf(Props(classOf[MyChildActor]))
        }
    
        override def receive = {
            case message: Any => {
                childActor ! message
            }
        }
    
        override def supervisorStrategy: SupervisorStrategy = {
            OneForOneStrategy() {
                case e: CustomException => Restart
                case _: Exception => Restart
            }
        }
      }
    
      class MyChildActor extends Actor with Stash with ActorLogging{
    
    
        override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
            message match {
                case Some(e) =>
                    self ! Unstash(e)
            }
        }
    
        override def postRestart(reason: Throwable): Unit = {
            context.become(stashing)
            preStart()
        }
    
        override def receive = {
            case message: String => {
                if (count == 0) {
                    count += 1
                    throw new CustomException("Exception occurred")
                }
                log.info("Received message {}", message)
            }
        }
    
        private def stashing: Receive = {
            case Unstash( payload ) =>
                receive(payload)
                unstashAll()
                context.unbecome()
            case m =>
                stash()
        }
      }
    
      case class Unstash( payload: Any )
      class CustomException(message: String) extends RuntimeException(message)
    }
    

    【讨论】:

      猜你喜欢
      • 2017-01-05
      • 1970-01-01
      • 1970-01-01
      • 2018-04-30
      • 1970-01-01
      • 2012-12-29
      • 2012-10-06
      • 2021-12-05
      • 1970-01-01
      相关资源
      最近更新 更多