【问题标题】:partition ordering aggregateByKey Spark分区排序 aggregateByKey Spark
【发布时间】:2016-12-01 20:36:45
【问题描述】:

所以如果我之前有过改造:

myRDD = someRDD.map()

mySecondRDD = myRDD.aggregateByKey(initValue)(CombOp , MergeOp)

此时 myRDD 没有分区器,但 mySecondRDD 有一个 hashPartitioner。 首先我想问:

1)我必须在 myRDD 中指定一个分区器吗?如果我这样做,怎么可能将它作为参数传递给 aggregateByKey?

*请注意,myRDD 是一种转换,没有分区器

2) myRDD 不应该在这两个命令的末尾有与 mySecondRDD 相同的分区器,而不是没有?

3) 这 2 个命令将执行多少次随机播放?

4) 如果我在 myRDD 中使用 partitionBy 指定一个分区器,并设法将它作为参数传递给 aggregateByKey,我是否会将 shuffle 减少到 1 而不是 2?

很抱歉,我还是不太明白它是如何工作的。

【问题讨论】:

    标签: scala apache-spark partitioning


    【解决方案1】:

    我会尽力回答你的问题:

    1. 您不必显式分配分区程序。在您提供的代码中,Spark 会自动分配。如果 RDD 没有分区器,则使用默认的 HashPartitioner。查看here 了解更多详情。要指定您自己的分区器,您应该使用另一个版本的aggregateByKey(),它接受一个分区器和初始值。它看起来像myRdd.aggregateByKey(initialValue, partitioner)(CombOp, MergeOp)

    2. 如果myRDD 已经有一个分区器并且您没有在aggregateByKey() 中明确指定新的分区器,您的mySecondRDD 将使用来自myRDD 的分区器。

    3. 您将只有 1 次随机播放操作,因为 map() 转换不会触发它。相反,aggregateByKey() 将需要在一台机器上找到具有相同键的记录。

    4. 即使您将代码保持原样,您也只能进行一次随机播放。

    【讨论】:

    • 感谢您的回答,但我对此仍有一些疑问:1)您说:您将只有 1 次随机播放操作,因为 map() 转换不会触发它。是的,但是 AggregateByKey 将触发“动作”,必须先计算 map() !因为 AggragateByKey 踩在这个分区上。是的,但是 map() 无论如何都会进行洗牌(它不保留密钥)。所以这是 map() 的 1 次 shuffle,AggregateByKey 的 1 次,对吧?
    • @Spartan 我不确定我是否正确理解了您的评论。你能提供来自 Spark UI 的阶段的 DAG 吗?它会告诉我们你有多少次洗牌。我仍然相信map() 是一对一的转换,不需要洗牌。您可以找到哪些转换导致洗牌here
    • Okolnychychyi 好吧,我想了解 Spark 的工作原理。根据我的阅读, (key,Value) RDD 中的 map() 不会保留键,也不会保留分区!所以这意味着它会重新分区数据,对吧?但重新分区意味着洗牌!所以 map() 是 1 shuffle 和 aggregateByKey 另一个?你也可以在这里查看我的新问题:(stackoverflow.com/questions/41074276/…)。此外,我试图理解为什么在 RDD 中的 map() 或 mapValues() 之后有 partition = none。谢谢
    猜你喜欢
    • 1970-01-01
    • 2015-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-13
    • 1970-01-01
    • 2016-05-18
    相关资源
    最近更新 更多