【问题标题】:RDD split and do aggregation on new RDDsRDD 拆分并在新的 RDD 上进行聚合
【发布时间】:2016-05-07 04:01:40
【问题描述】:

我的 RDD 为 (String,String,Int)

  1. 我想根据前两个字符串减少它
  2. 然后根据第一个字符串,我想对 (String,Int) 进行分组并对它们进行排序
  3. 排序后,我需要将它们分成小组,每组包含 n 个元素。

我已经完成了下面的代码。问题是步骤 2 中的元素数量对于单个键来说非常大 reduceByKey(x++y) 需要很多时间。

//Input
val data = Array(
  ("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1), 
  ("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))

val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))

// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y) 

// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList)) 

问题是“c1”有很多独特的条目,如 b1 ,b2 ....million 和 reduceByKey 正在消磨时间,因为所有值都将发送到单个节点。 有没有办法更有效地实现这一目标?

// output
 Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))

【问题讨论】:

  • 您可以尝试在第一个reduceByKey之后重新分区数据,然后使用combineByKey代替map-mapValues-reduceByKey,我认为这有助于平衡工作量

标签: scala apache-spark rdd reduce


【解决方案1】:

对数据进行分组的方式至少存在一些问题。第一个问题由

引入
 mapValues(x => ArrayBuffer(x))

它创建了大量不提供额外价值的可变对象,因为您无法在随后的reduceByKey 中利用它们的可变性

reduceByKey((x, y) => x ++ y) 

每个++ 创建一个新集合,并且两个参数都不能安全地改变。由于reduceByKey 应用了地图端聚合,情况更糟,几乎会造成 GC 地狱。

有没有办法更有效地实现这一目标?

除非您对可用于定义更智能分区器的数据分布有更深入的了解,否则最简单的改进是将mapValues + reduceByKey 替换为简单的groupByKey

val r3 = r2.groupByKey

还应该可以对reduceByKey 调用和mapPartitions 使用自定义分区器,使用preservesPartitioning 而不是map

class FirsElementPartitioner(partitions: Int)
    extends org.apache.spark.Partitioner {
  def numPartitions  = partitions
  def getPartition(key: Any): Int = {
    key.asInstanceOf[(Any, Any)]._1.## % numPartitions
  }
}

val r2 = r1
  .reduceByKey(new FirsElementPartitioner(8), (x, y) => x + y)
  .mapPartitions(iter => iter.map(x => ((x._1._1), (x._1._2, x._2))), true)

// No shuffle required here.
val r3 = r2.groupByKey

它只需要一次随机播放,groupByKey 只是一个本地操作:

r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
//  |  MapPartitionsRDD[40] at mapPartitions at <console>:35 []
//  |  ShuffledRDD[39] at reduceByKey at <console>:34 []
//  +-(8) MapPartitionsRDD[1] at map at <console>:28 []
//     |  ParallelCollectionRDD[0] at parallelize at <console>:26 []

【讨论】:

  • 说我是否有自定义分区。在 partition 之后,单个分区可能包含大量数据。这个分区是驻留在单个节点还是多个节点中?
  • 分区始终驻留在单个节点上。除非您想使用一些解决方法,例如使用代理键的自定义 RDD,否则没有解决方法。从技术上讲,唯一的实际问题应该是单个键的大量值。如果需要,其他所有内容都可以溢出到磁盘。
  • 感谢自定义分区。我能够使用我的数据创建多个分区并有效地并行化任务
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-09-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-08-10
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多