【问题标题】:Break Main Calling Thread If Child Thread Throws An Exception如果子线程抛出异常,则中断主调用线程
【发布时间】:2017-10-05 10:09:06
【问题描述】:

我正在使用带有可调用列表的 threading.Thread 和 t.start() 来执行长时间运行的多线程处理。我的主线程被阻塞,直到所有线程都完成。但是,如果其中一个 Callables 抛出异常并终止其他线程,我希望 t.start() 立即返回。

使用 t.join() 来检查线程是否被执行提供了关于异常失败的信息。

代码如下:

import json
import requests


class ThreadServices:
    def __init__(self):
        self.obj = ""

    def execute_services(self, arg1, arg2):
        try:
            result = call_some_process(arg1, arg2) #some method
            #save results somewhere
        except Exception, e:
            # raise exception
            print e

    def invoke_services(self, stubs):
       """
       Thread Spanning Function        
       """
       try:
           p1 = ""  #some value
           p2 = ""  #some value
           # Call service 1
           t1 = threading.Thread(target=self.execute_services, args=(a, b,)

           # Start thread
           t1.start()
           # Block till thread completes execution
           t1.join()

           thread_pool = list()
           for stub in stubs:
               # Start parallel execution of threads
               t = threading.Thread(target=self.execute_services,
                                          args=(p1, p2))
               t.start()
               thread_pool.append(t)
            for thread in thread_pool:
                # Block till all the threads complete execution: Wait for all 
                the parallel tasks to complete
                thread.join()

            # Start another process thread
            t2 = threading.Thread(target=self.execute_services,
                                           args=(p1, p2)
            t2.start()
            # Block till this thread completes execution
            t2.join()

            requests.post(url, data= json.dumps({status_code=200}))
        except Exception, e:
            print e
            requests.post(url, data= json.dumps({status_code=500}))
        # Don't return anything as this function is invoked as a thread from 
        #  main calling function


class Service(ThreadServices):
    """
    Service Class
    """

    def main_thread(self, request, context):
        """
        Main Thread:Invokes Task Execution Sequence in ThreadedService        
        :param request: 
        :param context:
        :return: 
        """
        try:
            main_thread = threading.Thread(target=self.invoke_services,
                                       args=(request,))
            main_thread.start()
            return True
        except Exception, e:
            return False

当我调用 Service().main_thread(request, context) 并且执行 t1 时出现一些异常,我需要在 main_thread 中引发它并返回 False。我怎样才能为这个结构实现它。谢谢!!

【问题讨论】:

    标签: python multithreading exception-handling


    【解决方案1】:

    一方面,你把事情复杂化了。我会这样做:

    from thread import start_new_thread as thread
    from time import sleep
    
    class Task:
        """One thread per task.
        This you should do with subclassing threading.Thread().
        This is just conceptual example.
        """
        def __init__ (self, func, args=(), kwargs={}):
            self.func = func
            self.args = args
            self.kwargs = kwargs
            self.error = None
            self.done = 0
            self.result = None
    
        def _run (self):
            self.done = 0
            self.error = None
            self.result = None
            # So this is what you should do in subclassed Thread():
            try: self.result = self.func(*self.args, **self.kwargs)
            except Exception, e:
                self.error = e
            self.done = 1
    
        def start (self):
            thread(self._run,())
    
        def wait (self, retrexc=1):
            """Used in place of threading.Thread.join(), but it returns the result of the function self.func() and manages errors.."""
            while not self.done: sleep(0.001)
            if self.error:
                if retrexc: return self.error
                raise self.error
            return self.result
    
    # And this is how you should use your pool:
    def do_something (tasknr):
        print tasknr-20
        if tasknr%7==0: raise Exception, "Dummy exception!"
        return tasknr**120/82.0
    
    pool = []
    for task in xrange(20, 50):
        t = Task(do_something, (task,))
        pool.append(t)
    # And only then wait for each one:
    results = []
    for task in pool:
        results.append(task.wait())
    print results
    

    这样你可以让 task.wait() 引发错误。该线程将已经停止。因此,您需要做的就是在完成后从池或整个池中删除它们的引用。你甚至可以:

    results = []
    for task in pool:
        try: results.append(task.wait(0))
        except Exception, e:
            print task.args, "Error:", str(e)
    print results
    

    现在,不要严格使用这个(我的意思是 Task() 类),因为它需要添加很多东西才能真正使用。

    只需继承 threading.Thread() 并通过重写 run() 和 join() 或添加新函数如 wait() 来实现类似的概念。

    【讨论】:

    • 谢谢@Dalen。这种方法至少对我有用。一些解决方法,它开始了。
    猜你喜欢
    • 2012-12-25
    • 2021-01-16
    • 2017-03-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-06
    • 2021-05-01
    • 1970-01-01
    相关资源
    最近更新 更多