【发布时间】:2022-01-19 19:44:47
【问题描述】:
我正在尝试并行化存储在 Databricks 上 s3 中的 pyspark 中的解压缩文件。在for 循环中解压缩是这样的:
file_list = [(file.path, file.name) for file in dbutils.fs.ls(data_path) if os.path.basename(file.path).endswith(".zip")] # data_path is taken as a parameter
file_names = [ff[1] for ff in file_list]
for ff in file_list:
dbutils.fs.cp(ff[0], "/FileStore/tmp/" + ff[1])
cmd = 'unzip /dbfs/FileStore/tmp/' + ff[1]
os.system(cmd)
dbutils.fs.cp("file:/databricks/driver/" + ff[1], data_path)
dbutils.fs.rm("file:/databricks/driver/" + ff[1])
我正在尝试并行化解压缩部分。所以在将文件复制到"/FileStore/tmp/" 之后,我正在运行:
unzips = [file[1] for file in file_list]
def f(x):
os.system('unzip /dbfs/FileStore/tmp/' + x)
sc.parallelize(unzips).foreach(f)
作业运行,但文件未在任何地方解压缩。
【问题讨论】:
标签: pyspark databricks unzip