【问题标题】:GroupByKey faster than CombineByKey [duplicate]GroupByKey 比 CombineByKey 快 [重复]
【发布时间】:2017-10-22 02:32:52
【问题描述】:

我在 RDD 中有 (Int, (Int, Int, Int) 类型的元素。目的是将具有相同键的元素限制为某个阈值t。更简单的解决方案如下:

rdd.groupByKey().flatMapValues{iterable => {
  iterable.take(t)
}}

我认为将这段代码替换为 combineByKey 会很有用,以便通过组合器利用地图端聚合,因为可能有超过 t 单个分区中的相同键,导致以下结果:

val function_createCombiner = (x: (Int, Int, Int)) => {
   ArrayBuffer[(Int, Int, Int)](x)
 }
val function_mergeValue = (accumulator: ArrayBuffer[(Int, Int, Int)],
                           x: (Int, Int, Int)) => {
  if(accumulator.size < t){
    accumulator += x
  }
  accumulator
}
val function_mergeCombiners = (accumulator1: ArrayBuffer[(Int, Int, Int)],
                               accumulator2: ArrayBuffer[(Int, Int, Int)]) => {
  val iter = accumulator2.iterator
  var saturated = false
  while(!saturated && iter.hasNext){
    if(accumulator1.length < t){
      accumulator1 += iter.next()
    } else {
      saturated = true
    }
  }
  accumulator1
}

rdd
.combineByKey(function_createCombiner, function_mergeValue, function_mergeCombiners)
.flatMapValues(x => x.toList)

令人惊讶的是,combineByKey 解决方案的性能比 groupByKey 解决方案差。 combineByKey 解决方案的 GC 工作时间为 50%,所以我认为我创建了许多临时缓冲区。另一方面,网上到处都在说,应该尽量避免 groupByKey。

CombineByKey 时间:11 分钟

GroupByKey 时间:4.1 分钟

我的 combineByKey 解决方案是否存在一些可怕的缺陷?还是我错过了什么?

提前致谢!

编辑:这个问题实际上是重复的,我很抱歉。这是因为只有非常少量的元素出现超过 t 次。因此很明显,我(几乎)尝试通过 combineByKey 重新实现 groupByKey。唯一的选择是我使用 groupByKey,这似乎更快,或者如果可能的话完全省略该步骤。 无论如何,感谢您的帮助!

【问题讨论】:

  • 您是否考虑过查看 RDD 中的分区数量并比较这两种方法之间为完成整个阶段而生成的任务数量?
  • @AndrewMo 两种方法的分区数完全相同
  • 我不同意重复的标签。 OP 确实在限制为 t 个元素时确实会减少输出...
  • 您可以尝试减轻 GC 负载的一件事是将 ArrayBuffers 的大小调整为 t。 (默认的arrayBuffer大小是16,所以如果t &lt; 16,你赢了,因为每个新缓冲区会更小,你会分配更少的内存,如果t &gt; 16,它可能会更好,因为你不必调整大小(例如,重新分配!)缓冲区初始容量溢出时)。
  • 这很奇怪,我同意 combineByKey 应该快得多。你运行了多少次测试?你能确认 combineByKey 和 groupByKey 的结果是一样的吗(每个键的值相同(显然不能保证每个键都具有相同的值))?作为测试,您可以在使用 groupByKey/combineByKey 之前对数据进行分区并确认 combineByKey 更快(它应该基于您的代码)吗?

标签: scala apache-spark


【解决方案1】:

就个人而言,与rdds合作时,我会选择reduceByKey:)

rdd
 .mapValues(List(_))
 .reduceByKey((v1, v2) => (v1++v2).take(t))
 .flatMapValues(identity(_))

我发现它比combineByKey 容易得多,而且它通常groupByKey 更有效,因为它在重新排列数据之前确实映射了边减少。我说通常是因为在某些情况下(例如,当您收集每个键的所有值时)groupByKey 的性能与reduceByKey 一样好。

【讨论】:

  • 但是使用此解决方案,您可以为每个元素创建一个列表,而使用 combineByKey 解决方案(也进行地图端聚合),您只需为每个分区的每个元素创建一个列表(我希望这是可以理解的)。此外,您将 v1 和 v2 连接起来,然后对其进行修剪,这在性能方面是否等同于我的解决方案?
  • 我测试了你的解决方案,它比 groupByKey 方法花费的时间要长得多,每个任务大约 45% 的 GC 时间。
  • 我觉得这很奇怪。你能分享一些关于你的数据集的信息吗?你的rdd 有多大(有多少行),大致有多少值具有相同的键,你的 t 设置为多少?
  • 另外,您使用的是哪个版本的 Spark?
  • 我使用的是 Spark 2.1.0。我不太确定有多少元素共享同一个键,我会找出来的。我使用 t=254。由于数据量减少了相当多,我会说在我的情况下有很多键的值超过 t 。同样,我会提供准确的数字。
猜你喜欢
  • 1970-01-01
  • 2017-09-07
  • 2013-09-20
  • 1970-01-01
  • 1970-01-01
  • 2016-01-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多