【问题标题】:Parallel collection processing of data larger than memory size并行收集处理大于内存大小的数据
【发布时间】:2013-06-30 19:40:20
【问题描述】:

有没有一种简单的方法来使用 scala 并行集合而不将完整集合加载到内存中?

例如,我有一个大型集合,我想仅在一个适合内存的小块上并行执行特定操作(折叠),而不是在另一个块上等等,最后重新组合所有结果块。

我知道,可以使用演员,但使用 par-collections 真的很好。

我已经写了一个解决方案,但它并不好:

  def split[A](list: Iterable[A], chunkSize: Int): Iterable[Iterable[A]] = {
    new Iterator[Iterable[A]] {
      var rest = list
      def hasNext = !rest.isEmpty
      def next = {
        val chunk = rest.take(chunkSize)
        rest = rest.drop(chunkSize)
        chunk
      }
    }.toIterable
  }                                               

  def foldPar[A](acc: A)(list: Iterable[A], chunkSize: Int, combine: ((A, A) => A)): A = {
    val chunks: Iterable[Iterable[A]] = split(list, chunkSize)
    def combineChunk: ((A,Iterable[A]) => A) = { case (res, entries) => entries.par.fold(res)(combine) }
    chunks.foldLeft(acc)(combineChunk)
  }                                               

  val chunkSize = 10000000                        
    val x = 1 to chunkSize*10                 

    def sum: ((Int,Int) => Int) = {case (acc,n) => acc + n }

    foldPar(0)(x,chunkSize,sum)

【问题讨论】:

  • 我会说这里正确的计算模型将是 map reduce(因此它可能是 Spark),而不是演员本身。
  • 正式 - 是的,但在这种情况下处理时间并不合理,因此完全可以在单台机器上运行。

标签: scala parallel-processing parallel-collections


【解决方案1】:

您的想法非常简洁,可惜目前还没有这样的功能(AFAIK)。

我只是将您的想法改写为更短的代码。首先,我觉得对于并行折叠,使用monoid 的概念很有用——它是一个具有关联操作和零元素的结构。关联性很重要,因为我们不知道合并并行计算的结果的顺序。零元素很重要,因此我们可以将计算分成块并从零开始折叠每个块。不过,这并没有什么新意,这正是 fold Scala 的集合所期望的。

// The function defined by Monoid's apply must be associative
// and zero its identity element.
trait Monoid[A]
  extends Function2[A,A,A]
{
  val zero: A
}

接下来,Scala 的Iterators 已经有了一个有用的方法grouped(Int): GroupedIterator[Seq[A]],它将迭代器分割成固定大小的序列。它与您的split 非常相似。这允许我们将输入切割成固定大小的块,然后对它们应用 Scala 的并行收集方法:

def parFold[A](c: Iterator[A], blockSize: Int)(implicit monoid: Monoid[A]): A =
  c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid))
                      .fold(monoid.zero)(monoid);

我们使用并行集合框架折叠每个块,然后(不进行任何并行化)组合中间结果。

一个例子:

// Example:
object SumMonoid extends Monoid[Long] {
  override val zero: Long = 0;
  override def apply(x: Long, y: Long) = x + y;
}
val it = Iterator.range(1, 10000001).map(_.toLong)
println(parFold(it, 100000)(SumMonoid));

【讨论】:

  • 幺半群的好用法,以前从来不知道。关于分组方法,我怀疑它是否可以将整个内容加载到内存中,但事实证明它不能。
  • 稍后我会测试您的解决方案,但它似乎应该可以工作并且更加简洁。非常感谢!
  • @MikhailGolubtsov 请让我知道您的测试进展如何,我也很好奇。我自己只做了一些非常基本的测试。
  • 所以我已经运行了一个大集合的处理并且没有超出堆空间。所以它确实有效。但在我的任务中,我注意到只有 some 并行性,并且迭代器中的条目在单个线程中进行预处理。所以它可以进一步改进..
  • @MikhailGolubtsov 是的,我知道这一点。首先,在每个内部并行折叠结束时,如果块的一部分需要比其他部分更长的时间来计算,则可能会发生未完全使用核心的情况。其次,如果在主迭代器上调用 next 需要一些可测量的时间,则不会并行化。
猜你喜欢
  • 1970-01-01
  • 2017-12-11
  • 2016-07-16
  • 2012-10-24
  • 2013-02-05
  • 1970-01-01
  • 2016-06-27
  • 1970-01-01
  • 2011-02-04
相关资源
最近更新 更多