【问题标题】:Spark union large csv files and write to parquet fileSpark联合大型csv文件并写入镶木地板文件
【发布时间】:2021-04-22 19:51:44
【问题描述】:

我有一堆具有相同架构的大型 csv 文件。我想合并这些文件并将结果写入镶木地板文件,按列file_name 分区。

这是我到目前为止所做的:

children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.createDataFrame(
            spark.sparkContext.emptyRDD(),
            dfSchema
)
df = df.withColumn("file_name", lit(None))
for one_file in children_files :
    df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
    df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
    df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")

问题是我收到java.lang.OutOfMemoryError: Java heap space 错误。

我试着做一个

df.repartition(3000, "file_name").write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")

但它也不起作用。您能提出解决方案吗?

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql


    【解决方案1】:

    HDFS 位置中的文件似乎过多。因为有这么多文件,而union 是一个转换而不是动作,Spark 在尝试构建物理计划时可能会运行 OutOfMemory。

    无论如何,如果您打算读取大量具有相同架构的 csv 文件,我会使用结构化流。为避免手动编写架构,您可以从示例文件中推断出它。

    下面的代码展示了这个想法:

    # define input and output paths and checkpoint location
    csvPath = "hdfs:///landing/my_data/children_flow/"
    parquetPath = "hdfs:///staging/my_data/children_flow/"
    checkpointLoc = "hdfs:///path/to/dir/"
    
    # infer Schema from one example csv file
    schemaDf = spark.read.format("csv").option("inferSchema", true).load(csvPath + "first.csv")
    schema = schemaDf.schema
    
    # create Stream by reading from csv and writing to parquet
    df = spark.readStream \
      .format("csv") \
      .schema(schema) \
      .load(csvPath) \
      .withColumn("file_name", split(input_file_name(), "/")(4))
    
    query = df.writeStream \
      .format("parquet") \
      .outputMode("append") \
      .option("path", parquetPath) \
      .option("checkpointLocation", checkpointLoc) \
      .partitionBy("file_name") \
      .start() \
      .awaitTermination()
    

    如果这再次运行内存不足,您可以将选项“maxFilesPerTrigger”设置为一个合适的数字。如果它创建了太多 parquet 文件,你最终可以只读取 -> 重新分区 -> 写入 parquet 数据一次。

    【讨论】:

    • 嗨@Arès,很抱歉这么晚才回来。我正要测试添加所需的列“file_name”,我已经相应地更新了我的答案。请记住,input_file_name() 将返回完整路径,这就是为什么我用“/”分割它,然后选择集合中的最后一项来获取文件名。
    猜你喜欢
    • 2019-06-02
    • 2021-02-18
    • 1970-01-01
    • 2019-01-08
    • 2017-06-20
    • 1970-01-01
    • 2019-11-20
    • 2018-12-25
    • 2018-01-21
    相关资源
    最近更新 更多