【问题标题】:How to check the number of partitions of a Spark DataFrame without incurring the cost of .rdd如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数
【发布时间】:2026-02-08 21:40:02
【问题描述】:

关于如何获得RDD 和/或DataFrame 的分区数有很多问题:答案总是:

 rdd.getNumPartitions

 df.rdd.getNumPartitions

不幸的是,这是对DataFrame昂贵操作,因为

 df.rdd

需要将DataFrame 转换为rdd。这是按运行时间排序的

 df.count

我正在编写 可选 repartition's 或 coalesce'sa DataFrame 的逻辑 - 基于 当前 分区数是否在一个范围内可接受的值或低于或高于它们。

  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions= inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap{ minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse( maxPartitions.map{ maxp =>
      if (inputPartitions > maxp) {
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
      } else inDf
    }.getOrElse(inDf))
    outDf
  }

但我们不能以这种方式为每个 DataFrame 支付rdd.getNumPartitions 的费用。

是否没有任何方法可以获取此信息 - 例如从在线/临时catalog 查询registered 表可能吗?

更新 Spark GUI 显示 DataFrame.rdd 操作与作业中最长的 sql 一样长。我将重新运行该作业并在此处附上屏幕截图。

以下只是一个测试用例:它使用的是生产中数据大小的一小部分。最长的 sql 只有五分钟 - 而这个也将花费这么多时间(请注意,sql没有帮助这里:它还必须随后执行,从而有效地使累积执行时间加倍)。

我们可以看到DataFrameUtils 第 30 行的.rdd 操作(如上面的 sn-p 所示)需要 5.1 分钟 - 而 save 操作仍然 需要 5.2 分钟-IE就后续save 的执行时间而言,我们确实没有通过执行.rdd 节省任何时间。

【问题讨论】:

标签: scala apache-spark partition


【解决方案1】:

rdd.getNumPartitions 中的rdd 组件没有内在成本,因为返回的RDD 永远不会被评估。

虽然您可以通过使用调试器(我将把它作为练习留给读者)轻松地凭经验确定这一点,或者确定在基本案例场景中没有触发任何作业

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true

这可能不足以说服您。所以让我们以更系统的方式来解决这个问题:

  • rdd 返回一个MapPartitionRDDds 如上所述):

    scala> ds.rdd.getClass
    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
    
  • RDD.getNumPartitionsinvokes RDD.partitions.

  • 在非检查点场景RDD.partitions invokes getPartitions(也可以随意跟踪检查点路径)。
  • RDD.getPartitionsis abstract.
  • 所以本例中使用的实际实现是MapPartitionsRDD.getPartitions,即delegates the call to the parent
  • rdd 和源之间只有MapPartitionsRDD

    scala> ds.rdd.toDebugString
    res3: String =
    (1) MapPartitionsRDD[3] at rdd at <console>:26 []
     |  MapPartitionsRDD[2] at rdd at <console>:26 []
     |  MapPartitionsRDD[1] at rdd at <console>:26 []
     |  FileScanRDD[0] at rdd at <console>:26 []
    

    同样,如果Dataset 包含一个交换,我们将跟随父母到最近的洗牌:

    scala> ds.orderBy("value").rdd.toDebugString
    res4: String =
    (67) MapPartitionsRDD[13] at rdd at <console>:26 []
     |   MapPartitionsRDD[12] at rdd at <console>:26 []
     |   MapPartitionsRDD[11] at rdd at <console>:26 []
     |   ShuffledRowRDD[10] at rdd at <console>:26 []
     +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
        |  MapPartitionsRDD[5] at rdd at <console>:26 []
        |  FileScanRDD[4] at rdd at <console>:26 []
    

    请注意,这个案例特别有趣,因为我们实际上触发了一个作业:

    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
    res5: Boolean = false
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
    res6: Array[Int] = Array(0)
    

    这是因为我们遇到了无法静态确定分区的情况(请参阅Number of dataframe partitions after sorting?Why does sortBy transformation trigger a Spark job?)。

    在这种情况下getNumPartitions 也会触发作业:

    scala> ds.orderBy("value").rdd.getNumPartitions
    res7: Int = 67
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
    res8: Array[Int] = Array(1, 0)
    

    但这并不意味着观察到的成本与.rdd 调用有某种关系。相反,在没有静态公式的情况下(例如,某些 Hadoop 输入格式,需要对数据进行全面扫描),查找 partitions 是一种内在成本。

请注意,此处提出的观点不应外推至Dataset.rdd 的其他应用程序。例如ds.rdd.count 确实会很昂贵而且很浪费。

【讨论】:

  • 删除.rdd.getNumPartitions calls – javadba 后,我的工作只用了一半以上的时间运行
  • @javadba 我相信我们之前讨论过这个 - 这不会以任何方式使这个答案无效 - 我明确指出计算分区可能很昂贵,并且在一般条件下,错误是将潜在成本归因于转换为外部类型。此外,我提出解决您提供的任何正式反例(抱歉,挥手或轶事证据不计算在内),或者完全修改(甚至撤销)我的答案,如果这样的假设反例证明它在某种程度上不正确或不完整。我真的没有其他东西可以提供。
  • 啊,好吧,答案的第二部分可能有助于移动到(病房)顶部:因为第一部分因为不承认我的观察的有效性而脱落。我会奖励这个。
【解决方案2】:

根据我的经验df.rdd.getNumPartitions 非常快,我从来没有遇到过超过一秒左右的时间。

你也可以试试

val numPartitions: Long = df
      .select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()

这将避免使用.rdd

【讨论】:

  • 我会试试这个:在任何情况下都会被认为是一种有趣的方法
  • 与 df.rdd.getNumPartitions 相比,该命令是否提高了性能?
  • @CarlosAG 我不这么认为
最近更新 更多