【问题标题】:How to immediately re-raise an exception thrown in any of the worker threads?如何立即重新引发任何工作线程中抛出的异常?
【发布时间】:2017-01-19 06:35:35
【问题描述】:

我正在使用ThreadPoolExecutor 并且需要中止整个计算以防任何工作线程失败。

示例 1。 无论错误如何,都会打印 Success,因为 ThreadPoolExecutor 不会自动重新引发异常。

from concurrent.futures import ThreadPoolExecutor

def task():
    raise ValueError

with ThreadPoolExecutor() as executor:
    executor.submit(task)
print('Success')

示例 2. 这正确地使主线程崩溃,因为 .result() 重新引发异常。但它会等待第一个任务完成,因此主线程会延迟遇到异常。

import time
from concurrent.futures import ThreadPoolExecutor

def task(should_raise):
    time.sleep(1)
    if should_raise:
        raise ValueError

with ThreadPoolExecutor() as executor:
    executor.submit(task, False).result()
    executor.submit(task, True).result()
print('Success')

如何在主线程发生异常后立即(或多或少)注意到它,以处理故障并中止剩余的工作线程?

【问题讨论】:

    标签: python multithreading python-3.x exception threadpoolexecutor


    【解决方案1】:

    首先,我们必须在请求结果之前提交任务。否则,线程甚至不会并行运行:

    futures = []
    with ThreadPoolExecutor() as executor:
        futures.append(executor.submit(good_task))
        futures.append(executor.submit(bad_task))
    for future in futures:
        future.result()
    

    现在我们可以将异常信息存储在一个对主线程和工作线程都可用的变量中:

    exc_info = None
    

    主线程并不能真正杀死它的子进程,所以我们让工作人员检查要设置的异常信息并停止:

    def good_task():
        global exc_info
        while not exc_info:
            time.sleep(0.1)
    
    def bad_task():
        global exc_info
        time.sleep(0.2)
        try:
            raise ValueError()
        except Exception:
            exc_info = sys.exc_info()
    

    所有线程终止后,主线程可以检查保存异常信息的变量。如果已填充,我们会重新引发异常:

    if exc_info:
        raise exc_info[0].with_traceback(exc_info[1], exc_info[2])
    print('Success')
    

    【讨论】:

      【解决方案2】:

      我想,我会这样实现它:

      我是主进程,我创建了2个队列:

      1. 一个报告异常,
      2. 通知取消。

      ::

      import multiprocessing as mp
      
      error_queue = mp.Queue()
      cancel_queue = mp.Queue()
      

      我创建每个ThreadPoolExecutor,并将这些队列作为参数传递。

      class MyExecutor(concurrent.futures.ThreadPoolExecutor):
          def __init__(self, error_queue, cancel_queue):
              self.error_queue : error_queue
              self.cancel_queue = cancel_queue
      

      每个ThreadPoolExecutor 都有一个主循环。在这个循环中,我首先扫描cancel_queue,看看是否有“取消”消息可用。

      在主循环中,我还实现了一个异常管理器。如果发生错误,我会引发异常:

      self.status = "running"
      with True:  # <- or something else
          if not self.cancel_queue.empty():
              self.status = "cancelled"
              break
          try:
              # normal processing
              ...
          except Exception as exc:
              # you can log the exception here for debug
              self.error_queue.put(exc)
              self.status = "error"
              break
          time.sleep(.1)
      

      在主进程中:

      运行所有MyExecutor 实例。

      扫描error_queue:

      while True:
          if not error_queue.empty():
              cancel_queue.put("cancel")
          time.sleep(.1)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2016-10-11
        • 1970-01-01
        • 1970-01-01
        • 2012-06-29
        • 2014-04-03
        • 2020-06-23
        • 1970-01-01
        相关资源
        最近更新 更多