【问题标题】:repartition and coalesce not working as expected重新分区和合并没有按预期工作
【发布时间】:2021-03-16 05:36:21
【问题描述】:
我有一个大小为 2.5 GB 的数据框。分区数为 5000。我正在尝试对其重新分区,然后将其持久化。但是在我读取持久数据后,分区的数量正在发生变化。
val df = spark.read.parquet(".../oldPartition") // df has 5000 partitions
df.repartition(300).write.parquet(".../newPartition")
df.read.parquet(".../newPartition") // This doesn't have 300 partitions as expected.
我什至尝试使用合并,但没有运气。有人可以解释发生了什么吗?
【问题讨论】:
标签:
scala
apache-spark
hadoop
apache-spark-sql
【解决方案1】:
写入数据时,Spark 会为每个分区写入一个文件。因此,您可以检查是否确实是 300 个 parquet 文件,您可以在其中编写重新分区的数据帧。
但读取的分区数是另一回事。它受多种因素影响,例如:
- 您正在阅读的文件数
- 您正在阅读的文件的大小
spark.default.parallelism
spark.files.maxPartitionBytes
您可以查看他们的源代码以获取更多详细信息Parquet DataSource
【解决方案2】:
这样做的唯一方法是:
在写时使用repartition和partitionBy:
...
val df2 = df.repartition(7, $"_2")
//df2.rdd.glom().map(_.length).collect()
df2.write.partitionBy("_2").csv("/SOQ2")
...
在随后的阅读中:
...
val df3 = spark.read.csv("/SOQ2")
val df4 = df3.repartition(7, $"_2")
// this val df4 = df3.repartition(7) gives different distribution
df4.rdd.glom().map(_.length).collect()
...
然后填充分区等是相同的,但必须在运行时以这种方式强制执行;除非你使用bucketBy。