【问题标题】:subprocess + multiprocessing - multiple commands in sequencesubprocess + multiprocessing - 多个命令按顺序
【发布时间】:2012-11-08 07:44:40
【问题描述】:

我有一组命令行工具,我想在一系列文件上并行运行。我编写了一个 python 函数来包装它们,看起来像这样:

def process_file(fn):
    print os.getpid()
    cmd1 = "echo "+fn
    p = subprocess.Popen(shlex.split(cmd1))

    # after cmd1 finishes
    other_python_function_to_do_something_to_file(fn)

    cmd2 = "echo "+fn
    p = subprocess.Popen(shlex.split(cmd2))
    print "finish"

if __name__=="__main__":
    import multiprocessing
    p = multiprocessing.Pool()
    for fn in files:
        RETURN = p.apply_async(process_file,args=(fn,),kwds={some_kwds})

虽然这可行,但它似乎并没有运行多个进程;似乎它只是串行运行(我尝试使用 Pool(5) 得到相同的结果)。我错过了什么?对Popen 的调用是否“阻塞”?

编辑:澄清一点。我需要 cmd1,然后是一些 python 命令,然后是 cmd2,以便在每个文件上按顺序执行。

EDIT2:上面的输出具有以下模式:

pid
finish
pid
finish
pid
finish

而类似的调用,使用map 代替apply(但没有任何传递kwds 的规定)看起来更像

pid
pid
pid
finish
finish
finish

但是,map 调用有时(总是?)在明显成功后挂起

【问题讨论】:

  • 在回答这个问题之前……你为什么需要multiprocessing?您可以在一个进程中创建一大堆 Popen 实例,它们将与您在多个进程中创建它们一样并行。
  • 嗯,这是我的问题的一部分(感谢您帮助我澄清)...我的 shell 命令需要连续运行(即 cmd1、cmd2、cmd3)。我希望对每个文件执行每个 series
  • @abarnert 是对的。只是不要在进程上调用communicatewait
  • 嗯,map 正在等待所有孩子完成后才返回,而 apply_async 立即返回。如果您不想在所有作业完成之前阻止,请使用map_asyncimap_unordered。但我认为在这种情况下,你确实想阻止,对吧?无论如何,在您的用例中,我不确定简化调度的好处是否超过了包装函数调用的复杂性,所以我不知道您需要在这部分进行多长时间。
  • 至于map 挂……你确定所有的工作都完成了吗?这实际上是调试的痛苦……但是如果您切换到imap_unordered,您可以获得更多关于正在发生的事情的反馈。 (multiprocessing.Pool 的代码非常简单——看一下。)

标签: python multiprocessing


【解决方案1】:

Popen 的调用是否“阻塞”?

没有。只需创建 subprocess.Popen 即可立即返回,为您提供一个可以等待或以其他方式使用的对象。如果你想阻止,那很简单:

subprocess.check_call(shlex.split(cmd1))

同时,我不确定您为什么要将您的 args 放在一个字符串中,然后尝试将 shlex 它们返回到列表中。为什么不直接写清单呢?

cmd1 = ["echo", fn]
subprocess.check_call(cmd1)

虽然这可行,但它似乎并没有运行多个进程;它似乎只是在串行运行

是什么让你有这样的想法?鉴于每个进程只是尽可能快地将两个进程启动到后台,因此很难判断它们是否并行运行。

如果您想验证您是否从多个处理中获得工作,您可能需要添加一些打印或日志记录(并将os.getpid() 之类的内容放入消息中)。

同时,您似乎正试图在 multiprocessing.Pool.apply_async 周围的循环中完全复制 multiprocessing.Pool.map_async 的效果,除了将每个结果存储在一个名为 RESULT 的变量中,而不是累积结果然后在你可以使用它之前把它扔掉。为什么不直接使用map_async

最后,您问multiprocessing 是否适合这项工作。好吧,你显然需要一些异步的东西:check_call(args(file1)) 必须阻止 other_python_function_to_do_something_to_file(file1),但同时不能阻止 check_call(args(file2))

我可能会使用threading,但实际上并没有太大区别。即使您在一个进程启动成本高昂的平台上,您也已经支付了这笔费用,因为整个过程都在运行 N * M 组子进程,因此另一个 8 个池不会有任何损害。并且通过在线程之间共享数据而意外创建竞赛的风险很小,或者意外创建看起来像是在进程之间共享数据的代码,因为没有什么可共享的。所以,不管你更喜欢哪一个,去吧。

另一种选择是编写一个事件循环。我实际上可能会开始为这个问题自己做,但我会后悔的,你不应该这样做......

【讨论】:

  • 好的,所以如果我想让我的 shell 命令连续运行,check_call 是最好的选择。那么使用multiprocessing 有意义吗? (re:shlex - 我的实际命令比我包含的简单示例更复杂)
  • 现在我明白了这个问题,是的,multiprocessing 至少是一个选项。您想在为第一个文件调用第二个进程之前阻止第一个文件的第一个进程......但在调用第二个文件的第一个进程之前不要。我可能会使用threading(或者我什至可以编写select 循环,然后在移植到Windows 时将我的头撞到墙上……),但multiprocessing 可以正常工作。
  • 太好了,谢谢,这很有意义。我使用apply 而不是map,因为我需要将关键字参数传递给函数。用 kwarg 解析器包装或装饰我的函数然后使用 map_async 是否更有意义?
  • re:为什么我认为它们不是独立的进程?因为我按顺序获得输出结果。即,我的输入是 f1,f2,f3,f4,...,f30,我运行 4 个进程,每个进程按顺序输出,每个进程之间的时间间隔相同;如果它们并行运行,则顺序将是随机的。但是,他们确实有不同的pid。嗯。
  • 对不起,我没有回答你关于 kwargs 的其他问题。 map 方法仅适用于一个参数的函数,这意味着是的,您确实需要将内容包装在例如列表和字典的元组中,并传递一个等效于 lambda arg: realFunc(*arg[0], **arg[1]) 的包装器来代替 @ 987654347@。我在问题的 cmets 中说,这可能比您在调度中节省的工作更多。
猜你喜欢
  • 2010-11-23
  • 2013-07-11
  • 1970-01-01
  • 1970-01-01
  • 2017-10-05
  • 1970-01-01
  • 2011-05-23
  • 2011-09-15
  • 1970-01-01
相关资源
最近更新 更多