【发布时间】:2021-06-22 12:59:16
【问题描述】:
在 HDFS 中,我有一个文件夹 /landing/my_data/flow_name/ 在这个文件夹中我有一堆 .csv 文件:
- /landing/my_data/children_flow/20212503_1_children.csv
- /landing/my_data/children_flow/20212503_2_children.csv
- /landing/my_data/children_flow/20212503_3_children.csv
我想使用/landing/my_data/children_flow/ 目录创建一个按文件名分区的拼花文件,并将其放入/staging/my_data/children_flow/。
我希望/staging/my_data/children_flow/ 看起来像这样:
- /staging/my_data/children_flow/file_name=20212503_1_children/*.parquet
- /staging/my_data/children_flow/file_name=20212503_2_children/*.parquet
- /staging/my_data/children_flow/file_name=20212503_3_children/*.parquet
我考虑将/landing/my_data/children_flow 中的每个文件读入DataFrame,然后将文件名添加为一列,最后创建我的parquet 文件,该文件将由file_name 列分区。
children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.read.csv(os.path.join("/landing/my_data/children_flow/", children_files[0]))
df = df.withColumn("file_name", lit(children_files[0].replace(".csv", "")))
children_files.pop(0)
for one_file in children_files :
df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
但我不确定这是最好的解决方案...谁能帮我解决这个问题?
【问题讨论】:
标签: python apache-spark hadoop pyspark