【发布时间】:2018-05-30 06:37:19
【问题描述】:
我有一系列非常大的每日 gzip 文件。我正在尝试使用 PySpark 以 Parquet 格式重新保存 S3 中的所有文件以供以后使用。
如果对于单个文件(例如,2012-06-01)我这样做:
dataframe = spark.read.csv('s3://mybucket/input/20120601.gz', schema=my_schema, header=True)
dataframe.write.parquet('s3://mybucket/output/20120601')
它可以工作,但由于 gzip 不可拆分,它在单个主机上运行,并且我没有使用集群的任何好处。
我尝试一次读取一大块文件,并使用 partitionBy 将输出写入这样的日常文件(例如,一个月内读取):
dataframe = spark.read.csv('s3://mybucket/input/201206*.gz', schema=my_schema, header=True)
dataframe.write.partitionBy('dayColumn').parquet('s3://mybucket/output/')
这一次,我想要在不同的执行程序中读取单个文件,但是执行程序后来死了,进程失败了。我相信由于文件太大,并且 partitionBy 以某种方式使用了不必要的资源(随机播放?),它会使任务崩溃。
我实际上不需要重新分区我的数据框,因为这只是 1:1 映射。有没有办法让每个单独的任务写入一个单独的、明确命名的 parquet 输出文件?
我在想类似的事情
def write_file(date):
# get input/output locations from date
dataframe = spark.read.csv(input_location, schema=my_schema, header=True)
dataframe.write.parquet(output_location)
spark.sparkContext.parallelize(my_dates).for_each(write_file)
除了这不起作用,因为您无法将 spark 会话广播到集群。有什么建议吗?
【问题讨论】:
标签: apache-spark pyspark parquet