【问题标题】:Parallel iterator in ScalaScala中的并行迭代器
【发布时间】:2013-06-15 05:13:53
【问题描述】:

是否有可能,使用 Scala 的并行集合来并行化 Iterator 事先完全评估它?

这里我说的是在Iterator 上并行化函数转换,即mapflatMap。 我认为这需要提前评估Iterator 的一些元素,然后计算更多,一旦通过next 消耗了一些元素。

我能找到的所有内容都需要将迭代器最多转换为 IterableStream。然后,当我调用 .par 时,Stream 会得到完全评估。

如果这不是现成的,我也欢迎实施建议。实现应该支持并行mapflatMap

【问题讨论】:

  • 答案是可能没有,但你能多说一点你想从中得到什么吗?特别是,计算应该什么时候开始运行——在你创建迭代器之后,或者一旦你调用了强制评估的东西?
  • @RexKerr 似乎是一种设计选择;但是让它在第一个请求时开始会使第一个请求有点特别。我目前正在尝试实现类似的东西,我选择立即开始运行并存储下一个n 结果。一旦消耗了一个,我就会计算一个替换。

标签: scala parallel-processing scala-collections


【解决方案1】:

我意识到这是一个老问题,但是 iterata 库中的 ParIterator 实现是否符合您的要求?

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads

【讨论】:

  • 它解决了这个问题。不过,它可能会更有效一些,因为如果您在一个块中的操作运行时有很大的变化,您会遇到很多阻塞。
  • 如何提高效率@ziggystar?
  • ParIteratorIterator 分割成块。因此,如果您有小块(例如大小 2)并且一个元素需要 1 秒而另一个需要 10 秒,那么您的并行化就会很糟糕。一旦工作人员空闲,不同的实现可以从迭代器中为工作人员提供新元素。
  • @ziggystar ParIterator in iterata 将这种考虑推迟到标准库并行集合。那么在单个块中,scala 并行集合的行为方式是这样的吗?
  • 我不确定你明白我的意思。即使你在一个块中做了最好的事情,分块也会产生没有并行化的障碍。这意味着您无法获得最大的 CPU 利用率。另一个缺点是更高的内存要求。为了并行化块,Scala 需要强制它们,这导致整个块同时在内存中(假设迭代器创建对象)。从理论上讲,您只需要在内存中拥有当前正在处理的元素。大块 -> 好标准/坏内存和小块 -> 坏标准/好内存。
【解决方案2】:

使用标准库的最佳选择可能不是使用并行集合,而是concurrent.Future.traverse

import concurrent._
import ExecutionContext.Implicits.global
Future.traverse(Iterator(1,2,3))(i => Future{ i*i })

虽然我认为这会尽快开始执行整个事情。

【讨论】:

    【解决方案3】:

    从 ML,并行遍历迭代器元素:

    https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ

    出于类似的原因,我离开了Future.traverse。对于我的用例,保持 N 个作业正常工作,我最终编写了代码来限制从作业队列中提供执行上下文。

    我的第一次尝试涉及阻塞馈线线程,但这也有可能阻塞想要在执行上下文中生成任务的任务。你知道吗,阻塞是邪恶的。

    【讨论】:

    • 你能评论一下为什么你使用(NUM_CPUs + 1)^2作为阻塞队列的大小吗?
    • 我也发现了一个困难的方法 1. 我不擅长并发编程 2. flatMap 更难。
    • @ziggystar “你”是指 ML 上的“Juha”。我不认为这是一个神奇的数字:足够大,因此消费者不会领先于原始迭代器(可能会执行 i/o,也许)加上映射函数(CPU 绑定,他说,但长或短跑步?)。我看到未来的队列将被阻塞而不调用blocking;也许 +1 是从“所需的并行性”中遗留下来的。我的解决方案是在管道结束时检查更多工作,即工作要做的最后一件事是检查是否有足够的工作正在进行,如果没有,就喂给野兽。我同意这很难,简单是关键。
    • 这似乎工作正常,API 比Future.traverse 容易得多。我将它与iterator.grouped 结合起来,以便将元素分块在一起,我认为这会减少开销。
    【解决方案4】:

    要准确地追随你所追求的有点困难,但也许是这样的:

    val f = (x: Int) => x + 1
    val s = (0 to 9).toStream map f splitAt(6) match { 
      case (left, right) => left.par; right 
    }
    

    这将在前 6 个元素上并行计算 f,然后在其余元素上返回一个流。

    【讨论】:

    • 这似乎不是并行运行的 - 您不需要将 map f 移动到 par 之后吗?
    猜你喜欢
    • 2014-07-08
    • 2013-01-21
    • 1970-01-01
    • 1970-01-01
    • 2021-04-21
    • 2012-09-20
    • 1970-01-01
    • 2012-03-08
    • 2013-04-27
    相关资源
    最近更新 更多