【问题标题】:Exit a multiprocessing script退出多处理脚本
【发布时间】:2017-01-08 00:16:03
【问题描述】:

当目标函数抛出错误时,我试图退出多处理脚本,但父进程并没有退出,而是挂起。

这是我用来复制问题的测试脚本:

#!/usr/bin/python3.5

import time, multiprocessing as mp

def myWait(wait, resultQueue):
    startedAt = time.strftime("%H:%M:%S", time.localtime())
    time.sleep(wait)
    endedAt = time.strftime("%H:%M:%S", time.localtime())
    name = mp.current_process().name
    resultQueue.put((name, wait, startedAt, endedAt))

# queue initialisation
resultQueue = mp.Queue()

# process creation arg: (process number, sleep time, queue)
proc =  [
    mp.Process(target=myWait, name = ' _One_', args=(2, resultQueue,)),
    mp.Process(target=myWait, name = ' _Two_', args=(2, resultQueue,))
    ]

# starting processes
for p in proc:
    p.start()

for p in proc:
    p.join()

# print results
results = {}
for p in proc:
    name, wait, startedAt, endedAt = resultQueue.get()
    print('Process %s started at %s wait %s ended at %s' % (name, startedAt, wait, endedAt))

这很完美,我可以看到父脚本在htop 中生成了两个子进程,但是当我想强制父脚本退出时,如果在myWait 目标函数中抛出错误,父进程就会挂起并且甚至不会产生任何子进程。我必须ctrl-c 才能杀死它。

def myWait(wait, resultQueue):
    try:
        # do something wrong
    except:
        raise SystemExit

我已经尝试了各种退出函数的方法(例如exit()sys.exit()os._exit()...),但无济于事。

【问题讨论】:

    标签: python multiprocessing python-3.5


    【解决方案1】:

    首先,您的代码存在一个主要问题:您试图在刷新队列内容(如果有)之前加入进程,这可能会导致死锁。请参阅此处标题为“加入使用队列的进程”的部分:https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming

    其次,对resultQueue.get() 的调用将被阻塞,直到它接收到一些数据,这永远不会发生 如果myWait 函数引发异常并且在此之前没有数据被推入队列。所以让它成为非阻塞的,让它在一个循环中检查任何数据,直到它最终收到一些东西或者有什么问题。

    这里有一个快速的'n'dirty修复给你的想法:

    #!/usr/bin/python3.5
    
    import multiprocessing as mp
    import queue
    import time
    
    def myWait(wait, resultQueue):
        raise Exception("error!")
    
    # queue initialisation
    resultQueue = mp.Queue()
    
    # process creation arg: (process number, sleep time, queue)
    proc =  [
        mp.Process(target=myWait, name = ' _One_', args=(2, resultQueue,)),
        mp.Process(target=myWait, name = ' _Two_', args=(2, resultQueue,))
        ]
    
    # starting processes
    for p in proc:
        p.start()
    
    # print results
    results = {}
    for p in proc:
        while True:
            if not p.is_alive():
                break
    
            try:
                name, wait, startedAt, endedAt = resultQueue.get(block=False)
                print('Process %s started at %s wait %s ended at %s'
                      % (name, startedAt, wait, endedAt))
                break
            except queue.Empty:
                pass
    
    for p in proc:
        p.join()
    

    函数myWait 会抛出异常,但两个进程仍会加入,程序会顺利退出。

    【讨论】:

    • get(block=False) 成功了。尽管您的 quick'n'dirty fix 会引发多处理库中的其他错误,但它肯定会出现。谢谢。
    • 说实话,我一直在使用 Python 3.4 运行这段代码 sn-p,但 3.5 中似乎没有任何会改变其行为的有影响力的变化。在我这边,除了raise Exception("error!") 行自愿抛出的错误之外,我没有收到任何错误。你的错误日志是什么样的?
    • File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap self.run() File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs)
    • 是的,正如我之前的评论中提到的那样,这是意料之中的......您只复制/粘贴了追溯中的前 2 个条目,但它随后继续到 raise Exception("error!") 行,这是错误的真正来源。我只是把这个例外放在这里,以证明即使你的代码出错了,正如你在问题中提到的那样,你的进程现在应该加入并且程序很好地退出。所以只要去掉这个异常,用myWait函数的实际内容替换即可。
    • 我已经编辑了我的答案以提及预期的行为。
    【解决方案2】:

    您应该使用multiprocessing.Pool 为您管理流程。然后使用Pool.imap_unordered 按完成的顺序迭代结果。一旦您遇到第一个异常,您就可以停止池及其子进程(当您退出 with Pool() as pool 块时,这会自动完成)。例如

    from multiprocessing import Pool
    import time
    
    def my_wait(args):
        name, wait = args
        if wait == 2:
            raise ValueError("error!")
        else:
            startedAt = time.strftime("%H:%M:%S", time.localtime())
            time.sleep(wait)
            endedAt = time.strftime("%H:%M:%S", time.localtime())
            return name, wait, startedAt, endedAt
    
    if __name__ == "__main__":
        try:
            with Pool() as pool:
                args = [["_One_", 2], ["_Two_", 3]]
                for name, wait, startedAt, endedAt in pool.imap_unordered(my_wait, args):     
                    print('Task %s started at %s wait %s ended at %s' % (name,
                        startedAt, wait, endedAt))
        except ValueError as e:
            print(e)
    

    此方法不适合长时间、低工作负载的任务,因为它只会并行运行与其管理的子进程数量一样多的任务(但这是您可以设置的)。如果您需要运行不同的功能,这也不是很好。

    【讨论】:

    • 啊,我没想到会在这里看到Pool,因为它解决了一系列不同的问题。尽管如此,还是感谢您抽出宝贵的时间!
    • 与我的尝试相比,看起来很有希望且简洁。至于真实的东西(这只是一个测试台),我希望只同时发送 2 个 http 请求并获得它们的结果。所以,短时间进程只有一个目标函数。你的方法似乎非常合适。谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-12-03
    • 1970-01-01
    • 2015-06-07
    • 1970-01-01
    • 2019-08-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多