【问题标题】:Spark DataFrame repartition : number of partition not preservedSpark DataFrame重新分区:未保留的分区数
【发布时间】:2017-01-25 15:01:36
【问题描述】:

根据 Spark 1.6.3 的文档,repartition(partitionExprs: Column*) 应该保留结果数据帧中的分区数:

返回由给定分区划分的新 DataFrame 保留现有分区数的表达式

(取自https://spark.apache.org/docs/1.6.3/api/scala/index.html#org.apache.spark.sql.DataFrame

但以下示例似乎显示了其他内容(请注意,在我的情况下 spark-master 是 local[4]):

val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[4]"))
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x")
myDF.rdd.getNumPartitions // 4 
myDF.repartition($"x").rdd.getNumPartitions //  200 !

这怎么解释?我将 Spark 1.6.3 用作独立应用程序(即在 IntelliJ IDEA 中本地运行)

编辑:这个问题没有解决来自Dropping empty DataFrame partitions in Apache Spark 的问题(即如何在不产生空分区的情况下沿列重新分区),但为什么文档说的内容与我在示例中观察到的不同

【问题讨论】:

标签: scala apache-spark


【解决方案1】:

这与在 Spark 中启用的 Tungsten project 相关。它使用硬件优化并调用哈希分区来触发 shuffle 操作。默认情况下,spark.sql.shuffle.partitions 设置为 200。您可以在重新分区之前和之后调用数据帧上的说明来验证:

myDF.explain

val repartitionedDF = myDF.repartition($"x")

repartitionedDF.explain

【讨论】:

  • 在随机播放中也使用散列,分区的数量将根据映射器和减速器任务的数量而增加。
猜你喜欢
  • 2019-03-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-01-15
  • 1970-01-01
  • 2016-03-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多