【问题标题】:os.chdir between multiple python processes多个python进程之间的os.chdir
【发布时间】:2014-02-26 10:10:37
【问题描述】:

我有一个复杂的 python 管道(我无法更改的代码),调用多个其他脚本和其他可执行文件。关键是运行 8000 多个目录需要很长时间,并进行一些科学分析。因此,我使用多处理模块编写了一个简单的包装器(可能不是最有效,但似乎有效)。

from os import path, listdir, mkdir, system
from os.path import join as osjoin, exists, isfile
from GffTools import Gene, Element, Transcript
from GffTools import read as gread, write as gwrite, sort as gsort
from re import match
from multiprocessing import JoinableQueue, Process
from sys import argv, exit

# some absolute paths
inbase = "/.../abfgp_in"
outbase = "/.../abfgp_out"
abfgp_cmd = "python /.../abfgp-2.rev/abfgp.py"
refGff = "/.../B0510_manual_reindexed_noSeq.gff"

# the Queue
Q = JoinableQueue()
i = 0

# define number of processes
try: num_p = int(argv[1])
except ValueError: exit("Wrong CPU argument")

# This is the function calling the abfgp.py script, which in its turn calls alot of third party software
def abfgp(id_, pid):
    out = osjoin(outbase, id_)
    if not exists(out): mkdir(out)

    # logfile
    log = osjoin(outbase, "log_process_%s" %(pid))
    try:
        # call the script
        system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))
    except:
        print "ABFGP FAILED"
        return

# parse the output
def extractGff(id_):
   # code not relevant


# function called by multiple processes, using the Queue
def run(Q, pid):
    while not Q.empty():
        try:
            d = Q.get()             
            print "%s\t=>>\t%s" %(str(i-Q.qsize()), d)          
            abfgp(d, pid)
            Q.task_done()
        except KeyboardInterrupt:
            exit("Interrupted Child")

# list of directories
genedirs = [d for d in listdir(inbase)]
genes = gread(refGff)
for d in genedirs:
    i += 1
    indir = osjoin(inbase, d)
    outdir = osjoin(outbase, d)
    Q.put(d)

# this loop creates the multiple processes
procs = []
for pid in range(num_p):
    try:
        p = Process(target=run, args=(Q, pid+1))
        p.daemon = True
        procs.append(p) 
        p.start()
    except KeyboardInterrupt:
        print "Aborting start of child processes"
        for x in procs:
            x.terminate()
        exit("Interrupted")     

try:
    for p in procs:
        p.join()
except:
    print "Terminating child processes"
    for x in procs:
        x.terminate()
    exit("Interrupted")

print "Parsing output..."
for d in genedirs: extractGff(d)

现在的问题是,abfgp.py 使用了 os.chdir 函数,这似乎破坏了并行处理。我收到很多错误,指出无法找到某些(输入/输出)文件/目录进行读/写。尽管我通过 os.system() 调用脚本,但我认为生成单独的进程会阻止这种情况发生。

如何解决这些 chdir 干扰?

编辑:我可能会使用正确的目录将 os.system() 更改为 subprocess.Popen(cwd="...") 。我希望这会有所作为。

谢谢。

【问题讨论】:

  • 你为什么使用os.system而不是subprocess.call?如果没有字符串插值,它会少得多。
  • 好的提示,你是对的 :),但正如我所说,我虽然 os.system 会解决 chdir 干扰

标签: python multiprocessing os.system chdir


【解决方案1】:

编辑 2

不要使用os.system() 使用subprocess.call()

system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))

会翻译成

subprocess.call((abfgp_cmd, '--dna', osjoin(inbase, id_, id_ +".dna.fa"), '--multifasta', osjoin(inbase, id_, "informants.mfa"), '--target', id_, '-o', out, '-q')) # without log.

编辑 1 我认为问题在于多处理正在使用模块名称来序列化函数、类。

这意味着如果您执行 import module ,其中模块位于 ./module.py 并且您执行类似 os.chdir('./dir') 之类的操作,现在您需要 from .. import module

子进程继承父进程的文件夹。这可能是个问题。

解决方案

  1. 确保所有模块都已导入(在子进程中),然后更改目录
  2. 将原始os.getcwd() 插入sys.path 以启用从原始目录导入。这必须在从本地目录调用任何函数之前完成。
  3. 将您使用的所有函数放在一个可以随时导入的目录中。 site-packages 可以是这样一个目录。然后你可以做一些类似import module module.main() 的事情来开始你所做的事情。
  4. 我之所以这样做,是因为我知道 pickle 是如何工作的。只有在其他尝试失败时才使用它。 脚本打印:

    serialized # the function runD is serialized
    string executed # before the function is loaded the code is executed
    loaded # now the function run is deserialized
    run # run is called
    

    在你的情况下,你会做这样的事情:

    runD = evalBeforeDeserialize('__import__("sys").path.append({})'.format(repr(os.getcwd())), run)
    p = Process(target=runD, args=(Q, pid+1))
    

    这是脚本:

    # functions that you need
    
    class R(object):
        def __init__(self, call, *args):
    
            self.ret = (call, args)
        def __reduce__(self):
            return self.ret
        def __call__(self, *args, **kw):
            raise NotImplementedError('this should never be called')
    
    class evalBeforeDeserialize(object):
        def __init__(self, string, function):
            self.function = function
            self.string = string
        def __reduce__(self):
            return R(getattr, tuple, '__getitem__'), \
                     ((R(eval, self.string), self.function), -1)
    
    # code to show how it works        
    
    def printing():
        print('string executed')
    
    def run():
        print('run')
    
    runD = evalBeforeDeserialize('__import__("__main__").printing()', run)
    
    import pickle
    
    s = pickle.dumps(runD)
    print('serialized')
    run2 = pickle.loads(s)
    print('loaded')
    run2()
    

如果这些不起作用,请报告。

【讨论】:

  • 感谢您的努力,但我认为您误解了我的意思。我无法更改使用 chdir 的“abfgp.py”中的代码。因此,如果我生成 abfgp.py 的多个进程,它们将为每个进程创建 chdir。这些不同的进程相互干扰,相互改变输入和输出目录。所以我无法更改导入。
  • 这是否真的意味着如果您在一个进程中执行os.chdir,它会在另一个进程中更改os.getcwd()
  • 我也觉得很奇怪,但这就是我正在经历的。我认为 cwd 存储在 sys.path 中(没有检查),这对于所有 python 进程都是全局的,对吗?我可以试试这个:*.com/questions/13757734/…
  • 每个进程都有自己的sys.path。一开始,它们应该看起来都一样,但可以更改。子流程有效吗?您使用哪种操作系统?或许在副本中更改代码并使用副本也是正确的方法。
  • 我尝试在某种超级计算机上运行它,运行 Ubuntu。我将代码更改为 subprocess.call。我需要一段时间才能得到一些错误,成功。
【解决方案2】:

您可以确定不可更改程序正在使用os 库的哪个实例;然后在该库中创建一个定制版本的chdir 来满足您的需要——防止目录更改、记录它等等。如果需要为单个程序定制行为,您可以使用inspect 模块来识别调用者并以特定方式为该调用者定制行为。

如果您确实无法更改现有程序,您的选择将受到限制;但是,如果您可以选择更改它导入的库,那么这样的方法可能是避免不良行为的一种侵入性最小的方法。

更改标准库时通常需要注意。

【讨论】: