【问题标题】:Processing Future Comprehension Scala处理未来理解 Scala
【发布时间】:2014-04-11 08:23:58
【问题描述】:

我有一个演员接收消息并运行两个期货。这些期货可以并行运行,所以我想我可以使用 for 理解来运行这两个期货并将它们的结果组合成一个对发件人的响应。我可以自己获取每个结果,但是当它们都完成时,我不知道该怎么做才能聚合它们。

def receive = {
    case "pcbStatus" => {
      val currentSender = sender
      //first future
      val wsf = (self ? "workhorseStats")(5 seconds)

      val psf = Future.traverse(context.children)(x => {
        (x ? "reportStatus")(5 seconds)
      });

      val combined = for {
        r1 <- wsf
        r2 <- psf
      } yield (r1, r2)



      combined.onComplete {
        case Success(result:Any) => {
          val response = new SomeCaseClass(r1,r2)
          println("YAY: " + response)
          currentSender ! response
        }
        case Failure(failure) => {
          println("FAIL: " + failure)
        }
      }
   }
}

【问题讨论】:

  • 但是你已经得到了一个 Future[(Int,Int)] (或者你的 wsf 和 psf 的任何类型),并且在 onComplete 中展开它,那么问题是什么?
  • 我想获取结果(r1 和 r2)并用它们创建响应...更新代码以反映
  • 你写new SomeCaseClass(r1, r2)的地方,r1r2都不在范围内。但是,您可以匹配 case Success((r1, r2)) =&gt; 并跳过讨厌的 Any 业务。
  • @RandallSchulz 超时了,还有其他想法吗?
  • 您是 asking self,但您的 Actor 没有任何情况可以回复 "workhorseStats"。等待时间自然会超时。

标签: scala akka future


【解决方案1】:

我编写了一个小例子来说明我认为你正在尝试做的事情。一、两个actor类:

class ParentActor extends Actor{
  import context._
  import akka.pattern.pipe
  implicit val timeout = Timeout(5 seconds)      

  override def preStart = {
    context.actorOf(Props[ChildActor], "child-a")
    context.actorOf(Props[ChildActor], "child-b")
  }    

  def receive = {
    case "foo" =>
      val fut1 = (self ? "bar").mapTo[Int]
      val fut2 = Future.traverse(context.children)(child => (child ? "baz").mapTo[Int])

      val aggFut = for{
        f1 <- fut1
        f2 <- fut2
      } yield SomeResult(f1, f2.toList)

      aggFut pipeTo sender

    case "bar" =>
      sender ! 2
  }
}

class ChildActor extends Actor{
  def receive = {
    case "baz" =>
      sender ! 1
  }
}

然后你可以用这段代码测试它:

implicit val timeout = Timeout(5 seconds)

val system = ActorSystem("foo")
val actor = system.actorOf(Props[ParentActor])
val result = actor ? "foo"

import system._
result onComplete{
  case tr => println(tr)
}

当你运行它时,它应该打印Success(SomeResult(2,List(1, 1)))

这里有几件事。首先,使用mapTo 允许知道类型,而不必处理Any。其次,pipeTo 是一个不错的选择,可以避免关闭发送者,并且还可以稍微简化代码。

【讨论】:

  • 优秀的答案。对于那些在搜索中遇到这个问题的人,我也没有从 (self ? "workhorseStats") 发回结果,当然这会导致它超时。关于不必关闭发件人的建议也很有帮助。 @cmbaxter,感谢您的彻底回应,这正是我所追求的。
【解决方案2】:

有一种简单的方法可以组合 Futures。例如(没有 akka):

  import scala.concurrent.ExecutionContext.Implicits.global

  val promiseInt = Promise[Int]
  val promiseString = Promise[String]

  val futureInt = promiseInt.future
  val futureString = promiseString.future

  case class Special(i: Int, s: String)

  futureInt.onSuccess { case(i) =>
    futureString.onSuccess { case(s) =>
      println(Special(i, s))
    }
  }

  promiseInt.success(3)
  promiseString.success("no")
  Thread.sleep(100)

两个期货完成的顺序无关紧要。您可以尝试颠倒两个成功触发器,您会得到相同的结果。

我在这里使用Promise 只是为了构建一个运行示例;它与合并 Futures 无关。

【讨论】:

    猜你喜欢
    • 2015-11-24
    • 1970-01-01
    • 1970-01-01
    • 2014-07-27
    • 1970-01-01
    • 1970-01-01
    • 2015-03-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多