【问题标题】:What is the most efficient way to iterate a collection in parallel in Scala (TOP N pattern)在Scala中并行迭代集合的最有效方法是什么(TOP N模式)
【发布时间】:2016-11-04 15:26:05
【问题描述】:

我是 Scala 新手,想构建一个实时应用程序来匹配某些人。对于给定的 Person,我想获得匹配分数最高的 TOP 50 人。

成语如下:

val persons = new mutable.HashSet[Person]() // Collection of people
/* Feed omitted */
val personsPar = persons.par // Make it parall
val person = ... // The given person

res = personsPar
        .filter(...) // Some filters
        .map{p => (p,computeMatchingScoreAsFloat(person, p))}
        .toList
        .sortBy(-_._2)
        .take(50)
        .map(t => t._1 + "=" + t._2).mkString("\n")

在上面的示例代码中,使用了 HashSet,但它可以是任何类型的集合,因为我很确定它不是最优的

问题在于,persons 包含超过 5M 的元素,computeMatchingScoreAsFloat 方法计算一种具有 200 个浮点数的 2 个向量的相关值。在我的 6 核计算机上,这个计算大约需要 2 秒。

我的问题是,在 Scala 中执行这种 TOPN 模式的最快方法是什么?

子问题: - 我应该使用什么集合(或其他东西?)的实现? - 我应该使用期货吗?

注意:它必须并行计算,仅计算computeMatchingScoreAsFloat(没有排名/TOP N)的纯计算需要超过一秒,如果我是多线程的,则

编辑:感谢 Guillaume,计算时间从 2 秒减少到 700 毫秒

def top[B](n:Int,t: Traversable[B])(implicit ord: Ordering[B]):collection.mutable.PriorityQueue[B] = {

  val starter = collection.mutable.PriorityQueue[B]()(ord.reverse) // Need to reverse for us to capture the lowest (of the max) or the greatest (of the min)

  t.foldLeft(starter)(
    (myQueue,a) => {
      if( myQueue.length <= n ){ myQueue.enqueue(a);myQueue}
      else if( ord.compare(a,myQueue.head) < 0  ) myQueue
      else{
        myQueue.dequeue
        myQueue.enqueue(a)
        myQueue
      }
    }
  )
}

谢谢

【问题讨论】:

  • 您对此进行了基准测试吗? computeMatchingScoreAsFloat 是最昂贵的部分吗?它是否足够重,值得并行化?
  • 是的,它必须并行计算,computeMatchingScoreAsFloat(没有排名/TOP N)的纯计算需要超过一秒,如果在我的计算机上使用多线程则需要 40 毫秒

标签: scala collections parallel-processing


【解决方案1】:

我会提出一些改变:

1- 我相信过滤器和映射步骤需要遍历集合两次。拥有一个惰性集合会将其减少到一个。要么有一个惰性集合(如 Stream),要么将其转换为一个,例如列表:

myList.view

2- 排序步骤需要对所有元素进行排序。相反,您可以使用存储前 N 条记录的累加器执行 FoldLeft。有关实现的示例,请参见那里: Simplest way to get the top n elements of a Scala Iterable 。如果你想要最大的性能(真的落入它的驾驶室),我可能会测试一个优先队列而不是一个列表。例如,这样的事情:

  def IntStream(n:Int):Stream[(Int,Int)] = if(n == 0) Stream.empty else (util.Random.nextInt,util.Random.nextInt) #:: IntStream(n-1)

  def top[B](n:Int,t: Traversable[B])(implicit ord: Ordering[B]):collection.mutable.PriorityQueue[B] = {

    val starter = collection.mutable.PriorityQueue[B]()(ord.reverse) // Need to reverse for us to capture the lowest (of the max) or the greatest (of the min)

    t.foldLeft(starter)(
      (myQueue,a) => {
        if( myQueue.length <= n ){ myQueue.enqueue(a);myQueue}
        else if( ord.compare(a,myQueue.head) < 0  ) myQueue
        else{
          myQueue.dequeue
          myQueue.enqueue(a)
          myQueue
        }
      }
    )
  }

def diff(t2:(Int,Int)) =  t2._2
 top(10,IntStream(10000))(Ordering.by(diff)) // select top 10 

我真的认为您的问题需要 SINGLE 集合遍历,以便您能够将运行时间降至 1 秒以下

祝你好运!

【讨论】:

  • 感谢您的帮助,我将对此进行测试并告诉您新的计算时间
  • 看来我无法查看并行集合(并行和惰性的集合),我可以处理 ParSeq xor SeqView。你知道我怎么能做到这一点吗? Scala 中有 ParSeqView 吗?
  • 你试过在非并行集合上运行它吗?我认为它有并行集合有很大的开销,可能不合理(另外,你有一个“toList”,我怀疑它合并到一个非并行集合中)。否则,您可以运行 flatMap 来组合过滤器和映射(请参阅此处 stackoverflow.com/questions/32234132/…),这实际上与惰性集合的结果相同
  • 感谢您的帮助。我已经更新了我原来的帖子。是的,我已经单独测试了 computeMatchingScoreAsFloat 计算,它在非标准集合上花费 > 1 秒,在标准集合上花费
  • 2s -> 700 ms 与您的代码。还有其他建议吗?我将尝试手动将我的数据拆分为桶,并在没有任何收集方法的情况下运行多线程计算。
猜你喜欢
  • 1970-01-01
  • 2017-11-21
  • 1970-01-01
  • 2017-04-05
  • 1970-01-01
  • 2019-08-31
  • 2011-05-27
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多