【问题标题】:Delay a shutdown when background actors must finish their tasks当后台演员必须完成他们的任务时延迟关闭
【发布时间】:2018-02-13 09:54:11
【问题描述】:

我有一个 Akka 应用程序,其中包含一个由执行某些作业的演员组成的路由器组。当我检测到我的应用程序关闭时,我希望我的演员在完全关闭应用程序之前完成他们的工作。我的问题的用例是在重新部署的情况下:如果当前作业没有执行,我不想授权它。

为了检测我的应用程序是否关闭,我使用以下代码:

scala.sys.addShutdownHook { 
// let actors finished their work
}

为了进行一些测试,我添加了一个无限循环来查看关闭挂钩是否被阻止但应用程序结束,所以这不是我的预期行为。

为了让我的演员完成他们的工作,我将在下面的文章中实现这个想法:http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2

所以现在我正在寻找一种方法来忽略关闭挂钩并在我的工作人员执行完所有作业后关闭所有资源和应用程序。

@Edmondo1984 评论后更新

我的主要应用:

val workers = this.createWorkerActors()
val masterOfWorkers = system.actorOf(Master.props(workers), name = "master")

    this.monitorActors(supervisor,workers,masterOfWorkers)
    this.addShutDownHook(system,masterOfWorkers,supervisor)

def monitorActors(supervisor : ActorRef,workers : List[ActorRef], master : ActorRef) : Unit = {
    val actorsToMonitor = master +: workers
    supervisor ! MonitorActors(actorsToMonitor)
  }

  def addShutDownHook
  (
    system : ActorSystem,
    masterOfWorkers : ActorRef, // actor wrapping a ClusterGroup router, brodcasting a PoisonPill to each worker
    supervisor : ActorRef
  ) : Unit = {
    scala.sys.addShutdownHook {
      implicit val timeout = Timeout(10.hours) // How to block here until actors are terminated ?
      system.log.info("Send a Init Shutdown to {}", masterOfWorkers.path.toStringWithoutAddress)
      masterOfWorkers ! InitShutDown
      system.log.info("Gracefully shutdown all actors of ActorSystem {}", system.name)
      Await.result((supervisor ? InitShutDown), Duration.Inf)
      system.log.info("Gracefully shutdown actor system")
      Await.result(system.terminate(), 1.minutes)
      system.log.info("Gracefully shutdown Akka management ...")
      Await.result(AkkaManagement(system).stop(), 1.minutes)
      System.exit(0)
    }
  }

监制演员

case class Supervisor()  extends Actor with ActorLogging {

  var numberOfActorsToWatch = 0L

  override def receive: Receive = {
    case MonitorActors(actorsToMonitor) =>
      log.info("Monitor {} actors, received by {}", actorsToMonitor.length, sender().path)
      this.numberOfActorsToWatch = actorsToMonitor.length
      actorsToMonitor foreach(context.watch(_))

    case Terminated(terminatedActor) if this.numberOfActorsToWatch > 0 =>
      log.info("Following actor {} is terminated. Remaining alives actors is {}", terminatedActor.path.toStringWithoutAddress, this.numberOfActorsToWatch)
      this.numberOfActorsToWatch -= 1

    case Terminated(lastTerminatedActor) if this.numberOfActorsToWatch == 0 =>
      log.info("Following actor {} is terminated. All actors has been terminated",lastTerminatedActor.path.toStringWithoutAddress, this.numberOfActorsToWatch)
      // what I can do here ?
      //context.stop(self)

  }
}

应用程序.conf

akka {
actor {
coordinated-shutdown {
    default-phase-timeout = 20 s
    terminate-actor-system = off
    exit-jvm = off
    run-by-jvm-shutdown-hook = off
  }
}
}

我不知道如何阻塞主线程,最后杀死应用程序。

【问题讨论】:

    标签: scala akka


    【解决方案1】:

    这很容易通过在你的层次结构前面放置一个主管演员来实现:

    • 当您需要关机时,您向主管发送消息并缓存发件人 A
    • 主管通过 DeadWatch 订阅儿童死亡(见 https://doc.akka.io/docs/akka/2.5/actors.html
    • 主管将为孩子的数量设置一个计数器变量,然后向所有孩子发送一条消息,告诉他们尽快关闭。当孩子们完成后,他们将终止自己。主管将收到通知并减少计数器
    • 当计数器达到 0 时,主管将向 A 发送一条消息,说 ShutdownTerminated 并自行终止。

    你的代码会变成这样:

    class Supervisor  extends Actor with ActorLogging {
    
      var shutdownInitiator:ActorRef = _
      var numberOfActorsToWatch = 0L
    
      override def receive: Receive = {
        case InitShutdown =>
          this.numberOfActorsToWatch = context.children.length
          context.children.foreach(context.watch(_))
          context.children.foreach { s => s ! TerminateSomehow } 
          shutdownInitiator = sender
        case Terminated(terminatedActor) if this.numberOfActorsToWatch > 0 =>
          log.info("Following actor {} is terminated. Remaining alives actors is {}", terminatedActor.path.toStringWithoutAddress, this.numberOfActorsToWatch)
          this.numberOfActorsToWatch -= 1
    
        case Terminated(lastTerminatedActor) if this.numberOfActorsToWatch == 0 =>
          log.info("Following actor {} is terminated. All actors has been terminated",lastTerminatedActor.path.toStringWithoutAddress, this.numberOfActorsToWatch)
          // what I can do here ?
          shutdownInitiator ! Done
          context.stop(self)
    
      }
    }
    

    在您的关闭挂钩上,您需要对主管的引用并使用询问模式:

    Await.result(supervisor ? InitShutdown, Duration.Inf)
    actorSystem.terminate()
    

    【讨论】:

    • 我用一些代码更新了我的帖子,但我仍然不知道如何阻止我所有演员的终止。
    • 您只需要在处理 MonitorsActor 时获取对发送者的引用
    猜你喜欢
    • 1970-01-01
    • 2016-06-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-20
    • 2019-06-11
    相关资源
    最近更新 更多