【问题标题】:Scala Future for comprehension: sequential vs parallelScala 未来的理解:顺序与并行
【发布时间】:2019-04-07 23:04:45
【问题描述】:

这里我们有SeqPar 对象,它包含一个task 例程,它是一个简单的模拟Future,它打印出一些调试信息并返回Future[Int] 类型。

问题是:为什么experiment1允许并行运行,而experiment2总是顺序运行?

object SeqPar {
  def experiment1: Int = {
    val f1 = task(1)
    val f2 = task(2)
    val f3 = task(3)

    val computation = for {
      r1 <- f1
      r2 <- f2
      r3 <- f3
    } yield (r1 + r2 + r3)

    Await.result(computation, Duration.Inf)
  }

  def experiment2: Int = {
    val computation = for {
      r1 <- task(1)
      r2 <- task(2)
      r3 <- task(3)
    } yield (r1 + r2 + r3)

    Await.result(computation, Duration.Inf)
  }

  def task(i: Int): Future[Int] = {
    Future {
      println(s"task=$i thread=${Thread.currentThread().getId} time=${System.currentTimeMillis()}")
      i * i
    }
  }
}

当我运行 experiment1 时,它会打印出来:

task=3 thread=24 time=1541326607613
task=1 thread=22 time=1541326607613
task=2 thread=21 time=1541326607613

experiment2

task=1 thread=21 time=1541326610653
task=2 thread=20 time=1541326610653
task=3 thread=21 time=1541326610654

观察到差异的原因是什么?我确实知道for 理解像f1.flatMap(r1 =&gt; f2.flatMap(r2 =&gt; f3.map(r3 =&gt; r1 + r2 + r3))) 一样脱糖,但我仍然错过了为什么一个允许并行运行而另一个不允许并行运行的一点。

【问题讨论】:

标签: scala concurrency parallel-processing future


【解决方案1】:

这是Future(…)flatMap 所做的效果:

  • val future = Future(task) 开始并行运行任务
  • future.flatMap(result =&gt; task) 安排在 future 完成时运行 task

请注意,future.flatMap(result =&gt; task)future 完成之前不能开始并行运行任务,因为要运行task,我们需要result,它仅在future 完成时可用。

现在让我们看看你的example1

def experiment1: Int = {
  // construct three independent tasks and start running them
  val f1 = task(1)
  val f2 = task(2)
  val f3 = task(3)

  // construct one complicated task that is ...
  val computation =
    // ... waiting for f1 and then ...
    f1.flatMap(r1 =>
      // ... waiting for f2 and then ...
      f2.flatMap(r2 =>
        // ... waiting for f3 and then ...
        f3.map(r3 =>
          // ... adding some numbers.
          r1 + r2 + r3)))

  // now actually trigger all the waiting
  Await.result(computation, Duration.Inf)
}

所以在example1中,由于所有三个任务都占用相同的时间并且同时启动,我们可能只需要在等待f1时阻塞。当我们等待f2时,它的结果应该已经在那里了。

现在example2 有何不同?

def experiment2: Int = {
  // construct one complicated task that is ...
  val computation =
    // ... starting task1 and then waiting for it and then ...
    task(1).flatMap(r1 =>
      // ... starting task2 and then waiting for it and then ...
      task(2).flatMap(r2 =>
        // ... starting task3 and then waiting for it and then ...
        task(3).map(r3 =>
          // ... adding some numbers.
          r1 + r2 + r3)))

  // now actually trigger all the waiting and the starting of tasks
  Await.result(computation, Duration.Inf)
}

在这个例子中,我们甚至没有在等待task(1) 完成之前构造task(2),所以任务不能并行运行。

因此,当使用 Scala 的 Future 进行编程时,您必须通过在 example1 之类的代码和 example2 之类的代码之间进行正确选择来控制并发性。或者您可以查看提供更明确的并发控制的库。

【讨论】:

  • 谢谢@Toxaris,如此优质的答案,现在一切都清楚了!
【解决方案2】:

这是因为 Scala Futures 是严格的。 Future 内部的操作在 Future 创建后立即执行,然后它会记住它的值。所以你正在失去参考透明度。在您的情况下,您的期货在您的第一个 task 调用中执行,结果被记忆。它们不会在 for 中再次执行。在第二种情况下,futures 在你的理解中创建,结果是正确的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-03-12
    相关资源
    最近更新 更多