【问题标题】:Force Thread to timeout with ThreadPoolExecutor使用 ThreadPoolExecutor 强制线程超时
【发布时间】:2015-10-04 17:45:25
【问题描述】:

我正在升级我的代码以使用 ThreadPoolExecuter,并希望能够使处理时间超过几秒钟的任何线程超时。是否可以对属于线程池的线程强制超时?我正在使用的代码如下。

    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        future_tasks = {executor.submit(self.crawl_task, url): url for url in self.results.keys()}

        for future in concurrent.futures.as_completed(future_tasks):
            url = future_tasks[future]
            try:
                result = future.result()
                self.results[result[0]] = result[1]
            except Exception as e:
                print('%r generated an exception: %s' % (url, e))

我能够使线程超时的唯一方法是更改​​

for future in concurrent.futures.as_completed(future_tasks):

for future in concurrent.futures.as_completed(future_tasks, timeout=1):

但是,这会破坏整个循环,我将无法知道哪个线程超时以及哪些数据导致了超时。

Traceback (most recent call last):
  File "test.py", line 75, in <module>
    request = Requests(data)
  File "test.py", line 22, in __init__
    for future in concurrent.futures.as_completed(future_tasks, timeout=1):
  File "/source/homebrew/Cellar/python3/3.4.0_1/Frameworks/Python.framework/Versions/3.4/    lib/python3.4/concurrent/futures/_base.py", line 213, in as_completed
    len(pending), len(fs)))
concurrent.futures._base.TimeoutError: 17 (of 17) futures unfinished

【问题讨论】:

    标签: python multithreading timeout threadpool


    【解决方案1】:

    将整个 for 循环包装在一个异常中仍然允许其他线程结果处理。使用两个单独的字典,您可以查看哪些线程因超时而停止。

    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        future_tasks = {executor.submit(self.crawl_task, url): url for url in self.requests.keys()}
    
        try:
            for future in concurrent.futures.as_completed(future_tasks, timeout=10):
                result = future.result()
                self.responses[result[0]] = result[1]
        except Exception as e:
            print(e)
    
    timeout = [url for url in self.requests.keys() if url not in self.responses.keys()]
    
    print('URL Threads timed out: ', timeout)
    

    我必须指出,这违背了传统观念。通常,如果您将整个 for 循环包装在异常中,则循环中异常之后的任何内容都不应处理,但期货的魔力似乎允许循环中的所有内容(超时的线程除外)进行处理。

    【讨论】:

    • 感谢这个答案解决了我的问题。但是,我仍然对这个魔法有点困惑,并且在文档中没有找到任何关于它的内容。
    【解决方案2】:

    执行此操作的一种方法是在 self.crawl_task 中执行开始时将 url 记录到文件中。在线程任务完成之前,它可以附加一个字符串“DONE”,也许还有一个时间戳。

    另外,您需要处理 TimeoutError 异常以不让执行中断。如果超时,您可以查看其中没有“DONE”字符串的文件日志。

    【讨论】:

    • 这将导致整个循环中断,并会丢失来自其他几个未超时的线程的数据。分析日志文件并不是管理线程状态的理想方法。
    • 您可以在 try 块中执行诸如嵌入“for future in concurrent.futures.as_completed(future_tasks):”之类的操作并忽略异常。您可以通过简单地标记此条件以供您查看日志来忽略。
    • 这会导致所有线程超时。如何在不超时整个线程池的情况下使单个线程超时?
    • result() 中似乎有超时支持。例如结果(超时=无)
    • 我看到了另一篇发布并尝试过的 stackoverflow 帖子。它实际上不会导致超时,也找不到任何有关它的文档。
    猜你喜欢
    • 2020-07-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-02-19
    • 1970-01-01
    • 2018-01-31
    • 1970-01-01
    • 2011-07-27
    相关资源
    最近更新 更多