【问题标题】:how to watch multiple akka actors for termination如何观看多个akka演员终止
【发布时间】:2018-12-15 12:50:17
【问题描述】:

我有一个 akka 系统,它基本上是两个生产者参与者,它们向一个消费者参与者发送消息。在一个简化的形式中,我有这样的东西:

class ProducerA extends Actor {
    def receive = {
        case Produce => Consumer ! generateMessageA()
    }

    ... more code ...
}

class ProducerB extends Actor {
    def receive = {
        case Produce => Consumer ! generateMessageB()
    }

    ... more code ...
}

class Consumer extends Actor {
    def receive = {
        case A => handleMessageA(A)
        case B => handleMessageB(B)
    }

    ... more code ...
}

他们都是同一个akka系统的兄弟姐妹。

我试图弄清楚如何优雅地终止这个系统。这意味着在关闭时,我希望 ProducerAProducerB 立即停止,然后我希望 Consumer 完成处理消息队列中剩余的所有消息,然后关闭。

似乎我想要的是Consumer 演员能够看到ProducerAProducerB 的终止。或者一般来说,我想要的是能够在两个生产者都停止后向Consumer 发送PoisonPill 消息。

https://alvinalexander.com/scala/how-to-monitor-akka-actor-death-with-watch-method

上面的教程很好地解释了一个actor如何监视另一个actor的终止,但不确定一个actor如何监视多个actor的终止。

【问题讨论】:

    标签: scala akka actor


    【解决方案1】:
    import akka.actor._
    import akka.util.Timeout
    import scala.concurrent.duration.DurationInt
    
    class Producer extends Actor {
      def receive = {
        case _ => println("Producer received a message")
      }
    }
    
    case object KillConsumer
    
    class Consumer extends Actor {
    
      def receive = {
        case KillConsumer =>
          println("Stopping Consumer After All Producers")
          context.stop(self)
        case _ => println("Parent received a message")
      }
    
      override def postStop(): Unit = {
        println("Post Stop Consumer")
        super.postStop()
      }
    }
    
    class ProducerWatchers(producerListRef: List[ActorRef], consumerRef: ActorRef) extends Actor {
      producerListRef.foreach(x => context.watch(x))
      context.watch(consumerRef)
      var producerActorCount = producerListRef.length
      implicit val timeout: Timeout = Timeout(5 seconds)
      override def receive: Receive = {
        case Terminated(x) if producerActorCount == 1 && producerListRef.contains(x) =>
          consumerRef ! KillConsumer
    
        case Terminated(x) if producerListRef.contains(x) =>
          producerActorCount -= 1
    
        case Terminated(`consumerRef`) =>
          println("Killing ProducerWatchers On Consumer End")
          context.stop(self)
    
        case _ => println("Dropping Message")
      }
    
      override def postStop(): Unit = {
        println("Post Stop ProducerWatchers")
        super.postStop()
      }
    }
    
    object ProducerWatchers {
      def apply(producerListRef: List[ActorRef], consumerRef: ActorRef) : Props = Props(new ProducerWatchers(producerListRef, consumerRef))
    }
    
    object AkkaActorKill {
      def main(args: Array[String]): Unit = {
        val actorSystem = ActorSystem("AkkaActorKill")
        implicit val timeout: Timeout = Timeout(10 seconds)
    
        val consumerRef = actorSystem.actorOf(Props[Consumer], "Consumer")
        val producer1 = actorSystem.actorOf(Props[Producer], name = "Producer1")
        val producer2 = actorSystem.actorOf(Props[Producer], name = "Producer2")
        val producer3 = actorSystem.actorOf(Props[Producer], name = "Producer3")
    
        val producerWatchers = actorSystem.actorOf(ProducerWatchers(List[ActorRef](producer1, producer2, producer3), consumerRef),"ProducerWatchers")
    
        producer1 ! PoisonPill
        producer2 ! PoisonPill
        producer3 ! PoisonPill
    
        Thread.sleep(5000)
        actorSystem.terminate
      }
    }
    

    可以使用 ProducerWatchers actor 来实现,它管理被杀死的生产者,一旦所有的生产者都被杀死,您可以杀死 Consumer actor,然后杀死 ProducerWatchers actor。

    【讨论】:

    • 呵呵——有没有什么方法可以等待基于期货的毒丸起作用,而不是仅仅等待任意时间?
    • 所以你的意思是要等一个制作人在下一个之前死掉?
    • 好吧,如果毒丸通过系统工作并让所有演员正常死亡所需的时间超过睡眠时间怎么办?
    • 除非生产者相互依赖,否则我同意你的观点。但是直到所有的生产者都死了,消费者实际上可能不会死。
    【解决方案2】:

    一个演员可以通过多次调用context.watch来观察多个演员,每次调用都传入一个不同的ActorRef。例如,您的 Consumer 演员可以通过以下方式观看 Producer 演员的终止:

    case class WatchMe(ref: ActorRef)
    
    class Consumer extends Actor {
      var watched = Set[ActorRef]()
    
      def receive = {
        case WatchMe(ref) =>
          context.watch(ref)
          watched = watched + ref
        case Terminated(ref) =>
          watched = watched - ref
          if (watched.isEmpty) self ! PoisonPill
        // case ...
      }
    }
    

    Producer 的两个参与者都会将他们各自的引用发送到 Consumer,然后它将监视 Producer 参与者的终止。当Producer 演员都被终止时,Consumer 向自己发送一个PoisonPill。因为PoisonPill is treated like a normal message in an actor's mailboxConsumer 将在处理PoisonPill 并自行关闭之前处理已经入队的所有消息。

    Derek Wyatt's "Shutdown Patterns in Akka 2" blog post 中描述了类似的模式,Akka 文档中提到了这一点。

    【讨论】:

      【解决方案3】:

      所以我最终采用的解决方案受到Derek Wyatt's terminator pattern的启发

      val shutdownFut = Future.sequence(
        Seq(
          gracefulStop(producerA, ProducerShutdownWaitSeconds seconds),
          gracefulStop(producerB, ProducerShutdownWaitSeconds seconds),
        )
      ).flatMap(_ => gracefulStop(consumer, ConsumerShutdownWaitSeconds seconds))
      
      Await.result(shutdownFut, (ProducerShutdownWaitSeconds seconds) + (ConsumerShutdownWaitSeconds seconds))
      

      这或多或少正是我想要的。消费者关闭等待生产者根据期货的履行关闭。此外,整个关闭本身会导致您可以等待的未来,因此能够使线程保持足够长的时间以使所有内容都正确清理。

      【讨论】:

        猜你喜欢
        • 2014-07-11
        • 2018-04-20
        • 1970-01-01
        • 1970-01-01
        • 2017-05-29
        • 2019-01-01
        • 2014-12-02
        • 1970-01-01
        • 2015-05-06
        相关资源
        最近更新 更多