这对我很有效:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
它在每个输出分区(目录)中生成 N 个文件,并且(传闻)比使用 coalesce 更快 和(再次传闻,在我的数据集上)比仅在输出。
如果您正在使用 S3,我还建议您在本地驱动器上执行所有操作(Spark 在写出期间执行大量文件创建/重命名/删除),一旦完成,请使用 hadoop FileUtil(或仅使用 aws cli) 复制所有内容:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
def copy(
in : String,
out : String,
sparkSession: SparkSession
) = {
FileUtil.copy(
FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
new Path(in),
FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
new Path(out),
false,
sparkSession.sparkContext.hadoopConfiguration
)
}
编辑:根据 cmets 中的讨论:
您的数据集的分区列为 YEAR,但每个给定的 YEAR 中的数据量都大不相同。因此,一年可能有 1GB 的数据,但另一年可能有 100GB。
这是处理此问题的一种方法的伪代码:
val partitionSize = 10000 // Number of rows you want per output file.
val yearValues = df.select("YEAR").distinct
distinctGroupByValues.each((yearVal) -> {
val subDf = df.filter(s"YEAR = $yearVal")
val numPartitionsToUse = subDf.count / partitionSize
subDf.repartition(numPartitionsToUse).write(outputPath + "/year=$yearVal")
})
但是,我实际上不知道这会起作用。 Spark 可能会在每个列分区读取可变数量的文件时遇到问题。
另一种方法是编写您自己的自定义分区程序,但我不知道其中涉及什么,所以我无法提供任何代码。