【发布时间】:2021-04-22 19:51:44
【问题描述】:
我有一堆具有相同架构的大型 csv 文件。我想合并这些文件并将结果写入镶木地板文件,按列file_name 分区。
这是我到目前为止所做的:
children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.createDataFrame(
spark.sparkContext.emptyRDD(),
dfSchema
)
df = df.withColumn("file_name", lit(None))
for one_file in children_files :
df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
问题是我收到java.lang.OutOfMemoryError: Java heap space 错误。
我试着做一个
df.repartition(3000, "file_name").write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
但它也不起作用。您能提出解决方案吗?
【问题讨论】:
标签: python apache-spark pyspark apache-spark-sql