【问题标题】:How actor should handle responses from other actors?演员应该如何处理来自其他演员的回应?
【发布时间】:2013-11-08 20:54:29
【问题描述】:

我的问题是关于处理来自其他参与者的参与者的响应并使用这些结果执行一些操作。

这是接收方法的简短伪示例:

case data: Data =>
    val data2 = actor1 ? data.fieldA
    val almostFinished = data2.flatMap { d2 => 
        val data3 = actor2 ? d2.fieldB
        data3.map { d3 => 
            actor3 ? someStuff(d3)
        }
    }
    almostFinished.map { r =>
        someFinalStuff(r)
    } pipeTo sender

这里是一些通用的业务处理逻辑。

首先,它看起来完全不可读。 其次 - 地图中的故障没有得到处理,也没有无处报告。

您能否解释一下我应该如何处理这种关于参与者和消息的逻辑?

谢谢!

【问题讨论】:

    标签: scala akka


    【解决方案1】:

    除了使用 ask 和 futures 之外的另一种方法是使用临时 actor 以及 Akka 2.2.3 中引入的 Aggregator Pattern

    【讨论】:

      【解决方案2】:

      你可以试试这个:

      case data: Data =>
        val data2Fut = (actor1 ? data.fieldA).mapTo[Data]
        val result = 
          for{
            data2 <- data2Fut
            data3 <- (actor2 ? data2.fieldB).mapTo[Data]
            data4 <- (actor3 ? someStuff(data3)).mapTo[Data]
          } yield someFinalStuff(data4)
        result pipeTo sender
      

      任何时候您发现自己将 flatMapmap 链接在一起时,您应该考虑使用这样的 for-comprehension 进行清理。

      【讨论】:

        【解决方案3】:

        至于代码不可读,一些for语法可以给我们一些提神

        case data: Data =>
          val combinedFuture: Future[Stuff] =
            for {
              data2 <- actor1 ? data.fieldA
              data3 <- actor2 ? data2.fieldB
              stuff <- actor3 ? someStuff(data3)
            } yield someFinalStuff(stuff)
        
          combinedFuture pipeTo sender
        

        实际上,您的示例似乎完全是为了展示for-comprehension 的有用性


        关于在“未来调用管道”中检测故障的问题,您可能想知道Future[A] 类型只能有两个可能的实例之一:

        Success[A]
        Failure[T <: Throwable]
        

        你可以测试类型或者更好,你应该使用onCompleteonSuccessonFailure回调方法,定义在Future类上

        正如Kuhn 博士所澄清的,使用pipeTo 管道传递一个失败的Future 将向接收者传递一个Status.Failure 消息,您可以从中获取错误作为Throwable 实例

        编辑

        正如 cmbaxter 所注意到的,? 模式将返回一个“无类型”Future,这将需要某种强制来返回正确类型的值,使用 mapTo

        正确的代码会变成

          val combinedFuture: Future[Stuff] =
            for {
              data2 <- (actor1 ? data.fieldA).mapTo[Data2]
              data3 <- (actor2 ? data2.fieldB).mapTo[Data3]
              stuff <- (actor3 ? someStuff(data3)).mapTo[Intermediate]
            } yield someFinalStuff(stuff)
        

        【讨论】:

        • pipeTo 将向给定的 ActorRef 发送一个 Status.Failure 消息,以防对 Futures 的某些操作失败。
        猜你喜欢
        • 1970-01-01
        • 2011-04-07
        • 2016-10-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-10-21
        • 1970-01-01
        • 2018-06-15
        相关资源
        最近更新 更多