【问题标题】:How to execute multiple tasks parallellly?如何并行执行多个任务?
【发布时间】:2016-08-02 05:18:13
【问题描述】:

我参加了Parallel Programming的课程,它显示了并行接口:

def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
  val ta = taskA
  val tb = task {taskB}
  (ta, tb.join())
}

以下是错误的:

def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
  val ta = taskB
  val tb = task {taskB}.join()
  (ta, tb)
}

https://gist.github.com/ChenZhongPu/fe389d30626626294306264a148bd2aa查看更多界面

它还向我们展示了执行四个任务的正确方法:

def parallel[A, B, C, D](taskA: => A, taskB: => B, taskC: => C, taskD: => D): (A, B, C, D) = {
    val ta = task { taskA }
    val tb = task { taskB }
    val tc = task { taskC }
    val td = taskD
    (ta.join(), tb.join(), tc.join(), td)
}

我的问题:如果我不知道提前完成的任务数量(任务列表),如何正确调用每个任务的join

tasks.map(_.join()) // wrong

编辑

类似的讨论也发生在Discuss this week's module: Parallel Programming

【问题讨论】:

  • 注意:map 返回一个新集合,其中每个元素都由函数转换。也许您需要转换而不创建新集合?
  • @jwvh 怎么办?
  • 既然你不打算返回任何东西,那么就使用foreach怎么样?
  • @sebszyller 使用foreach 也将按顺序执行这些任务,而不是并行执行。
  • @chenzhongpu 我的意思是加入他们而不是实际执行。

标签: scala concurrency parallel-processing


【解决方案1】:

使用来自Parallel Programming课程的framework

你可以像这样实现这个方法:

def parallel[A](tasks: (() => A)*): Seq[A] = {
  if (tasks.isEmpty) Nil
  else {
    val pendingTasks = tasks.tail.map(t => task { t() })
    tasks.head() +: pendingTasks.map(_.join())
  }
}

(注意你can't have variable number of by-name arguments - 虽然这是can change

然后像这样使用它:

object ParallelUsage {
  def main(args: Array[String]) {
    val start = System.currentTimeMillis()

    // Use a list of tasks:
    val tasks = List(longTask _, longTask _, longTask _, longTask _)
    val results = parallel(tasks: _*)
    println(results)

    // or pass any number of individual tasks directly:
    println(parallel(longTask, longTask, longTask))
    println(parallel(longTask, longTask))
    println(parallel(longTask))
    println(parallel())

    println(s"Done in ${ System.currentTimeMillis() - start } ms")
  }

  def longTask() = {
    println("starting longTask execution")
    Thread.sleep(1000)
    42 + Math.random
  }
}

使用Scala's parallel collections

你不能比这更简单:

val tasks = Vector(longTask _, longTask _, longTask _)
val results = tasks.par.map(_()).seq

【讨论】:

    【解决方案2】:

    四处寻找构建parallel() 的实用方法,我发现它可以从Future 构建。任何使用现代 Javascript Promises 的人都会觉得这个范例很熟悉:

    import scala.concurrent.{Await,Future}
    import scala.concurrent.duration.Duration
    import scala.concurrent.ExecutionContext.Implicits.global
    
    def parallel[A, B](taskA: =>A, taskB: =>B): (A,B) = {
      val fB:Future[B] = Future { taskB }
      val a:A = taskA
      val b:B = Await.result(fB, Duration.Inf)
      (a,b)
    }
    

    这将 taskB 分离到它自己的线程并在主线程中执行 taskA。我们做taskA 并等待,如果有必要,永远等待fB 完成。请注意,我没有使用此设置测试异常,它可能会停止或行为不端。

    【讨论】:

      【解决方案3】:

      受到Future.sequence 的启发并有点作弊。你需要一个 Task 实现,它也是一个 Monad 来使这个设计工作。

        /** Transforms a `TraversableOnce[Task[A]]` into a `Task[TraversableOnce[A]]`.
         *  Useful for reducing many `Task`s into a single `Task`.
         */
        def parallel[
          A,
          M[X] <: TraversableOnce[X]
        ](in: M[Task[A]])(
          implicit cbf: CanBuildFrom[M[Task[A]], A, M[A]],
          executor: ExecutionContext
        ): Task[M[A]] = {
          in.foldLeft(Task.point(cbf(in))) {
            (fr, fa) => for (r <- fr; a <- fa) yield (r += a)
          }.map(_.result())(executor)
        }
      

      这可以对大多数 Scala 集合并行执行操作,唯一的条件是 Task 定义了 mapflatMap,无论实现是什么,因为您可以使用 @987654327 抽象特定的集合类型@ 构造,这是 Scala 库的内部结构。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-04-08
        • 2018-06-08
        • 2017-02-28
        相关资源
        最近更新 更多