【Title】spark: How to merge shuffled RDDs efficiently?
【Posted】2016-10-01 20:31:41
【Question】

I have 5 shuffled key-value RDDs, one big one (1,000,000 records) and 4 relatively small ones (100,000 records each). All of them are partitioned into the same number of partitions, and I have two strategies for merging the five:

  1. Union all 5 RDDs together
  2. Union the 4 small RDDs first, then join the result with the big one

I expected strategy 2 to be more efficient, because it would not re-shuffle the big RDD. But the experiment shows that strategy 1 is more efficient. Code and output below:

Code

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}


object MergeStrategy extends App {

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val bigRddSize = 1e6.toInt
    val smallRddSize = 1e5.toInt
    println(bigRddSize)

    val bigRdd = sc.parallelize((0 until bigRddSize)
        .map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
    bigRdd.take(10).foreach(println)

    val smallRddList = (0 until 4).map(i => {
        val rst = sc.parallelize((0 until smallRddSize)
            .map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
        println(rst.count)
        rst
    }).toArray

    // strategy 1
    {
        val begin = System.currentTimeMillis

        val s1Rst = sc.union(Array(bigRdd) ++ smallRddList).distinct(100)
        println(s1Rst.count)

        val end = System.currentTimeMillis
        val timeCost = (end - begin) / 1000d
        println("S1 time count: %.1f s".format(timeCost))
    }

    // strategy 2
    {
        val begin = System.currentTimeMillis

        val smallMerged = sc.union(smallRddList).distinct(100).cache
        println(smallMerged.count)

        val s2Rst = bigRdd.fullOuterJoin(smallMerged).flatMap({ case (key, (left, right)) => {
            if (left.isDefined && right.isDefined) Array((key, left.get), (key, right.get)).distinct
            else if (left.isDefined) Array((key, left.get))
            else if (right.isDefined) Array((key, right.get))
            else throw new Exception("Cannot happen")
        }
        })
        println(s2Rst.count)

        val end = System.currentTimeMillis
        val timeCost = (end - begin) / 1000d
        println("S2 time count: %.1f s".format(timeCost))
    }

}

Output

1000000
(688282474,0)
(-255073127,0)
(872746474,0)
(-792516900,0)
(417252803,0)
(-1514224305,0)
(1586932811,0)
(1400718248,0)
(939155130,0)
(1475156418,0)
100000
100000
100000
100000
1399777
S1 time count: 39.7 s
399984
1399894
S2 time count: 49.8 s

Is my understanding of shuffled RDDs wrong? Can anyone give some advice? Thanks!

【Comments】

  • In strategy 2 you join, whereas in strategy 1 you only union. Why is that? Keep in mind that a union does not need to shuffle the data — it can simply stitch together the partitions that already exist on each executor. More precisely, union only creates narrow dependencies, while join creates a shuffle dependency. So strategies 1 and 2 look like apples and oranges.
  • @SachinTyagi My goal is to deduplicate the 5 RDDs, so both strategies end with a distinct, and distinct will shuffle the data. Since the big RDD has already been shuffled, strategy 2 should avoid shuffling it again and thus be more efficient — but the experiment shows the opposite.
  • Not sure I follow entirely, but whenever you join, a shuffle dependency is introduced, so the data ends up being (re-)shuffled — regardless of whether your RDDs were shuffled earlier. That is consistent with what you are seeing.
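The narrow-vs-shuffle distinction mentioned in the comments can be observed directly by inspecting an RDD's `dependencies` and `toDebugString`. A minimal sketch (not from the original post; assumes a local Spark installation):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DependencyCheck extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("deps"))

  val a = sc.parallelize(Seq((1, 0), (2, 0)), numSlices = 4)
  val b = sc.parallelize(Seq((2, 0), (3, 0)), numSlices = 4)

  // union only stitches partitions together: its parents are narrow
  // (RangeDependency) and no data moves across the network
  val unioned = sc.union(a, b)
  println(unioned.dependencies.map(_.getClass.getSimpleName).mkString(", "))

  // distinct is reduceByKey under the hood, so its lineage contains a
  // ShuffleDependency -- the data gets re-shuffled no matter how the
  // parent RDDs were partitioned
  println(unioned.distinct(4).toDebugString)

  sc.stop()
}
```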

Tags: apache-spark rdd


【Solution 1】

I found a more efficient way to merge RDDs. Compare the following two merge strategies:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.{HashPartitioner, SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer

object MergeStrategy extends App {

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val rddCount = 20
    val mergeCount = 5
    val dataSize = 20000
    val parts = 50

    // generate data
    scala.util.Random.setSeed(943343)
    val testData = for (i <- 0 until rddCount)
        yield sc.parallelize(scala.util.Random.shuffle((0 until dataSize).toList).map(x => (x, 0)))
            .partitionBy(new HashPartitioner(parts))
            .cache
    testData.foreach(x => println(x.count))

    // strategy 1: merge directly
    {
        val buff = ArrayBuffer[RDD[(Int, Int)]]()
        val begin = System.currentTimeMillis
        for (i <- 0 until rddCount) {
            buff += testData(i)
            if ((buff.size >= mergeCount || i == rddCount - 1) && buff.size > 1) {
                val merged = sc.union(buff).distinct
                    .partitionBy(new HashPartitioner(parts)).cache
                println(merged.count)

                buff.foreach(_.unpersist(false))
                buff.clear
                buff += merged
            }
        }
        val end = System.currentTimeMillis
        val timeCost = (end - begin) / 1000d
        println("Strategy 1 Time Cost: %.1f".format(timeCost))
        assert(buff.size == 1)

        println("Strategy 1 Complete, with merged Count %s".format(buff(0).count))
    }


    // strategy 2: merge directly without repartition
    {
        val buff = ArrayBuffer[RDD[(Int, Int)]]()
        val begin = System.currentTimeMillis
        for (i <- 0 until rddCount) {
            buff += testData(i)
            if ((buff.size >= mergeCount || i == rddCount - 1) && buff.size > 1) {
                val merged = sc.union(buff).distinct(parts).cache
                println(merged.count)

                buff.foreach(_.unpersist(false))
                buff.clear
                buff += merged
            }
        }
        val end = System.currentTimeMillis
        val timeCost = (end - begin) / 1000d
        println("Strategy 2 Time Cost: %.1f".format(timeCost))
        assert(buff.size == 1)

        println("Strategy 2 Complete, with merged Count %s".format(buff(0).count))
    }

}

The result shows that strategy 1 (time cost 20.8 s) is more efficient than strategy 2 (time cost 34.3 s). My machine runs Windows 8, with a 4-core 2.0 GHz CPU and 8 GB of memory.

The only difference is that strategy 1's intermediate result is partitioned by a HashPartitioner, while strategy 2's is not. As a result, strategy 1 produces a ShuffledRDD, whereas strategy 2 produces a MapPartitionsRDD. I think RDD.distinct handles a ShuffledRDD more efficiently than a MapPartitionsRDD.
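The partitioner difference is easy to verify: `distinct` ends with a `map` step that discards any partitioner, while an explicit `partitionBy` pins one onto the cached intermediate. A hypothetical check (not part of the original answer):

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerCheck extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("parts"))
  val parts = 4

  val rdd = sc.parallelize((0 until 100).map(x => (x, 0)))

  // Strategy 1 style: distinct followed by an explicit partitionBy,
  // so the cached intermediate carries a HashPartitioner
  val s1 = rdd.distinct.partitionBy(new HashPartitioner(parts)).cache
  println(s1.partitioner.isDefined) // true

  // Strategy 2 style: distinct(parts) alone. distinct is implemented as
  // map(x => (x, null)).reduceByKey(...).map(_._1), and the final map
  // drops the partitioner, so the cached intermediate has none
  val s2 = rdd.distinct(parts).cache
  println(s2.partitioner.isDefined) // false

  sc.stop()
}
```

Note also that when every RDD passed to `sc.union` carries the same partitioner, Spark builds a `PartitionerAwareUnionRDD`, which keeps the union itself narrow; this may contribute to strategy 1's advantage on the next merge round.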

【Comments】
