【问题标题】:Python subprocess return code without waitingPython子进程无需等待即可返回代码
【发布时间】:2014-11-04 13:06:01
【问题描述】:

希望我的问题足够具体,与我读过的任何其他问题无关。我想使用 subprocess 和 multiprocessing 来连续生成一堆作业并将返回码返回给我。问题是我不想等待(),所以我可以一次生成所有作业,但我确实想知道它何时完成,以便我可以获得返回码。我遇到了这个奇怪的问题,如果我 poll() 它将无法运行。它只是挂在活动监视器中而不运行(我在 Mac 上)。我以为我可以使用观察者线程,但我挂在 q_out.get() 上,这让我相信也许我正在填满缓冲区并陷入僵局。我不知道如何解决这个问题。这基本上就是我的代码的样子。如果有人对如何做到这一点有任何更好的想法,我很乐意完全改变我的方法。

def watchJob(p1,out_q):
    while p1.poll() == None:
        pass
    print "Job is done"
    out_q.put(p1.returncode)

def runJob(out_q):
    LOGFILE = open('job_to_run.log','w')
    p1 = Popen(['../../bin/jobexe','job_to_run'], stdout = LOGFILE)
    t = threading.Thread(target=watchJob, args=(p1,out_q))
    t.start()

out_q= Queue()
outlst=[]
for i in range(len(nprocs)):
    proc = Process(target=runJob, args=(out_q,))
    proc.start()
    outlst.append(out_q.get()) # This hangs indefinitely
    proc.join()

【问题讨论】:

  • 有什么特别的理由同时拥有线程和多处理?
  • 如果您不打算阅读内容,为什么还要通过管道传输作业的标准输出?根据作业是否产生大量输出,作业可能会简单地阻止写入标准输出。
  • 我会调查Multiprocessing Pools。他们有几种不同的方式来同步或异步启动进程,并且检索返回码(通过等待或稍后检查)非常灵活。此外,还有几种方法可以附加在子流程完成时执行的回调。
  • 我一定会调查 Pools。这可能是我最终采用的方法。我仍然想知道我在哪里出错只是为了我的理解。感谢到目前为止所有回答的人。

标签: python multithreading multiprocessing subprocess


【解决方案1】:

如果您要在线程中运行watchJob,则没有理由使用p1.poll 进行忙循环;只需调用p1.wait() 阻止直到进程完成。使用忙循环需要不断地释放/重新获取 GIL,这会减慢主线程的速度,并且还会占用 CPU,从而更加损害性能。

另外,如果你没有使用子进程的stdout,你不应该把它发送到PIPE,因为如果进程向stdout缓冲区写入足够多的数据,这可能会导致死锁。填满它(这实际上可能是您的情况)。这里也不需要使用multiprocessing;只需在主线程中调用Popen,然后让watchJob 线程等待进程完成。

import threading
from subprocess import Popen
from Queue import Queue

def watchJob(p1, out_q):
    p1.wait()
    out_q.put(p1.returncode)

out_q = Queue()
outlst=[]
p1 = Popen(['../../bin/jobexe','job_to_run'])
t = threading.Thread(target=watchJob, args=(p1,out_q))
t.start()
outlst.append(out_q.get())
t.join()

编辑:

以下是如何以这种方式同时运行多个作业:

out_q = Queue()
outlst = []
threads = []
num_jobs = 3
for _ in range(num_jobs):
    p = Popen(['../../bin/jobexe','job_to_run'])
    t = threading.Thread(target=watchJob, args=(p1, out_q))
    t.start()
    # Don't consume from the queue yet.

# All jobs are running, so now we can start
# consuming results from the queue.
for _ in range(num_jobs):
    outlst.append(out_q.get())
    t.join()

