【问题标题】:Akka and concurrent Actor executionAkka 和并发 Actor 执行
【发布时间】:2013-07-25 16:18:58
【问题描述】:

我有一个演员(称为 Worker)向其他 3 个演员(称为 Filter1、Filter2、Filter3)发送相同的消息

每个过滤器都有一个随机时间来解决此操作。然后,在 Worker actor 中,我使用 ask 模式并等待未来的成功:

class Worker2 extends Actor with ActorLogging {

  val filter1 = context.actorOf(Props[Filter1], "filter1")
  val filter2 = context.actorOf(Props[Filter2], "filter2")
  val filter3 = context.actorOf(Props[Filter3], "filter3")

  implicit val timeout = Timeout(100.seconds)

  def receive = {
    case Work(t) =>

      val futureF3 = (filter3 ? Work(false)).mapTo[Response]
      val futureF2 = (filter2 ? Work(true)).mapTo[Response]
      val futureF1 = (filter1 ? Work(true)).mapTo[Response]

      val aggResult: Future[Boolean] =
        for {
          f3 <- futureF3
          f2 <- futureF2
          f1 <- futureF1
        } yield f1.reponse && f2.reponse && f3.reponse

      if (Await.result(aggResult, timeout.duration)) {
        log.info("Response: true")
        sender ! Response(true)
      } else {
        log.info("Response: false")
        sender ! Response(false)
      }
  }
}

如果任何过滤器参与者返回 false,那么我不需要其他答案。例如,如果我并行运行 3 个过滤器 Actor,如果在一种情况下,过滤器 1 响应为假,则工作已解决,我不需要过滤器 2 和过滤器 3 的答案。

在这段代码中,我总是需要等待 3 次执行来决定,这似乎没有必要。有没有办法设置短路?

【问题讨论】:

    标签: scala akka


    【解决方案1】:

    解决这个问题的方法是使用 Future.find() -- Scaladoc Here

    你可以这样解决:

    val failed = Future.find([f1,f2,f3]) { res => !res }
    Await.result(failed, timeout.duration) match {
        None => // Success
        _ => // Failed
    }
    

    Future.find() 将返回完成并匹配谓词的第一个未来。如果所有期货都已完成并且没有结果与谓词匹配,则它返回 None。

    编辑:

    更好的解决方案是防止一起阻塞,并在找到响应时使用 akka 管道功能将结果直接通过管道传递给发送者。这样你就不会使用这个actor阻塞线程:

    import akka.pattern.pipe
    
    val failed = Future.find([f1,f2,f3]) { res => !res }
    val senderRef = sender
    failed.map(res => Response(res.getOrElse(true))).pipeTo(senderRef)
    

    在 getOrElse(true) 部分,如果我们像以前一样找到未来,则结果为 false,否则返回 true。

    【讨论】:

    • +1 来自我。这是一个可靠的解决方案。请记住,尽管其他 Futures 仍会运行到完成,但您不必等待它们。
    • 太棒了!它工作得非常好。对于其他期货,我正在考虑取消任务+他们的优先收件箱。我的意思是,在以 false 解决后,我向所有参与者发送一条带有任务 ID 的取消消息。由于该消息具有最高优先级,如果该消息未被处理,我将在 Actor 状态中存储任务 ID。然后,当任务到达时,我会检查这个 ID。如果存在,则完全忽略。这样,我可以避免执行昂贵的任务
    【解决方案2】:

    如果响应为真,我认为您想要的是过滤未来。由于 for 表达式的工作方式,它会短路并且不会费心等待其余的期货完成以组装响应。它仍然会返回一个带有 MatchError 异常(根据 [1])的失败未来,您需要使用 onFailure 处理程序来处理该异常

    所以

    val aggResult = for {
      f3 <- futureF3 if (f3.response)
      f2 <- futureF2 if (f2.response)
      f1 <- futureF1 if (f1.response)
    } yield f1.reponse && f2.reponse && f3.reponse
    
    aggResult.onFailure { case MatchError => sender ! false } 
    

    [1]:https://groups.google.com/forum/#!msg/akka-user/oCBpAMRekks/X4y0QV-oOPYJ

    【讨论】:

    • 不完全是……最好在 for 的 yield 部分看到。如果其中一个是假的,那么等待其他人是没有意义的,因为答案是假的!不管怎样,你的代码比我的要好得多!:)
    • 公平的'nuf。 Bryan 的解决方案要好得多。
    猜你喜欢
    • 2013-10-26
    • 2020-07-03
    • 2023-04-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-06-23
    相关资源
    最近更新 更多