【发布时间】:2016-10-01 20:31:41
【问题描述】:
我有 5 个洗牌键值 rdds,一个大的(1,000,000 条记录)和 4 个相对较小的(100,000 条记录)。所有 rdds 都使用相同数量的分区,我有两种策略来合并 5 个,
- 将 5 个 rdds 合并在一起
- 将 4 个小 rdds 合并到一起,然后加入 bigone
我认为策略 2 会更有效,因为它不会重新洗牌。但实验结果表明策略1更有效。代码和输出如下:
代码
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
object MergeStrategy extends App {
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val bigRddSize = 1e6.toInt
val smallRddSize = 1e5.toInt
println(bigRddSize)
val bigRdd = sc.parallelize((0 until bigRddSize)
.map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
bigRdd.take(10).foreach(println)
val smallRddList = (0 until 4).map(i => {
val rst = sc.parallelize((0 until smallRddSize)
.map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
println(rst.count)
rst
}).toArray
// strategy 1
{
val begin = System.currentTimeMillis
val s1Rst = sc.union(Array(bigRdd) ++ smallRddList).distinct(100)
println(s1Rst.count)
val end = System.currentTimeMillis
val timeCost = (end - begin) / 1000d
println("S1 time count: %.1f s".format(timeCost))
}
// strategy 2
{
val begin = System.currentTimeMillis
val smallMerged = sc.union(smallRddList).distinct(100).cache
println(smallMerged.count)
val s2Rst = bigRdd.fullOuterJoin(smallMerged).flatMap({ case (key, (left, right)) => {
if (left.isDefined && right.isDefined) Array((key, left.get), (key, right.get)).distinct
else if (left.isDefined) Array((key, left.get))
else if (right.isDefined) Array((key, right.get))
else throw new Exception("Cannot happen")
}
})
println(s2Rst.count)
val end = System.currentTimeMillis
val timeCost = (end - begin) / 1000d
println("S2 time count: %.1f s".format(timeCost))
}
}
输出
1000000
(688282474,0)
(-255073127,0)
(872746474,0)
(-792516900,0)
(417252803,0)
(-1514224305,0)
(1586932811,0)
(1400718248,0)
(939155130,0)
(1475156418,0)
100000
100000
100000
100000
1399777
S1 time count: 39.7 s
399984
1399894
S2 time count: 49.8 s
我对 shuffled rdd 的理解是错误的?有人可以给一些建议吗? 谢谢!
【问题讨论】:
-
在加入策略 2 时,您没有加入策略 1(仅联合)。这是为什么呢?请记住,联合不需要洗牌数据——它可以将每个执行器上存在的 RDD 拼接起来。更具体地说,Union 只创建一个狭义的依赖关系,而 join 创建一个 shuffle 依赖关系。因此,策略 1 和 2 似乎是苹果和橙子。
-
@SachinTyagi 我的目标是区分 5 rdds,策略 1 和 2 最后都不同。 distinct 将打乱数据。由于 big rdd 已经洗牌了,所以策略 2 不会洗牌 big 的,应该更有效率,但实验结果正好相反。
-
不太清楚我是否理解,但无论何时加入,都会引入随机依赖关系,因此最终会(重新)随机播放数据。无论您的 rdd 是否更早地洗牌。这与您所看到的一致。
标签: apache-spark rdd