【问题标题】:DataFrame partitionBy to a single Parquet file (per partition)DataFrame partitionBy 到单个 Parquet 文件(每个分区)
【发布时间】:2016-04-19 18:43:08
【问题描述】:

我想重新分区/合并我的数据,以便将其保存到每个分区的一个 Parquet 文件中。我还想使用 Spark SQL partitionBy API。所以我可以这样做:

df.coalesce(1)
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

我已经对此进行了测试,但它似乎表现不佳。这是因为数据集中只有一个分区可以处理,文件的所有分区、压缩和保存都必须由一个 CPU 内核完成。

我可以在调用 coalesce.

但是使用标准 Spark SQL API 是否有更好的方法来做到这一点?

【问题讨论】:

  • 您找到解决方案了吗?

标签: apache-spark apache-spark-sql


【解决方案1】:

我遇到了完全相同的问题,我找到了一种使用DataFrame.repartition() 的方法。使用coalesce(1) 的问题是你的并行度下降到1,它可能会很慢,最坏的情况是出错。增加这个数字也无济于事——如果你这样做coalesce(10),你会得到更多的并行度,但最终每个分区有 10 个文件。

要在不使用coalesce() 的情况下为每个分区获取一个文件,请将repartition() 与您希望输出分区的相同列一起使用。所以在你的情况下,这样做:

import spark.implicits._
df.repartition($"entity", $"year", $"month", $"day", $"status").write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")

一旦我这样做了,我会为每个输出分区获得一个 parquet 文件,而不是多个文件。

我在 Python 中对此进行了测试,但我认为在 Scala 中应该是相同的。

【讨论】:

  • 我猜@PatrickMcGloin 没有报告,但这很好用,我会鼓励帕特里克接受答案。
  • @GlennieHellesSindholt - 你是对的。回答接受。感谢 user3033652。
  • 请注意,在 scala 2.0 中需要提供一个新的 org.apache.spark.sql.Column("entity") 等作为重新分区的参数
  • @morpheus 或者只是$"entity"
  • 使用 Spark 1.6 这非常适合 Parquet。但是,使用 Avro,我仍然每个分区都有多个文件。
【解决方案2】:

根据定义:

coalesce(numPartitions: Int): 数据帧 返回一个新的 DataFrame,它正好有 numPartitions 个分区。

您可以通过 numPartitions 参数使用它来减少 RDD/DataFrame 中的分区数。它对于过滤大型数据集后更有效地运行操作很有用。

关于您的代码,它表现不佳,因为您实际上在做的是:

  1. 将所有内容放入 1 个分区,这会使驱动程序过载,因为它将所有数据拉入驱动程序上的 1 个分区(这也不是一个好习惯)

  2. coalesce 实际上是对网络上的所有数据进行洗牌,这也可能导致性能损失。

shuffle 是 Spark 用于重新分配数据的机制,以便跨分区以不同方式分组。这通常涉及跨执行器和机器复制数据,这使得 shuffle 成为一项复杂且成本高昂的操作。

shuffle 概念对于管理和理解非常重要。总是最好尽可能地洗牌,因为这是一项昂贵的操作,因为它涉及磁盘 I/O、数据序列化和网络 I/O。为了组织 shuffle 的数据,Spark 生成任务集 - 映射任务来组织数据,以及一组 reduce 任务来聚合它。此命名法来自 MapReduce,与 Spark 的 map 和 reduce 操作没有直接关系。

在内部,各个地图任务的结果会保存在内存中,直到无法容纳为止。然后,这些根据目标分区排序并写入单个文件。在 reduce 方面,任务读取相关的排序块。

关于分区 parquet,我建议您阅读答案 here 关于 Spark DataFrames with Parquet Partitioning 以及 Spark Programming Guide for Performance Tuning 中的 section

我希望这会有所帮助!

【讨论】:

  • 您好,感谢您的回复。我同意合并有成本。在我当前的代码中,我手动对数据进行分区,然后调用 coalesce 并保存在每个运行良好的分区上。但是我不想自己编写 partitionBy step 我想使用正确的 API。但这样做时,合并必须在 partitionBy 之前。这就是我卡住的地方。
  • 但是你所做的是将所有内容放在 1 个分区中,然后是 partitionBy,你应该只使用 partitionBy 来代替
  • 但是我最终得到了很多文件。我想限制创建的 Parquet 文件的数量。我每分钟流式传输和保存一次,所以每个分区已经有 1440 个文件。我不想乘以它。
  • 好吧,这么说吧,您的代码会将每个分区的 parquet 文件写入文件系统(本地或 HDFS)。这意味着如果您有 10 个不同的实体和 3 个不同的年份,每个 12 个月,等等,您最终可能会创建 1440 个文件。
  • 我仍然不知道如何使用标准 API 有效地将数据保存到每个分区的一个 Parquet 文件中,所以我认为它不能回答问题。
猜你喜欢
  • 2022-01-20
  • 1970-01-01
  • 2020-07-10
  • 1970-01-01
  • 2018-08-09
  • 2019-03-02
  • 1970-01-01
  • 2021-01-03
  • 2021-09-27
相关资源
最近更新 更多