这很慢的原因是 Spark 非常擅长对存储在一个大数据帧中的大量数据进行并行计算。但是,它在处理大量数据帧方面非常糟糕。它将使用它的所有执行器(即使它们不是全部需要)开始计算,并等待它完成,然后再开始下一个。这会导致许多不活动的处理器。这很糟糕,但这不是 spark 的设计目的。
我有一个窍门给你。可能需要对其进行一些改进,但你会有这个想法。这就是我要做的。从路径列表中,我将提取 parquet 文件的所有模式并创建一个新的大模式来收集所有列。然后,我会要求 spark 使用此模式读取所有 parquet 文件(不存在的列将自动设置为 null)。然后我将合并所有数据帧并对这个大数据帧执行转换,最后使用partitionBy 将数据帧存储在单独的文件中,同时仍然并行执行所有这些操作。它看起来像这样。
// let create two sample datasets with one column in common (id)
// and two different columns x != y
val d1 = spark.range(3).withColumn("x", 'id * 10)
d1.show
+---+----+
| id| x |
+---+----+
| 0| 0|
| 1| 10|
| 2| 20|
+---+----+
val d2 = spark.range(2).withColumn("y", 'id cast "string")
d2.show
+---+---+
| id| y|
+---+---+
| 0| 0|
| 1| 1|
+---+---+
// And I store them
d1.write.parquet("hdfs:///tmp/d1.parquet")
d2.write.parquet("hdfs:///tmp/d2.parquet")
// Now let's create the big schema
val paths = Seq("hdfs:///tmp/d1.parquet", "hdfs:///tmp/d2.parquet")
val fields = paths
.flatMap(path => spark.read.parquet(path).schema.fields)
.toSet //removing duplicates
.toArray
val big_schema = StructType(fields)
// and let's use it
val dfs = paths.map{ path =>
spark.read
.schema(big_schema)
.parquet(path)
.withColumn("path", lit(path.split("/").last))
}
// The we are ready to create one big dataframe
dfs.reduce( _ unionAll _).show
+---+----+----+----------+
| id| x| y| file|
+---+----+----+----------+
| 1| 1|null|d1.parquet|
| 2| 2|null|d1.parquet|
| 0| 0|null|d1.parquet|
| 0|null| 0|d2.parquet|
| 1|null| 1|d2.parquet|
+---+----+----+----------+
然而,我不建议在大量数据帧上使用 unionAll。由于 spark 对执行计划的分析,对于许多数据帧,它可能会非常慢。我会使用 RDD 版本,虽然它更冗长。
val rdds = sc.union(dfs.map(_.rdd))
// let's not forget to add the path to the schema
val big_df = spark.createDataFrame(rdds,
big_schema.add(StructField("path", StringType, true)))
transform(big_df)
.write
.partitionBy("path")
.parquet("hdfs:///tmp/processed.parquet")
看看我处理过的目录,我明白了:
hdfs:///tmp/processed.parquet/_SUCCESS
hdfs:///tmp/processed.parquet/path=d1.parquet
hdfs:///tmp/processed.parquet/path=d2.parquet