【问题标题】:Spark RDD: multiple reducebykey or just onceSpark RDD:多个reducebykey或仅一次
【发布时间】:2016-10-08 04:37:57
【问题描述】:

我有如下代码:

// make a rd according to an id
def makeRDD(id:Int, data:RDD[(VertexId, Double)]):RDD[(Long, Double)] = { ... }  
val data:RDD[(VertexId, Double)] = ... // loading from hdfs
val idList = (1 to 100)
val rst1 = idList.map(id => makeRDD(id, data)).reduce(_ union _).reduceByKey(_+_)
val rst2 = idList.map(id => makeRDD(id, data)).reduce((l,r) => (l union r).reduceByKey(_+_))

rst1 和 rst2 得到样本结果。我认为 rst1 需要更多内存(100 次),但只有一个 reduceByKey 转换;但是,rst2 需要更少的内存,但需要更多的 reduceByKey 转换(99 次)。那么,这是一场时间和空间的权衡游戏吗?

我的问题是:我上面的分析是否正确,或者 Spark 对待在内部以相同的方式翻译动作?

P.S.: rst1 union all sub rdd 然后 reduceByKey,其中 reduceByKey 是 outside reduce。 rst2 reduceByKey 一个一个,reduceByKey 里面 reduce。

【问题讨论】:

  • 我不确定我是否理解您的问题。 rst1 和 rst2 有相同的代码,但是一个使用占位符来表示 reduce 而另一个没有。
  • rst1 union all sub rdd 然后 reduceByKey,其中 reduceByKey 是 outside reduce。 rst2 reduceByKey 一一对应,其中reduceByKey在里面 reduce。
  • 哦,抱歉,我还以为 rst2 中的 reduceByKey 也在外面。
  • 你对reduceByKey的数量是正确的,至于内存,我个人认为他们会使用相同的数量,但在做了一个非常简单的测试后发现第二个不仅仅是速度较慢,但​​也会占用更多内存。可能与它必须做的洗牌次数有关。
  • 其实两者都相当低效,虽然多少取决于配置。

标签: scala performance apache-spark rdd


【解决方案1】:

长话短说,这两种解决方案的效率都相对较低,但第二种解决方案比第一种更差。

让我们从回答最后一个问题开始。对于低级 RDD API,只有两种类型的全局自动优化(相反):

  • 使用显式或隐式缓存的任务结果而不是重新计算完整的沿袭
  • 将不需要洗牌的多个转换组合成一个ShuffleMapStage

其他一切几乎都是定义 DAG 的顺序转换。这与更严格的高级 Dataset (DataFrame) API 形成对比,后者对转换做出特定假设并执行执行计划的全局优化。

关于您的代码。第一个解决方案的最大问题是当您应用迭代 union 时,血统会不断增长。它使一些事情,比如故障恢复变得昂贵,并且由于 RDD 是递归定义的,可能会因StackOverflow 异常而失败。一个不太严重的副作用是越来越多的分区似乎在随后的减少中没有得到补偿*。你会在我对Stackoverflow due to long RDD Lineage 的回答中找到更详细的解释,但你真正需要的是一个像这样的union

sc.union(idList.map(id => makeRDD(id, data))).reduceByKey(_+_)

假设您应用真正的归约函数,这实际上是一个最佳解决方案。

第二种解决方案显然存在同样的问题,但它变得更糟。虽然第一种方法只需要两个阶段和一次洗牌,但这需要对每个RDD 洗牌。由于分区数量在增长,并且您使用默认的HashPartitioner,因此每条数据都必须多次写入磁盘,并且很可能会在网络上多次洗牌。忽略低级计算,每条记录都会被洗牌 O(N) 次,其中 N 是您合并的 RDD 的数量。

关于内存使用情况,如果不了解更多有关数据分布的信息,这并不明显,但在最坏的情况下,第二种方法可能会表现出明显更差的行为。

如果+ 使用常量空间,则减少的唯一要求是存储映射端组合结果的哈希映射。由于分区是作为数据流处理的,而不会将完整的内容读入内存,这意味着每个任务的总内存大小将与唯一键的数量成正比,而不是与数据量成正比。由于第二种方法需要更多任务,因此总体内存使用量将高于第一种情况。平均而言,由于数据被部分组织,它可能会稍微好一些,但它不太可能补偿额外的成本。


* 如果您想了解它如何影响整体性能,您可以查看Spark iteration time increasing exponentially when using join 这是一个略有不同的问题,但应该让您了解控制分区数量的重要性。

【讨论】:

  • sc.union 并没有明显提高性能。然而,受一般优化策略的启发:reduce lineage 和 shullfe,我存储了中间结果,小心使用了 persistcheckpoint,并删除了最后一个 reduceByKey。性能有了很大的提高。谢谢@zero323。另外,感谢 eliasah 提供的示例。
  • @eliasah 你能分享这些例子吗?
  • 我正在尝试加载小型数据集,然后执行 reduceByKey,然后保留 reduceRdd。然后使用持久化的 RDD 重复加载/减少。
猜你喜欢
  • 1970-01-01
  • 2020-09-04
  • 2016-06-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-05-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多