【问题标题】:Python Multiprocessing Early TerminationPython 多处理提前终止
【发布时间】:2014-09-11 14:35:10
【问题描述】:

在我的脚本运行时,可能会在某些时候发生错误。在这种情况下,应正确终止所有进程,返回错误消息,并退出脚本。

我现在的代码似乎还不能满足这些要求。发生错误时,将其发送到report_error(),脚本最终挂在终端中,活动监视器显示许多 Python 进程仍在运行。

环境

  • Mac OS X 10.8.5
  • Python 3.3.3

从脚本中的任何一点终止所有进程的正确方法是什么?

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import sys
from multiprocessing import Pool


# Global variables.

input_files = [
    'test_data_0.csv',
    'test_data_1.csv'
]


def report_error(error):

    # Reports errors then exits script.
    print("Error: {0}".format(error), file=sys.stderr)
    sys.exit(1)

    # What I really want is to report the error, properly terminate all processes,
    # and then exit the script.


def read_file(file):

    try:
        # Read file into list.
    except Exception as error:
        report_error(error)


def check_file(file):

    # Do some error checking on file.
    if error:
        report_error(error)


def job(file):

    # Executed on each item in input_files.

    check_file(file)
    read_file(file)


def main():

    # Sets up a process pool. Defaults to number of cores.
    # Each input gets passed to job and processed in a separate process.
    p = Pool()
    p.map(job, input_files)

    # Closing and joining a pool is important to ensure all resources are freed properly.
    p.close()
    p.join()


if __name__ == '__main__':
    main()

【问题讨论】:

  • 澄清一下,您希望其中一名工作人员发生错误导致所有其他工作人员和父进程终止?
  • 另外,你关心job返回什么值吗?如果是这样,您是否关心返回结果的顺序

标签: python python-3.x multiprocessing


【解决方案1】:

首先,使用sys.exit() 杀死子工作进程实际上会破坏池,并使map 命令永远挂起。目前multiprocessing 无法在工作进程处理作业时从工作进程崩溃中正确恢复(有一个错误报告,其中包含解决此问题的补丁here,值得一提)。

有几种方法可以让你真正做你想做的事。由于您似乎不关心从工作函数返回的值,因此最简单的方法是使用imap_unordered 而不是map,在出现故障时从工作人员那里引发异常,然后简单地迭代imap_unordered返回的迭代器:

def report_error(error):

    # Reports errors then exits script.
    print("Error: {0}".format(error), file=sys.stderr)
    raise error # Raise the exception

...

def main():
    p = Pool()
    try:
        list(p.imap_unordered(job, input_files))
    except Exception:
        print("a worker failed, aborting...")
        p.close()
        p.terminate()
    else:
        p.close()
        p.join()

if __name__ == '__main__':
    main()

使用imap_unordered,结果将在子节点发送后立即返回给父节点。因此,如果子进程将异常发送回父进程,它将立即在父进程中重新引发。我们捕获该异常,打印一条消息,然后终止池。

【讨论】:

  • 这里迟到的问题。该解决方案有效,但我不明白。为什么孩子必须明确提出异常?如果孩子失败了,它不会自然引发异常吗?
  • @Student 如果子函数因抛出异常而“失败”,则不需要额外的 raise。 OPs 子方法似乎对错误进行了某种报告,暗示它正在捕获它们。在这种情况下,需要显式提升,因此异常将其返回给父级。
  • 如果我在池运行时有一个 KeyboardInterupt,是否会导致孩子引发一个 KeyboardInterupt?如果确实如此,那么我不需要让孩子明确提出异常。我看到的是,当我 KeyboardInterupt 池时,程序只是挂起。但是,如果我使用 except 在子项中捕获 KeyboardInterupt,然后在 except 中引发,然后在父项中使用 except Exception 关闭()并像您所做的那样终止,然后如果我在池运行时按 control-C程序按照我的意愿停止运行。
  • a) 感谢您对此的回答! b) 我必须做大量令人不安的原型设计和挖掘工作才能使这项工作适用于我认为最流行的用例
  • @GeorgeMauer 池中的进程直到您调用 close() 之后才会终止,并且其中运行的任何未完成的工作项完成(或者,如果您改为调用 terminate(),则进程退出无需等待出色的工作完成)。当然,它们也会在主进程退出时终止,所以在这个例子中它实际上并不是必需的,因为不会有任何我们想要等待的未完成的工作。我只是将其包括在内,因为一般来说,在您完成它们后干净地关闭它们是最佳做法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-11-10
相关资源
最近更新 更多