【问题标题】:Scala, Using Responder to abstract a possible Asynchronous computationScala,使用 Responder 来抽象可能的异步计算
【发布时间】:2012-05-14 20:24:24
【问题描述】:

我一直在研究 scala 和 AKKA 来管理一个明显可并行化的算法。 我有一些函数式编程的知识,并且主要使用 Java,所以我的 FP 可能还不是最好的。

我正在使用的算法非常简单,有一个顶级计算:

  def computeFull(...): FullObject

此计算调用子计算,然后将其相加(以简化):

  def computePartial(...): Int

computeFull 做了这样的事情(再次简化):

 val partials = for(x <- 1 to 10
     y <- 1 to 10) yield computePartial(x, y)
 partials.foldLeft(0)(_ + _)

因此,它非常接近 AKKA 示例,即进行 PI 计算。我有很多 computeFull 可以调用,并且每个里面都有很多 computePartial。所以我可以将所有这些包装在 AKKA 演员中,或者在 Futures 中简化,在单独的线程中调用每个 computeFull 和每个 computePartial。然后我可以使用http://doc.akka.io/docs/akka/snapshot/scala/futures.html的fold、zip和map函数来组合future。

但是,这意味着 computeFull 和 computePartial 必须返回包装实际结果的 Futures。因此,他们变得依赖于 AKKA,并假设事情是并行运行的。事实上,我还必须在我的函数中隐式传递执行上下文。

我认为这很奇怪,算法“不应该”知道它是如何并行化的细节,或者如果它是。

在阅读了 scala 中的 Futures(而不是 AKKA 的)并查看了代码延续之后。 scala (http://www.scala-lang.org/api/current/scala/Responder.html) 提供的 Responder monad 似乎是抽象函数调用如何运行的正确方法。 我有这种模糊的直觉,computeFull 和 computePartial 可以返回 Responders 而不是期货,并且当执行 monad 时,它决定嵌入在 Responder 中的代码如何执行(如果它产生一个新的 actor 或者它是否在同一个线程上执行)。

但是,我不确定如何得到这个结果。有什么建议?你觉得我走对了吗?

【问题讨论】:

    标签: scala monads akka future


    【解决方案1】:

    如果您不想依赖 Akka(但请注意,Akka 风格的期货将被移动并包含在 Scala 2.10 中)并且您的计算是对集合的简单折叠,您可以简单地使用 Scala 的并行集合:

    val partials = for { x <- (1 to 10).par
      y <- 1 to 10
    } yield computePartial(x, y)
    // blocks until everything is computed
    partials.foldLeft(0)(_ + _)
    

    当然,这会阻塞直到partials 准备好,所以当你真正需要期货时,它可能不是一个合适的情况。

    使用 Scala 2.10 风格的期货,您可以使其完全异步,而您的算法不会注意到它:

    def computePartial(x: Int, y: Int) = {
      Thread.sleep((1000 * math.random).toInt)
      println (x + ", " + y)
      x * y
    }
    
    import scala.concurrent.future
    import scala.concurrent.Future
    val partials: IndexedSeq[Future[Int]] = for {
      x <- 1 to 10
      y <- 1 to 10
    } yield future(computePartial(x, y))
    
    val futureResult: Future[Int] = Future.sequence(partials).map(_.fold(0)(_ + _))
    
    def useResult(result: Int) = println(s"The sum is $result")
    
    // now I want to use the result of my computation
    futureResult map { result => // called when ready
      useResult(result)
    }
    // still no blocking
    println("This is probably printed before the sum is calculated.")
    

    所以,computePartial 不需要知道它是如何执行的。 (尽管它不应该有任何副作用,尽管在本示例中,包含了 println 副作用。)

    一个可能的computeFull 方法应该管理算法,因此了解Futures 或并行性。毕竟这算法的一部分。

    (至于Responder:Scala 的旧期货使用它,所以我不知道这是怎么回事。- 执行上下文不正是您正在寻找的配置方式吗?)

    【讨论】:

    • 听起来不错,我会留意官方的2.10。但是,我可能不清楚,我想知道的不是如何独立于 AKKA,而是如何让 computePartial/computeFull 忽略 Futures。有点像你会使用 Reader monad 来进行依赖注入,但我们不是传递一个 DB 连接,而是传递一个执行函数的策略:并行或不并行,或者完全不同的东西。这样就可以在不更改这两种方法的代码的情况下比较不同的并行化选项。
    【解决方案2】:

    akka 中的单个演员不知道他是否并行运行。这就是akka的设计方式。但是,如果您不想依赖 akka,您可以使用并行集合,例如:

    for (i <- (0 until numberOfPartialComputations).par) yield (
      computePartial(i)
    ).sum
    

    求和在并行集合上调用并以并行方式执行。

    【讨论】:

      猜你喜欢
      • 2016-01-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-02-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多