[Question title]: Python multiprocessing subprocesses with ordered printing?
[Posted]: 2017-11-13 07:01:37
[Question]:

I am trying to run some Python functions in parallel, and these functions have print statements throughout them. What I want is for each subprocess running the same function to have its output grouped together on the main stdout. By that I mean each subprocess's output should only be printed once it has finished its task. If, however, some kind of error occurs along the way, I still want whatever the subprocess had printed so far to be output.

A small example:

from time import sleep
import multiprocessing as mp


def foo(x):
    print('foo')
    for i in range(5):
        print('Process {}: in foo {}'.format(x, i))
        sleep(0.5)


if __name__ == '__main__':
    pool = mp.Pool()

    jobs = []
    for i in range(4):
        job = pool.apply_async(foo, args=[i])
        jobs.append(job)

    for job in jobs:
        job.wait()

This runs in parallel, but the output is:

foo
Process 0: in foo 0
foo
Process 1: in foo 0
foo
Process 2: in foo 0
foo
Process 3: in foo 0
Process 1: in foo 1
Process 0: in foo 1
Process 2: in foo 1
Process 3: in foo 1
Process 1: in foo 2
Process 0: in foo 2
Process 2: in foo 2
Process 3: in foo 2
Process 1: in foo 3
Process 0: in foo 3
Process 3: in foo 3
Process 2: in foo 3
Process 1: in foo 4
Process 0: in foo 4
Process 3: in foo 4
Process 2: in foo 4

What I want is:

foo
Process 3: in foo 0
Process 3: in foo 1
Process 3: in foo 2
Process 3: in foo 3
Process 3: in foo 4
foo
Process 1: in foo 0
Process 1: in foo 1
Process 1: in foo 2
Process 1: in foo 3
Process 1: in foo 4
foo
Process 0: in foo 0
Process 0: in foo 1
Process 0: in foo 2
Process 0: in foo 3
Process 0: in foo 4
foo
Process 2: in foo 0
Process 2: in foo 1
Process 2: in foo 2
Process 2: in foo 3
Process 2: in foo 4

The particular order of the processes does not matter, as long as each subprocess's output is grouped together. Interestingly enough, I do get the output I want if I do

python test.py > output

I know that each subprocess does not get its own stdout and instead uses the main one. I have thought about and looked up some solutions, such as using a queue where each subprocess gets its own stdout, then overriding the flush command once it finishes so the buffered output can be sent back via the queue, after which we can read it. However, although this does give me what I want, I cannot retrieve the output if the function halts partway through; it is only output after it completes successfully. Got the idea from here: Access standard output of a sub process in python

I have also seen the use of locks, which works, but it completely kills running the functions in parallel, since each subprocess would have to wait for the others to finish executing foo.

Also, if possible, I would like to avoid changing the implementation of my foo function, since I have many functions that would need changing.

EDIT: I have looked into the libraries dispy and Parallel Python. Dispy does exactly what I want: it keeps a separate stdout/stderr that I can print out at the end, but the problem with dispy is that I have to run the server manually in a separate terminal. I want to be able to run my Python program all in one go, without having to open another script first. Parallel Python, on the other hand, also does what I want, but it seems to lack control, plus it has some annoying quirks. In particular, when you print output it also prints out the function's return type, and I only want the output I print with print. Also, when running a function you have to give it a list of the modules it uses, which is a bit annoying, since I do not want to have a big list of imports just to run a simple function.

[Question discussion]:

    Tags: python multiprocessing stdout


    [Solution 1]:

    As you have noticed, using a lock in this case kills multiprocessing, because you essentially make all processes wait for the one currently holding the 'rights' to STDOUT to release the mutex. Running in parallel and printing synchronously from your functions/subprocesses are logically mutually exclusive, however.

    What you can do instead is have your main process serve as a 'printer' for your subprocesses, so that once a subprocess finishes/errors, only then does it send back to the main process what it wants printed. You seem perfectly content with printing not being 'live' (and, as mentioned, it cannot be anyway), so this approach should serve you well. So:

    import multiprocessing as mp
    import random  # just to add some randomness
    from time import sleep
    
    def foo(x):
        output = ["[Process {}]: foo:".format(x)]
        for i in range(5):
            output.append('[Process {}] in foo {}'.format(x, i))
            sleep(0.2 + 1 * random.random())
        return "\n".join(output)
    
    if __name__ == '__main__':
        pool = mp.Pool(4)
        for res in pool.imap_unordered(foo, range(4)):
            print("[MAIN]: Process finished, response:")
            print(res)  # this will print as soon as one of the processes finishes/errors
        pool.close()
    

    This gives you (YMMV, of course):

    [MAIN]: Process finished, response:
    [Process 2]: foo:
    [Process 2] in foo 0
    [Process 2] in foo 1
    [Process 2] in foo 2
    [Process 2] in foo 3
    [Process 2] in foo 4
    [MAIN]: Process finished, response:
    [Process 0]: foo:
    [Process 0] in foo 0
    [Process 0] in foo 1
    [Process 0] in foo 2
    [Process 0] in foo 3
    [Process 0] in foo 4
    [MAIN]: Process finished, response:
    [Process 1]: foo:
    [Process 1] in foo 0
    [Process 1] in foo 1
    [Process 1] in foo 2
    [Process 1] in foo 3
    [Process 1] in foo 4
    [MAIN]: Process finished, response:
    [Process 3]: foo:
    [Process 3] in foo 0
    [Process 3] in foo 1
    [Process 3] in foo 2
    [Process 3] in foo 3
    [Process 3] in foo 4
    

    You can observe anything else the same way, including errors.
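    For example, errors can travel back by the same route if the function appends them to its buffered output before returning. A sketch of that pattern (`safe_foo` is a made-up example, and it uses `multiprocessing.dummy`'s thread-backed Pool only so the snippet is trivially runnable; `mp.Pool` takes the identical calls):

```python
from multiprocessing.dummy import Pool  # thread-backed Pool with the same API as mp.Pool

def safe_foo(x):
    output = ["[Process {}]: start".format(x)]
    try:
        if x == 2:  # simulate a failure in one worker
            raise ValueError("x == 2 is not allowed")
        output.append("[Process {}]: done".format(x))
    except Exception as e:
        output.append("[Process {}]: error: {}".format(x, e))  # the error joins the buffered lines
    return "\n".join(output)

pool = Pool(3)
results = list(pool.imap_unordered(safe_foo, range(3)))
pool.close()
pool.join()
for res in results:  # each worker's lines arrive as one grouped block
    print(res)
```

    Since the error is folded into the returned string, a failed worker still delivers everything it produced up to the failure, grouped with the rest of its output.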

    UPDATE - If you absolutely must use functions whose output you cannot control, you can wrap your subprocesses and capture their STDOUT/STDERR instead, and once they are done (or raise an exception) you can return everything back to the process 'manager' for printing to the actual STDOUT. With such a setup we can have foo() like:

    def foo(x):
        print("[Process {}]: foo:".format(x))
        for i in range(5):
            print('[Process {}] in foo {}'.format(x, i))
            sleep(0.2 + 1 * random.random())
            if random.random() < 0.0625:  # ~1/4 overall chance across the 5 iterations to err
                raise Exception("[Process {}] A random exception is random!".format(x))
        return random.random() * 100  # just a random response, you can omit it
    

    Notice that it is blissfully unaware that something is trying to mess with its mode of operation. We then create an external general-purpose wrapper (so you do not have to change it depending on the function) to actually mess with its default behavior (and not only this function's, but everything else it might call while running):

    def std_wrapper(args):
        try:
            from StringIO import StringIO  # ... for Python 2.x compatibility
        except ImportError:
            from io import StringIO
        import sys
        sys.stdout, sys.stderr = StringIO(), StringIO()  # replace stdout/err with our buffers
        # args is a list packed as: [0] process function name; [1] args; [2] kwargs; lets unpack:
        process_name = args[0]
        process_args = args[1] if len(args) > 1 else []
        process_kwargs = args[2] if len(args) > 2 else {}
        # get our method from its name, assuming global namespace of the current module/script
        process = globals()[process_name]
        response = None  # in case a call fails
        try:
            response = process(*process_args, **process_kwargs)  # call our process function
        except Exception as e:  # too broad but good enough as an example
            print(e)
        # rewind our buffers:
        sys.stdout.seek(0)
        sys.stderr.seek(0)
        # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
        return sys.stdout.read(), sys.stderr.read(), response
    

    Now we just need to call this wrapper instead of the desired foo(), and provide it with information on what to call on our behalf:

    if __name__ == '__main__':
        pool = mp.Pool(4)
        # since we're wrapping the process we're calling, we need to send to the wrapper packed
        # data with instructions on what to call on our behalf.
        # info on args packing available in the std_wrapper function above.
        for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):
            print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # remove the trailing newline for niceness, print err if you want
        pool.close()
    

    So now if you run it, you will get something like this:

    [MAIN]: Process finished, response: None, STDOUT:
    [Process 2]: foo:
    [Process 2] in foo 0
    [Process 2] in foo 1
    [Process 2] A random exception is random!
    [MAIN]: Process finished, response: 87.9658471743586, STDOUT:
    [Process 1]: foo:
    [Process 1] in foo 0
    [Process 1] in foo 1
    [Process 1] in foo 2
    [Process 1] in foo 3
    [Process 1] in foo 4
    [MAIN]: Process finished, response: 38.929554421661194, STDOUT:
    [Process 3]: foo:
    [Process 3] in foo 0
    [Process 3] in foo 1
    [Process 3] in foo 2
    [Process 3] in foo 3
    [Process 3] in foo 4
    [MAIN]: Process finished, response: None, STDOUT:
    [Process 0]: foo:
    [Process 0] in foo 0
    [Process 0] in foo 1
    [Process 0] in foo 2
    [Process 0] in foo 3
    [Process 0] in foo 4
    [Process 0] A random exception is random!
    

    even though foo() just prints or errors out. Of course, you can use such a wrapper to call any function and pass any number of args/kwargs to it.
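    For instance, the packing convention also carries keyword arguments as the optional third element of the list. A single-process sketch of that convention (`greet` is a hypothetical example function, not part of the original; we save and restore the real sys.stdout around the call, since the wrapper replaces it and a real subprocess would simply exit):

```python
import sys
from io import StringIO

def std_wrapper(args):
    # same packing convention as the wrapper above: [name, positional args, keyword args]
    sys.stdout, sys.stderr = StringIO(), StringIO()  # replace stdout/err with buffers
    process_name = args[0]
    process_args = args[1] if len(args) > 1 else []
    process_kwargs = args[2] if len(args) > 2 else {}
    process = globals()[process_name]  # look the function up by name in the module globals
    response = None
    try:
        response = process(*process_args, **process_kwargs)
    except Exception as e:
        print(e)
    sys.stdout.seek(0)
    sys.stderr.seek(0)
    return sys.stdout.read(), sys.stderr.read(), response

def greet(name, punctuation="!"):  # hypothetical example function
    print("Hello, {}{}".format(name, punctuation))
    return len(name)

real_out, real_err = sys.stdout, sys.stderr  # in-process demo, so restore manually afterwards
out, err, res = std_wrapper(["greet", ["world"], {"punctuation": "?"}])
sys.stdout, sys.stderr = real_out, real_err
```

    Here `out` holds the captured "Hello, world?" line and `res` the return value, exactly the triple the pool version hands back.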

    UPDATE #2 - But wait! If we can wrap our function processes like this and capture their STDOUT/STDERR, surely we can turn this into a decorator and use it in our code with a simple decoration. So, for my final proposal:

    import functools
    import multiprocessing
    import random  # just to add some randomness
    import time
    
    def std_wrapper(func):
        @functools.wraps(func)  # we need this to unravel the target function name
        def caller(*args, **kwargs):  # and now for the wrapper, nothing new here
            try:
                from StringIO import StringIO  # ... for Python 2.x compatibility
            except ImportError:
                from io import StringIO
            import sys
            sys.stdout, sys.stderr = StringIO(), StringIO()  # use our buffers instead
            response = None  # in case a call fails
            try:
                response = func(*args, **kwargs)  # call our wrapped process function
            except Exception as e:  # too broad but good enough as an example
                print(e)  # NOTE: the exception is also printed to the captured STDOUT
            # rewind our buffers:
            sys.stdout.seek(0)
            sys.stderr.seek(0)
            # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
            return sys.stdout.read(), sys.stderr.read(), response
        return caller
    
    @std_wrapper  # decorate any function, it won't know you're siphoning its STDOUT/STDERR
    def foo(x):
        print("[Process {}]: foo:".format(x))
        for i in range(5):
            print('[Process {}] in foo {}'.format(x, i))
            time.sleep(0.2 + 1 * random.random())
            if random.random() < 0.0625:  # ~1/4 overall chance across the 5 iterations to err
                raise Exception("[Process {}] A random exception is random!".format(x))
        return random.random() * 100  # just a random response, you can omit it
    

    Now we can call our wrapped functions just as before, without dealing with argument packing or anything of the sort, so we are back to:

    if __name__ == '__main__':
        pool = multiprocessing.Pool(4)
        for out, err, res in pool.imap_unordered(foo, range(4)):
            print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # remove the trailing newline for niceness, print err if you want
        pool.close()
    

    The output is the same as in the previous example, but in a nicer and more manageable package.
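    On Python 3 only, the manual sys.stdout/sys.stderr swap can be replaced with the contextlib.redirect_stdout/redirect_stderr context managers, which restore the real streams automatically even when the wrapped call raises. A minimal sketch of the same decorator under that assumption (this `foo` is a toy stand-in, not the original):

```python
import contextlib
import functools
import io

def std_wrapper(func):
    @functools.wraps(func)
    def caller(*args, **kwargs):
        buf_out, buf_err = io.StringIO(), io.StringIO()  # per-call capture buffers
        response = None
        with contextlib.redirect_stdout(buf_out), contextlib.redirect_stderr(buf_err):
            try:
                response = func(*args, **kwargs)
            except Exception as e:
                print(e)  # lands in buf_out, just like before
        return buf_out.getvalue(), buf_err.getvalue(), response
    return caller

@std_wrapper
def foo(x):  # toy stand-in for the real foo
    print("[Process {}] in foo".format(x))
    if x < 0:
        raise ValueError("negative x")
    return x * 2

out, err, res = foo(3)
print(out.rstrip())
```

    The pool-side loop stays exactly the same; only the capture mechanics inside the decorator change, and no seek/read bookkeeping is needed.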

    [Discussion]:

    • This definitely works, but is it possible to avoid changing foo? In particular, I would like to keep using print. That may well be impossible, so I may just have to accept it and change my implementation. Also, if a subprocess fails partway through running the function, does that mean there is no output at all? From what I can tell, it only outputs the log at the end. I want to be able to print out the accumulated messages whether the function ran successfully or failed partway through.
    • @mepmerp - check the updated code... it's a bit of a hack, but it does the job you want.