【问题标题】:Spark parquet partitioning : Large number of filesSpark parquet分区:大量文件
【发布时间】:2017-12-02 04:02:04
【问题描述】:

我正在尝试利用火花分区。我试图做类似的事情

data.write.partitionBy("key").parquet("/location")

这里的问题是每个分区都会创建大量 parquet 文件,如果我尝试从根目录读取,则会导致读取速度缓慢。

为了避免我尝试过

data.coalese(numPart).write.partitionBy("key").parquet("/location")

然而,这会在每个分区中创建 numPart 数量的镶木地板文件。 现在我的分区大小不同了。所以我理想情况下希望每个分区有单独的合并。然而,这看起来并不容易。我需要访问所有分区合并到一定数量并存储在单独的位置。

我应该如何使用分区来避免写入后的许多文件?

【问题讨论】:

  • 我认为您正在寻找的是一种通过数据分区大小动态缩放输出文件数量的方法。我总结了如何完成这个here,以及一个完整的、独立的演示here

标签: apache-spark spark-dataframe rdd apache-spark-2.0 bigdata


【解决方案1】:

首先我真的会避免使用coalesce,因为这通常会在转换链中被推到更远的位置,并且可能会破坏你工作的并行性(我在这里问过这个问题:Coalesce reduces parallelism of entire stage (spark)

为每个 parquet-partition 写入 1 个文件非常容易(请参阅 Spark dataframe write method writing many small files):

data.repartition($"key").write.partitionBy("key").parquet("/location")

如果您想设置任意数量的文件(或大小相同的文件),您需要使用另一个可以使用的属性进一步重新分区您的数据(我无法告诉您这可能是什么情况):

data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")

another_key 可以是数据集的另一个属性,或者是对现有属性使用一些模或舍入运算的派生属性。您甚至可以在 key 上使用带有 row_number 的窗口函数,然后将其四舍五入

data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")

这会将您的 N 记录放入 1 个镶木地板文件中

使用 orderBy

您还可以通过相应地对数据框进行排序来控制文件的数量而无需重新分区:

data.orderBy($"key").write.partitionBy("key").parquet("/location")

这将导致所有分区(默认为 200 个)总共(至少,但不多于)spark.sql.shuffle.partitions 个文件。在$key 之后添加第二个排序列甚至是有益的,因为 parquet 会记住数据帧的顺序并相应地写入统计信息。例如,您可以按 ID 订购:

data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")

这不会改变文件的数量,但会提高您在 parquet 文件中查询给定的 keyid 时的性能。参见例如https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guidehttps://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example

Spark 2.2+

从 Spark 2.2 开始,您还可以使用新选项 maxRecordsPerFile 来限制每个文件的记录数如果文件太大。如果你有 N 个分区,你仍然会得到至少 N 个文件,但是你可以将 1 个分区(任务)写入的文件拆分成更小的块:

df.write
.option("maxRecordsPerFile", 10000)
...

参见例如http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/spark write to disk with N files less than N partitions

【讨论】:

  • 即使在包含repartitionpartitionBy 之后,我仍然看到只有一名工作人员保存parquet 文件:请参阅stackoverflow.com/questions/51050272/…
  • First I would really avoid using coalesce, as this is often pushed up further in the chain of transformation and may destroy the parallelism of your job (I asked about this issue here : How to prevent Spark optimization) - @viirya 对您的问题的回答中的主要观点之一不是这不会发生吗?
  • 很好的答案,但我不确定你为什么要避免合并。合并的“狭隘依赖”将避免洗牌,这是一件好事,@Markus 是对的,viirya 的明显回答确实表明它不会被推上链。在大多数情况下,对于大多数人来说,主动阻止 spark 优化并不是一个好建议,尤其是通过引入改组。
  • 嗯,在第二次阅读时,它确实表明 UDF 执行由于合并而发生在更少的节点上。我仍然认为在许多情况下,避免改组的合并将是有益的,并且您始终可以在上游使用其他一些阶段分离操作,例如 reduceByKey。
  • 更全面的答案在stackoverflow.com/a/53037292/13969
【解决方案2】:

这对我很有效:

data.repartition(n, "key").write.partitionBy("key").parquet("/location")

它在每个输出分区(目录)中生成 N 个文件,并且(传闻)比使用 coalesce 更快 (再次传闻,在我的数据集上)比仅在输出。

如果您正在使用 S3,我还建议您在本地驱动器上执行所有操作(Spark 在写出期间执行大量文件创建/重命名/删除),一旦完成,请使用 hadoop FileUtil(或仅使用 aws cli) 复制所有内容:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
  def copy(
          in : String,
          out : String,
          sparkSession: SparkSession
          ) = {
    FileUtil.copy(
      FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
      new Path(in),
      FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
      new Path(out),
      false,
      sparkSession.sparkContext.hadoopConfiguration
    )
  }

编辑:根据 cmets 中的讨论:

您的数据集的分区列为 YEAR,但每个给定的 YEAR 中的数据量都大不相同。因此,一年可能有 1GB 的数据,但另一年可能有 100GB。

这是处理此问题的一种方法的伪代码:

val partitionSize = 10000 // Number of rows you want per output file.
val yearValues = df.select("YEAR").distinct
distinctGroupByValues.each((yearVal) -> {
  val subDf = df.filter(s"YEAR = $yearVal")
  val numPartitionsToUse = subDf.count / partitionSize
  subDf.repartition(numPartitionsToUse).write(outputPath + "/year=$yearVal")
})

但是,我实际上不知道这会起作用。 Spark 可能会在每个列分区读取可变数量的文件时遇到问题。

另一种方法是编写您自己的自定义分区程序,但我不知道其中涉及什么,所以我无法提供任何代码。

【讨论】:

  • @Raphael Roth,感谢您提供的非常好的信息,我的公司在过去 20 年中每年和每季度都有数据。随着公司不断增长的数据同比增长。所以一些早期的年度数据是几百条记录,但最近几年的数据是几百万条记录。我应该/我应该如何均匀地划分这些数据?以便所有镶木地板文件或多或少相同数量的数据/大小。请建议...谢谢
  • 好的,嗯。假设二十年前,您有 10MB 的数据。十年前你有 10GB,而今年你有 10TB。假设您希望每个分区文件为 100MB。
  • AFAIK,如果您按列(例如年份)分区然后分成 N 个文件,每个文件最终都会得到 D*N 个文件,其中 D 是您从列分区获得的分区数.因此,如果分区列是一年,而您有二十年的数据,则 D 是二十。
  • 但是,这些年来你不会得到大小相同的文件,因为 10MB 的文件会分成 N 个更小的文件,10TB 的文件也是如此。 AKA,如果 N 为 10,您将获得 10 个 1MB 文件用于真正的旧数据,但 10 个 1TB 文件用于最新数据。
  • 您可以解决一些问题,将每个列分区手动划分为不同数量的文件;比如说,将数据帧分成多个数据帧(每年一个),然后分别写出每个数据帧 - 我认为你可以很好地阅读它。
【解决方案3】:

让我们用另一种方法扩展 Raphael Roth 的答案,该方法将创建每个分区可以包含的文件数量的上限,as discussed in this answer

import org.apache.spark.sql.functions.rand

df.repartition(numPartitions, $"some_col", rand)
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")

【讨论】:

    【解决方案4】:

    这里的其他答案都很好但是有一些问题:

    • 依靠maxRecordsPerFile 将大分区分解为较小的文件非常方便,但有两个注意事项:

      1. 如果您的分区列严重倾斜,则通过它们重新分区意味着可能会将最大数据分区的所有数据移动到单个 DataFrame 分区中。如果该 DataFrame 分区变得太大,仅此一项可能会使您的工作崩溃。

        举个简单的例子,想象一下repartition("country") 会为世界上每个人都有 1 行的 DataFrame 做什么。

      2. maxRecordsPerFile 将确保您的输出文件不超过一定的行数,但只有一个任务能够连续写出这些文件。一项任务必须处理整个数据分区,而不是能够写出包含多个任务的大型数据分区。

    • repartition(numPartitions, $"some_col", rand) 是一个优雅的解决方案,但不能很好地处理小数据分区。它会为每个数据分区写出numPartitions 文件,即使它们很小。

      在许多情况下这可能不是问题,但如果您有一个大型数据湖,您就会知道写出许多小文件会随着时间的推移降低数据湖的性能。

    因此,一种解决方案不适用于非常大的数据分区,而另一种解决方案不适用于非常小的数据分区。

    我们需要一种根据数据分区大小动态扩展输出文件数量的方法。如果它非常大,我们需要很多文件。如果它非常小,我们只需要几个文件,甚至只需要一个文件。

    解决方案是使用repartition(..., rand) 扩展该方法,并根据该数据分区所需的输出文件数动态扩展rand 的范围。

    这是the solution I posted 在一个非常相似的问题上的精髓:

    # In this example, `id` is a column in `skewed_data`.
    partition_by_columns = ['id']
    desired_rows_per_output_file = 10
    
    partition_count = skewed_data.groupBy(partition_by_columns).count()
    
    partition_balanced_data = (
        skewed_data
        .join(partition_count, on=partition_by_columns)
        .withColumn(
            'repartition_seed',
            (
                rand() * partition_count['count'] / desired_rows_per_output_file
            ).cast('int')
        )
        .repartition(*partition_by_columns, 'repartition_seed')
    )
    

    这将平衡输出文件的大小,而不考虑分区倾斜,并且不会限制您的并行度或为小分区生成太多小文件。

    如果您想自己运行此代码,我提供了 a self-contained example,以及 DataFrame 分区正确平衡的证明。

    【讨论】:

      猜你喜欢
      • 2018-10-04
      • 1970-01-01
      • 1970-01-01
      • 2019-11-10
      • 2019-03-02
      • 2016-02-06
      • 1970-01-01
      • 2015-01-27
      • 1970-01-01
      相关资源
      最近更新 更多