【问题标题】:Renaming spark output csv in azure blob storage在 azure blob 存储中重命名 spark 输出 csv
【发布时间】:2019-05-08 00:15:07
【问题描述】:

我有一个 Databricks 笔记本设置,如下所示;

  • 到 Blob 存储帐户的 pyspark 连接详细信息
  • 通过 spark 数据帧读取文件
  • 转换为 pandas Df
  • pandas Df 上的数据建模
  • 转换为火花 Df
  • 在单个文件中写入 blob 存储

我的问题是,你不能命名文件输出文件,我需要一个静态 csv 文件名。

有没有办法在 pyspark 中重命名它?

## Blob Storage account information
storage_account_name = ""
storage_account_access_key = ""

## File location and File type
file_location = "path/.blob.core.windows.net/Databricks_Files/input"
file_location_new = "path/.blob.core.windows.net/Databricks_Files/out"
file_type = "csv"

## Connection string to connect to blob storage
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

数据转换后输出文件

dfspark.coalesce(1).write.format('com.databricks.spark.csv') \
  .mode('overwrite').option("header", "true").save(file_location_new)

然后将文件写入的位置为 "part-00000-tid-336943946930983.....csv"

目标是拥有 "Output.csv"

我看到的另一种方法是在 python 中重新创建它,但尚未在文档中遇到如何将文件输出回 blob 存储。

我知道从 Blob 存储中检索的方法是 .get_blob_to_path 通过microsoft.docs

非常感谢这里的任何帮助。

【问题讨论】:

    标签: python azure apache-spark pyspark azure-storage


    【解决方案1】:

    Hadoop/Spark 会将每个分区的计算结果并行输出到一个文件中,因此您会在 HDFS 输出路径中看到许多 part-<number>-.... 文件,例如您命名的 Output/

    如果要将计算的所有结果输出到一个文件中,可以通过命令hadoop fs -getmerge /output1/part* /output2/Output.csv 将它们合并或使用1 设置reduce 进程的数量,如使用coalesce(1) 函数。

    所以在你的场景中,你只需要调整调用这些函数的顺序,使coalease函数在save函数的前面被调用,如下所示。

    dfspark.write.format('com.databricks.spark.csv') \
      .mode('overwrite').option("header", "true").coalesce(1).save(file_location_new)
    

    【讨论】:

      【解决方案2】:

      coalescerepartition 无助于将数据帧保存到 1 个通常命名的文件中。

      我最终只是重命名了 1 个 csv 文件并删除了带有日志的文件夹:

      def save_csv(df, location, filename):
        outputPath = os.path.join(location, filename + '_temp.csv')
      
        df.repartition(1).write.format("com.databricks.spark.csv").mode("overwrite").options(header="true", inferSchema="true").option("delimiter", "\t").save(outputPath)
      
        csv_files = os.listdir(os.path.join('/dbfs', outputPath))
      
        # moving the parquet-like temp csv file into normally named one
        for file in csv_files:
          if file[-4:] == '.csv':
            dbutils.fs.mv(os.path.join(outputPath,file) , os.path.join(location, filename))
            dbutils.fs.rm(outputPath, True)
      
      # using save_csv
      save_csv_location = 'mnt/.....'
      save_csv(df, save_csv_location, 'name.csv')
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-04-16
        • 2013-04-13
        • 2020-03-27
        • 1970-01-01
        • 2018-11-07
        • 2019-03-08
        相关资源
        最近更新 更多