【问题标题】:repartition and coalesce not working as expected重新分区和合并没有按预期工作
【发布时间】:2021-03-16 05:36:21
【问题描述】:

我有一个大小为 2.5 GB 的数据框。分区数为 5000。我正在尝试对其重新分区,然后将其持久化。但是在我读取持久数据后,分区的数量正在发生变化。

val df = spark.read.parquet(".../oldPartition") // df has 5000 partitions
df.repartition(300).write.parquet(".../newPartition")

df.read.parquet(".../newPartition") // This doesn't have 300 partitions as expected.

我什至尝试使用合并,但没有运气。有人可以解释发生了什么吗?

【问题讨论】:

标签: scala apache-spark hadoop apache-spark-sql


【解决方案1】:

写入数据时,Spark 会为每个分区写入一个文件。因此,您可以检查是否确实是 300 个 parquet 文件,您可以在其中编写重新分区的数据帧。

但读取的分区数是另一回事。它受多种因素影响,例如:

  • 您正在阅读的文件数
  • 您正在阅读的文件的大小
  • spark.default.parallelism
  • spark.files.maxPartitionBytes

您可以查看他们的源代码以获取更多详细信息Parquet DataSource

【讨论】:

    【解决方案2】:

    这样做的唯一方法是:

    在写时使用repartition和partitionBy:

    ...
    val df2 = df.repartition(7, $"_2")
    //df2.rdd.glom().map(_.length).collect()
    df2.write.partitionBy("_2").csv("/SOQ2")
    ...
    

    在随后的阅读中:

    ...
    val df3 = spark.read.csv("/SOQ2") 
    val df4 = df3.repartition(7, $"_2")
    // this val df4 = df3.repartition(7) gives different distribution
    df4.rdd.glom().map(_.length).collect()
    ...
    

    然后填充分区等是相同的,但必须在运行时以这种方式强制执行;除非你使用bucketBy

    【讨论】:

      猜你喜欢
      • 2019-06-03
      • 1970-01-01
      • 1970-01-01
      • 2011-11-11
      • 2018-01-02
      • 1970-01-01
      • 1970-01-01
      • 2016-03-25
      • 1970-01-01
      相关资源
      最近更新 更多