【问题标题】:Databricks pyspark parallelize unzipping multiple filesDatabricks pyspark 并行解压缩多个文件
【发布时间】:2022-01-19 19:44:47
【问题描述】:

我正在尝试并行化存储在 Databricks 上 s3 中的 pyspark 中的解压缩文件。在for 循环中解压缩是这样的:

file_list = [(file.path, file.name) for file in dbutils.fs.ls(data_path) if os.path.basename(file.path).endswith(".zip")] # data_path is taken as a parameter
file_names = [ff[1] for ff in file_list]
for ff in file_list:
  dbutils.fs.cp(ff[0], "/FileStore/tmp/" + ff[1])
  cmd = 'unzip /dbfs/FileStore/tmp/' + ff[1]
  os.system(cmd)
  dbutils.fs.cp("file:/databricks/driver/" + ff[1], data_path)
  dbutils.fs.rm("file:/databricks/driver/" + ff[1])

我正在尝试并行化解压缩部分。所以在将文件复制到"/FileStore/tmp/" 之后,我正在运行:

unzips = [file[1] for file in file_list]
def f(x):
  os.system('unzip /dbfs/FileStore/tmp/' + x)
sc.parallelize(unzips).foreach(f)

作业运行,但文件未在任何地方解压缩。

【问题讨论】:

    标签: pyspark databricks unzip


    【解决方案1】:

    只需添加 -d 选项即可将输出放到 DBFS 上,如下所示:

    def f(x):
      os.system('unzip -o -d /dbfs/FileStore/tmp-output/ /dbfs/FileStore/tmp/' + x)
    

    如有必要(例如,存档中没有目录),为每次运行添加更多唯一目录,例如,将x 附加到它。

    【讨论】:

    • 会试试这个。如何查看unzip 的文档?我不知道该去哪里
    • 不带任何参数运行 %sh unzip,或者在 Linux 或 Mac 上运行 man unzip
    • 这工作除了一个问题。有时,当 spark 移动到 Databricks 上我读取解压缩文件的下一个单元格时,并非所有文件都已解压缩。这会导致它失败并出现错误。知道如何避免这种情况吗?
    • 您需要检查输出(通过 Spark UI 执行程序日志)以查看是否有任何错误
    猜你喜欢
    • 2015-01-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-25
    • 1970-01-01
    • 2015-07-09
    相关资源
    最近更新 更多