【问题标题】:PySpark: Writing input files to separate output files without repartitioningPySpark:将输入文件写入单独的输出文件而不重新分区
【发布时间】:2018-05-30 06:37:19
【问题描述】:

我有一系列非常大的每日 gzip 文件。我正在尝试使用 PySpark 以 Parquet 格式重新保存 S3 中的所有文件以供以后使用。

如果对于单个文件(例如,2012-06-01)我这样做:

dataframe = spark.read.csv('s3://mybucket/input/20120601.gz', schema=my_schema, header=True)
dataframe.write.parquet('s3://mybucket/output/20120601')

它可以工作,但由于 gzip 不可拆分,它在单个主机上运行,​​并且我没有使用集群的任何好处。

我尝试一次读取一大块文件,并使用 partitionBy 将输出写入这样的日常文件(例如,一个月内读取):

dataframe = spark.read.csv('s3://mybucket/input/201206*.gz', schema=my_schema, header=True)
dataframe.write.partitionBy('dayColumn').parquet('s3://mybucket/output/')

这一次,我想要在不同的执行程序中读取单个文件,但是执行程序后来死了,进程失败了。我相信由于文件太大,并且 partitionBy 以某种方式使用了不必要的资源(随机播放?),它会使任务崩溃。

我实际上不需要重新分区我的数据框,因为这只是 1:1 映射。有没有办法让每个单独的任务写入一个单独的、明确命名的 parquet 输出文件?

我在想类似的事情

def write_file(date):
    # get input/output locations from date
    dataframe = spark.read.csv(input_location, schema=my_schema, header=True)
    dataframe.write.parquet(output_location)
spark.sparkContext.parallelize(my_dates).for_each(write_file)

除了这不起作用,因为您无法将 spark 会话广播到集群。有什么建议吗?

【问题讨论】:

    标签: apache-spark pyspark parquet


    【解决方案1】:

    将输入文件写入单独的输出文件而不重新分区

    TL;DR这是您的代码已经在做的事情。

    partitionBy 导致不必要的洗牌

    没有。 DataFrameWriter.partitionBy 根本不洗牌。

    它有效,但由于 gzip 不可拆分

    你可以:

    • 完全放弃压缩 - Parquet 使用内部压缩。
    • 使用像bzip2这样的可拆分压缩。
    • 在提交作业之前将文件解压到临时存储区。

    如果您担心partitionBy 使用的资源(它可能会为每个执行程序线程打开更多的文件),您实际上可以使用 shuffle 来提高性能 - DataFrame partitionBy to a single Parquet file (per partition)。单个文件可能太多了,但是

    dataframe \
        .repartition(n, 'dayColumn', 'someOtherColumn') \
        .write.partitionBy('dayColumn') \
        .save(...)
    

    可以选择someOtherColumn 以获得合理的基数,应该会有所改进。

    【讨论】:

    • 表示 gzip 不可拆分,是一个错字。我测试过,问题肯定是partitionBy。如果我运行完全相同的命令,但没有 partitonBy 它可以工作,除了输出文件没有正确命名。使用 partitionBy,执行者节点会反复杀死自己。我的假设是必须发生洗牌才能导致这种情况,但肯定有一些东西在 partitonBy 中使用了不必要的资源。
    猜你喜欢
    • 2016-09-14
    • 1970-01-01
    • 2015-06-11
    • 2021-09-12
    • 1970-01-01
    • 2014-10-18
    • 1970-01-01
    • 2016-07-09
    • 1970-01-01
    相关资源
    最近更新 更多