【发布时间】:2017-10-22 02:32:52
【问题描述】:
我在 RDD 中有 (Int, (Int, Int, Int) 类型的元素。目的是将具有相同键的元素限制为某个阈值t。更简单的解决方案如下:
rdd.groupByKey().flatMapValues{iterable => {
iterable.take(t)
}}
我认为将这段代码替换为 combineByKey 会很有用,以便通过组合器利用地图端聚合,因为可能有超过 t 单个分区中的相同键,导致以下结果:
val function_createCombiner = (x: (Int, Int, Int)) => {
ArrayBuffer[(Int, Int, Int)](x)
}
val function_mergeValue = (accumulator: ArrayBuffer[(Int, Int, Int)],
x: (Int, Int, Int)) => {
if(accumulator.size < t){
accumulator += x
}
accumulator
}
val function_mergeCombiners = (accumulator1: ArrayBuffer[(Int, Int, Int)],
accumulator2: ArrayBuffer[(Int, Int, Int)]) => {
val iter = accumulator2.iterator
var saturated = false
while(!saturated && iter.hasNext){
if(accumulator1.length < t){
accumulator1 += iter.next()
} else {
saturated = true
}
}
accumulator1
}
rdd
.combineByKey(function_createCombiner, function_mergeValue, function_mergeCombiners)
.flatMapValues(x => x.toList)
令人惊讶的是,combineByKey 解决方案的性能比 groupByKey 解决方案差。 combineByKey 解决方案的 GC 工作时间为 50%,所以我认为我创建了许多临时缓冲区。另一方面,网上到处都在说,应该尽量避免 groupByKey。
CombineByKey 时间:11 分钟
GroupByKey 时间:4.1 分钟
我的 combineByKey 解决方案是否存在一些可怕的缺陷?还是我错过了什么?
提前致谢!
编辑:这个问题实际上是重复的,我很抱歉。这是因为只有非常少量的元素出现超过 t 次。因此很明显,我(几乎)尝试通过 combineByKey 重新实现 groupByKey。唯一的选择是我使用 groupByKey,这似乎更快,或者如果可能的话完全省略该步骤。 无论如何,感谢您的帮助!
【问题讨论】:
-
您是否考虑过查看 RDD 中的分区数量并比较这两种方法之间为完成整个阶段而生成的任务数量?
-
@AndrewMo 两种方法的分区数完全相同
-
我不同意重复的标签。 OP 确实在限制为 t 个元素时确实会减少输出...
-
您可以尝试减轻 GC 负载的一件事是将 ArrayBuffers 的大小调整为
t。 (默认的arrayBuffer大小是16,所以如果t < 16,你赢了,因为每个新缓冲区会更小,你会分配更少的内存,如果t > 16,它可能会更好,因为你不必调整大小(例如,重新分配!)缓冲区初始容量溢出时)。 -
这很奇怪,我同意 combineByKey 应该快得多。你运行了多少次测试?你能确认 combineByKey 和 groupByKey 的结果是一样的吗(每个键的值相同(显然不能保证每个键都具有相同的值))?作为测试,您可以在使用 groupByKey/combineByKey 之前对数据进行分区并确认 combineByKey 更快(它应该基于您的代码)吗?
标签: scala apache-spark