【发布时间】:2026-02-08 21:40:02
【问题描述】:
关于如何获得RDD 和/或DataFrame 的分区数有很多问题:答案总是:
rdd.getNumPartitions
或
df.rdd.getNumPartitions
不幸的是,这是对DataFrame 的昂贵操作,因为
df.rdd
需要将DataFrame 转换为rdd。这是按运行时间排序的
df.count
我正在编写 可选 repartition's 或 coalesce'sa DataFrame 的逻辑 - 基于 当前 分区数是否在一个范围内可接受的值或低于或高于它们。
def repartition(inDf: DataFrame, minPartitions: Option[Int],
maxPartitions: Option[Int]): DataFrame = {
val inputPartitions= inDf.rdd.getNumPartitions // EXPENSIVE!
val outDf = minPartitions.flatMap{ minp =>
if (inputPartitions < minp) {
info(s"Repartition the input from $inputPartitions to $minp partitions..")
Option(inDf.repartition(minp))
} else {
None
}
}.getOrElse( maxPartitions.map{ maxp =>
if (inputPartitions > maxp) {
info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
inDf.coalesce(maxp)
} else inDf
}.getOrElse(inDf))
outDf
}
但我们不能以这种方式为每个 DataFrame 支付rdd.getNumPartitions 的费用。
是否没有任何方法可以获取此信息 - 例如从在线/临时catalog 查询registered 表可能吗?
更新 Spark GUI 显示 DataFrame.rdd 操作与作业中最长的 sql 一样长。我将重新运行该作业并在此处附上屏幕截图。
以下只是一个测试用例:它使用的是生产中数据大小的一小部分。最长的 sql 只有五分钟 - 而这个也将花费这么多时间(请注意,sql没有帮助这里:它还必须随后执行,从而有效地使累积执行时间加倍)。
我们可以看到DataFrameUtils 第 30 行的.rdd 操作(如上面的 sn-p 所示)需要 5.1 分钟 - 而 save 操作仍然 需要 5.2 分钟-IE就后续save 的执行时间而言,我们确实没有通过执行.rdd 节省任何时间。
【问题讨论】:
-
这里有一个类似的问题*.com/questions/54269477/…
标签: scala apache-spark partition