【讨论】:

  • 很抱歉这里没有完整的代码。它需要多处理,因为当它完成时,它将在 HPC 上的多个处理器上产生多个作业。我用 p1.wait() 尝试了代码,stdout 进入了一个日志文件,根本不指导它。在 outlst.append(out_q.get()) 上完成程序后,它仍然会挂起。因此,如果我不需要生成多个作业,您的解决方案在技术上是有效的,但只是循环 Popen 会导致程序等待。这就是我有观察者线程的原因,因为我希望它不会导致程序等待。
  • @fatalaccidents 它是和我上面的代码挂起的,还是和你的代码挂起的,它使用了multiprocessing.Process?您在运行的代码中使用了哪种Queue
  • 您的代码没有挂起。如果您在混合中添加进程(和来自多处理的队列),您可以让它不挂起,但如果您将它放在一个循环中,它只会运行一个作业,然后运行下一个作业。所以它不是并行运行它们。您能否编写一个同时运行作业的示例?感谢您对此的帮助。
  • @fatalaccidents 它一次运行一个作业,因为您在每次迭代中都从队列中消耗。我已经编辑了我的答案,以展示如何避免这样做。我还不确定您使用multiprocessing.Process 来做什么...使用Popen 运行每个作业将允许它们在多个处理器上同时运行;你不需要multiprocessing
  • 谢谢你,你确实是对的。它就像预期的那样工作。唯一的问题是我想使用此队列信息来了解作业何时完成以及它们是否正确完成以产生新作业。事后阅读队列对我没有帮助,因为工作可能在不同的时间完成。也许我以错误的方式接近这个。我基本上需要消耗某种队列的结果并将其反馈给生成过程。我实际上是从数据库中读取数据,但我试图先获得一个简单的示例。你很有帮助。
【解决方案2】:

这里不需要多处理也不需要线程。你可以并行运行多个子进程,并在一个线程中收集它们的所有规则:

#!/usr/bin/env python3
from subprocess import Popen

def run(cmd, log_filename):
    with open(log_filename, 'wb', 0) as logfile:
        return Popen(cmd, stdout=logfile)

# start several subprocesses
processes = {run(['echo', c], 'subprocess.%s.log' % c) for c in 'abc'}
# now they all run in parallel
# report as soon as a child process exits
while processes: 
    for p in processes: 
        if p.poll() is not None:
           processes.remove(p) 
           print('{} done, status {}'.format(p.args, p.returncode))
           break

p.args 在 Python 3.3+ 中存储 cmd,在早期的 Python 版本中自己跟踪 cmd

另见:

为了限制并行作业的数量,可以使用线程池(如the first link 所示):

#!/usr/bin/env python3
from multiprocessing.dummy import Pool # use threads
from subprocess import Popen

def run_until_done(args):
    cmd, log_filename = args
    try:
        with open(log_filename, 'wb', 0) as logfile:
            p = Popen(cmd, stdout=logfile)
        return cmd, p.wait(), None
    except Exception as e:
        return cmd, None, str(e)

commands = ((('echo', str(d)), 'subprocess.%03d.log' % d) for d in range(500))
pool = Pool(128) # 128 concurrent commands at a time
for cmd, status, error in pool.imap_unordered(run_until_done, commands):
    if error is None:
       fmt = '{cmd} done, status {status}'
    else:
       fmt = 'failed to run {cmd}, reason: {error}'
    print(fmt.format_map(vars())) # or fmt.format(**vars()) on older versions

示例中的线程池有 128 个线程(不多也不少)。它不能同时执行超过 128 个作业。一旦任何线程释放(完成一个作业),它就会占用另一个,等等。并发执行的作业总数受线程数的限制。新工作不会等待所有 128 个以前的工作完成。当 任何 个旧作业完成后,它就会启动。

【讨论】:

  • 这将引发异常,因为processes 的大小在您对其进行迭代时会发生变化。您必须遍历 processes 的副本。
  • 没问题。出于好奇,你为什么还要从一个集合切换到一个列表? removeset 相比效率更高,我假设set.copy() 是O(n),与list[:] 相同。当然,对于这么小的可迭代对象,无论如何它并没有太大的区别......
  • @dano:没有理由。主要任务是“以接近完成的顺序打印退出状态”。 [:].copy() 在视觉上更不易分散注意力。性能在这里并不重要(n 很小)。我只是提到它,以防代码被复制粘贴而没有理解。
  • @dano:好的。我已经做到了 O(n)。
  • @J.F.Sebastian 这似乎可以解决一定数量的进程。让我陷入循环的事情是,我正在尝试运行例如 128 个作业……然后当一个完成从数据库中拉出另一个作业时,我不断地保持 128 个作业的运行。因此,一旦该集合被迭代并且工作开始,我不确定如何监控它以将另一个工作添加到组合中。感谢您的回答,它非常接近,很可能只是因为我的描述不佳而缺少。
猜你喜欢
  • 1970-01-01
  • 2020-12-03
  • 2021-02-13
  • 2017-12-15
  • 2013-05-01
  • 1970-01-01
  • 2017-06-26
  • 2021-06-11
  • 1970-01-01
相关资源
最近更新 更多