我没有对此进行测试,但它可能会解决您使用命令解压缩的问题。
命令gunzip -k 是同时保留压缩和解压缩文件那么output 目录的目的是什么。
import subprocess
import gzip
def decompressed_files():
print('starting decompressed_files')
# files where the data is stored
input_folder=('input')
# where I want my data to be
output_folder = input_folder + '/output'
if os.path.exists(output_folder):
print('folder already exists')
else:
os.makedirs(output_folder)
print('folder has been created')
for f in os.listdir(input_folder):
if f and f.endswith('.gz'):
cmd = ['gunzip', '-k', f, output_folder]
my_file = subprocess.Popen(cmd)
my_file.wait
print(cmd) 如下所示
['gunzip', '-k', 'input/sample.gz', 'input/output']
我的文件夹中有一些文件,我想创建循环
从上面引用你的实际问题似乎是从路径解压缩多个 *.gz 文件
在这种情况下,下面的代码应该可以解决您的问题。
import os
import shutil
import fnmatch
def gunzip(file_path,output_path):
with gzip.open(file_path,"rb") as f_in, open(output_path,"wb") as f_out:
shutil.copyfileobj(f_in, f_out)
def make_sure_path_exists(path):
try:
os.makedirs(path)
except OSError:
if not os.path.isdir(path):
raise
def recurse_and_gunzip(input_path):
walker = os.walk(input_path)
output_path = 'files/output'
make_sure_path_exists(output_path)
for root, dirs, files in walker:
for f in files:
if fnmatch.fnmatch(f,"*.gz"):
gunzip(root + '/' + f, output_path + '/' + f.replace(".gz",""))
recurse_and_gunzip('files')
source
编辑:
使用命令行参数 -
subprocess.Popen(base_cmd + args):
在新进程中执行子程序。在 Unix 上,该类使用类似 os.execvp() 的行为来执行子程序
fasta.gz: 没有这样的文件或目录
因此cmd 列表中的任何额外元素都被视为参数,gunzip 将查找argument.gz 文件,因此找不到错误fasta.gz 文件。
ref 和 some useful examples
现在,如果您想将 gz 文件作为命令行参数传递,您仍然可以使用以下代码来执行此操作(您可能需要根据需要稍微润色)
import argparse
import subprocess
import os
def write_to_desired_location(stdout_data,output_path):
print("Going to write to path", output_path)
with open(output_path, "wb") as f_out:
f_out.write(stdout_data)
def decompress_files(gz_files):
base_path=('files') # my base path
output_path = base_path + '/output' # output path
if os.path.exists(output_path):
print('folder already exists')
else:
os.makedirs(output_path)
print('folder has been created')
for f in gz_files:
if f and f.endswith('.gz'):
print('starting decompressed_files', f)
proc = subprocess.Popen(['gunzip', '-dc', f], stdout=subprocess.PIPE) # d:decompress and c:stdout
write_to_desired_location(proc.stdout.read(), output_path + '/' + f.replace(".gz", ""))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-gzfilelist",
required=True,
nargs="+", # 1 or more arguments
type=str,
help='Provide gz files as arguments separated by space Ex: -gzfilelist test1.txt.tar.gz test2.txt.tar.gz'
)
args = parser.parse_args()
my_list = [str(item)for item in args.gzfilelist] # converting namedtuple into list
decompress_files(gz_files=my_list)
执行:
python unzip_file.py -gzfilelist test.txt.tar.gz
输出
folder already exists
('starting decompressed_files', 'test.txt.tar.gz')
('Going to write to path', 'files/output/test.txt.tar')
例如,您也可以传递多个 gz 文件
python unzip_file.py -gzfilelist test1.txt.tar.gz test2.txt.tar.gz test3.txt.tar.gz