【问题标题】:Python multi processing . Handle exception in parent process and make all children die gracefullyPython 多处理。处理父进程异常,让所有子进程优雅地死去
【发布时间】:2015-01-11 07:43:11
【问题描述】:

我有以下代码。

这使用了一个名为 decorator 的 python 模块。

from multiprocessing import Pool
from random import randint
import traceback
import decorator
import time


def test_retry(number_of_retry_attempts=1, **kwargs):
    timeout = kwargs.get('timeout', 2.0) # seconds
    @decorator.decorator
    def tryIt(func, *fargs, **fkwargs):
        for _ in xrange(number_of_retry_attempts):
            try: return func(*fargs, **fkwargs)
            except:
                tb = traceback.format_exc()
                if timeout is not None:
                    time.sleep(timeout)
                print 'Catching exception %s. Attempting retry: '%(tb)

        raise
    return tryIt

装饰器模块帮助我装饰我的数据仓库调用函数。所以我不需要处理连接丢失和各种基于连接的问题,并允许我重置连接并在超时后重试。我用这种方法装饰了我所有的数据仓库读取功能,所以我可以免费重试。

我有以下方法。

def process_generator(data):
    #Process the generated data


def generator():
    data = data_warhouse_fetch_method()#This is the actual method which needs retry
    yield data

@test_retry(number_of_retry_attempts=2,timeout=1.0)
def data_warhouse_fetch_method():
    #Fetch the data from data-warehouse
    pass

我尝试使用这样的多处理模块对我的代码进行多处理。

try:
    pool = Pool(processes=2)
    result = pool.imap_unordered(process_generator,generator())
except Exception as exception:
    print 'Do some post processing stuff'
    tb = traceback.format_exc()
    print tb 

一切顺利时一切正常。当它在重试次数内自行修复时,情况也很正常。但是一旦重试次数超过我会在 test_retry 方法中引发异常,该异常不会在主进程中被捕获。进程死亡,主进程分叉的进程被保留为孤儿。可能是我在这里做错了什么。我正在寻找一些帮助来解决以下问题。将异常传播到父进程,以便我可以处理异常并使我的孩子优雅地死去。另外我想知道如何通知子进程优雅地死去。在此先感谢您的帮助 。

编辑:添加了更多代码来解释。

def test_retry(number_of_retry_attempts=1, **kwargs):
    timeout = kwargs.get('timeout', 2.0) # seconds
    @decorator.decorator
    def tryIt(func, *fargs, **fkwargs):
        for _ in xrange(number_of_retry_attempts):
            try: return func(*fargs, **fkwargs)
            except:
                tb = traceback.format_exc()
                if timeout is not None:
                    time.sleep(timeout)
                print 'Catching exception %s. Attempting retry: '%(tb)
        raise
    return tryIt

@test_retry(number_of_retry_attempts=2,timeout=1.0)
def bad_method():
    sample_list =[]
    return sample_list[0] #This will result in an exception


def process_generator(number):
    if isinstance(number,int):
        return number+1
    else:
        raise

def generator():
    for i in range(20):
        if i%10 == 0 :
         yield bad_method()
        else:
            yield i

try:
    pool = Pool(processes=2)
    result = pool.imap_unordered(process_generator,generator())
    pool.close()
    #pool.join()
    for r in result:
        print r
except Exception, e: #Hoping the generator will catch the exception. But not .
    print 'got exception: %r, terminating the pool' % (e,)
    pool.terminate()
    print 'pool is terminated'
finally:
    print 'joining pool processes'
    pool.join()
    print 'join complete'
print 'the end'

实际问题归结为如果生成器抛出异常,我无法在包含 pool.imap_unordered() 方法的 except 子句中捕获生成器抛出的异常。所以在抛出异常后,主进程被卡住,子进程永远等待。不确定我在这里做错了什么。

【问题讨论】:

  • 当“进程死亡”时会是什么样子。有什么例外吗?它会结冰吗?您是否尝试过返回 None 而不是异常?
  • 它确实抛出了原始异常,并且父进程死亡,子进程被分叉为孤儿。
  • 如果添加finally: pool.join()会怎样
  • 添加 finally:pool.join 后与之前相同。父进程死亡并将子进程作为孤儿。唯一不同的是我有以下回溯。Traceback (most recent call last): File "test_exception.py", line 48, in <module> pool.join() File "/Users/senthilsrinivasan/.localpython/lib/python2.7/multiprocessing/pool.py", line 456, in join assert self._state in (CLOSE, TERMINATE)
  • 那您需要先致电pool.close()。见this example

标签: python python-decorators python-multiprocessing


【解决方案1】:

由于我不是专家,因此我不完全理解此处共享的代码。此外,这个问题已经将近一年了。但我有与主题中解释的相同要求。我设法找到了解决方案:

import multiprocessing
import time


def dummy(flag):
    try:
        if flag:
            print('Sleeping for 2 secs')
            time.sleep(2)  # So that it can be terminated
        else:
            raise Exception('Exception from ', flag) # To simulate termination
        return flag  # To check that the sleeping thread never returns this
    except Exception as e:
        print('Exception inside dummy', e)
        raise e
    finally:
        print('Entered finally', flag)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    args_list = [(1,), (0,)]
    # call dummy for each tuple inside args_list. 
    # Use error_callback to terminate the pool
    results = pool.starmap_async(dummy, args_list, 
                                error_callback=lambda e, mp_pool=pool: mp_pool.terminate())
    pool.close()
    pool.join()
    try:
        # Try to see the results.
        # If there was an exception in any process, results.get() throws exception
        for result in results.get():
            # Never executed cause of the exception
            print('Printing result ', result)  
    except Exception as e:
        print('Exception inside main', e)

    print('Reached the end')

这会产生以下输出:

Sleeping for 2 secs
Exception inside dummy ('Exception from ', 0)
Entered finally 0
Exception inside main ('Exception from ', 0)
Reached the end

这几乎是我第一次回答问题,所以如果我违反了任何规则或犯了任何错误,我提前道歉。

我曾尝试执行以下操作但没有成功:

  1. 使用 apply_async。但这只是在抛出异常后挂起主进程
  2. 尝试使用 error_callback 中的 pid 杀死进程和子进程
  3. 使用 multiprocessing.event 跟踪异常并在每个步骤之后在所有进程中检查相同情况,然后再继续。这不是一个好方法,但也不起作用:“条件对象只能通过继承在进程之间共享”

老实说,如果其中一个进程抛出异常,我真希望终止同一个池中的所有进程并不难。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-10-15
    • 1970-01-01
    • 1970-01-01
    • 2014-07-10
    • 2018-04-24
    相关资源
    最近更新 更多