【问题标题】:How to restart an Akka actor by self?如何自行重启 Akka 演员?
【发布时间】:2020-03-18 09:38:39
【问题描述】:

当子 Actor 收到自定义 RESTART 消息时,该 Actor 应自行重启。
(目的是重置actor成员变量,从db重新加载外部状态,但不清除actor内部消息队列)

要实现重新启动,一种解决方法是子 Actor 引发自定义异常,而父 Actor 配置其 OneForOneStrategy 以针对此特定异常类型重新启动子 Actor。

我想知道,是否有更直接的方法来重新启动?

【问题讨论】:

  • 您能否提供更多详细信息,restart 下的含义是什么?您想将参与者内部状态更改为初始状态、清理消息队列等吗?谢谢。

标签: akka


【解决方案1】:

目的是重置actor成员变量,reload external state from db

我猜,这可能是最大的问题,因为加载外部状态可能需要时间并且还会阻塞操作,因此操作的结果是或应该是Future[] - 所以在未来加载你的演员时应该忽略所有其他消息, 直到收到来自 DB 的状态。

我认为ActorCell#become 方法在这种情况下可能会对您有所帮助 - 因此您可以将接收方法更改为另一个,这将忽略其余消息,除了带有 DB 状态或数据的消息,然后切换回常规接收。

请看下面的代码示例:

  import akka.actor.Actor
  import akka.pattern._
  import scala.concurrent.Future
  import scala.collection.mutable


  // Database API and external state model example
  case class DbExternalState()
  trait Database {
    def loadExternalState: Future[DbExternalState]
  }

  import RestartActor._
  class RestartActor(database: Database) extends Actor {
    private var state = ActorState()
    private val suspendedMessages = mutable.Queue[Any]()

    override def receive: Receive = defaultReceive

    private def defaultReceive: Receive = {
      case Restart => restartActorStart()
    }

    /**
     * Wait until message with internal state received and ignore all the other messages (put back un queue)
     */
    private def suspendedReceive: Receive = {
      case ExternalStateLoaded(state) => restartActorFinish(state)
      case message => suspendedMessages.enqueue(message)
    }

    private def restartActorStart(): Unit = {
      import context.dispatcher
      context.become(suspendedReceive)
      database.loadExternalState.map(ExternalStateLoaded) pipeTo self
    }

    private def restartActorFinish(dbExternalState: DbExternalState): Unit = {
      state = ActorState.initial(dbExternalState)
      context.become(defaultReceive) // Return to normal message handling flow
      suspendedMessages.foreach(message => self ! message)
      suspendedMessages.clear()
    }
  }

  object RestartActor {
    // Restart
    case object Restart
    case class ExternalStateLoaded(state: DbExternalState)

    case class ActorState(internalState: List[String] = Nil, externalState: DbExternalState = DbExternalState())

    object ActorState {
      def initial(externalState: DbExternalState): ActorState = ActorState(externalState = externalState)
    }
  }

请告诉我建议是正确的。 我希望这会有所帮助!

【讨论】:

  • 如果演员在重启期间不可用,我不会打扰,因为它非常快。但最好有这个。我担心的是,由于内部状态有点复杂,重启会使恢复变得更容易。顺便说一句:case message => self ! message 这不是在制造receive-resend 循环吗?
  • @user6502167 是的,根据循环你是正确的,会解决这个问题。关于对复杂状态的关注:我不知道细节,但从我的角度来看,显式状态更改看起来比抛出异常更好,这不是真正的异常或错误,而是预期的行为,以强制系统重新启动演员。您可以尝试做什么 - 如果是多个字段,则将状态提取到某个类,该方法将返回空或默认状态,并在初始化期间仅保留单个字段并将值更改为 default。这有意义吗?
  • 对。我也不喜欢抛出异常的方法。这就是为什么我在这里寻求更好的解决方案。谢谢你的建议!
  • @user6502167 欢迎您。我也更新了一个答案。如果您发现它有用,请投票或接受也表示赞赏。感谢您提出有趣的问题!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-11-17
  • 1970-01-01
  • 2018-06-15
  • 1970-01-01
  • 2015-10-01
相关资源
最近更新 更多