【问题标题】:Correct Way To Handle Thread Joins in Python在 Python 中处理线程连接的正确方法
【发布时间】:2023-03-05 09:50:01
【问题描述】:

所以我编写了一个工具,它获取项目列表,将其拆分为给定数量的列表(比如说 10 个),然后取出这 10 个列表并产生 10 个线程,“EvaluationThreads”(扩展 threading.thread) ,并且这些线程中的每一个都会评估它们提供给评估的任何内容。当我启动每个线程时,我将它们全部放入一个列表中,在生成它们之后,我有以下代码:

for th in threadList:
    th.join()
    someTotal = th.resultsAttribute

这就是我如何等待所有线程完成并收集它们的信息。虽然这是一种等待一切完成然后收集结果的工作方式,但我觉得必须有一种更优雅的方式来做这件事,因为这些线程可以很好地在不同的时间完成,如果第一个开始的线程最后完成所有较早完成的必须等待该线程完成才能加入。有没有办法获取这些线程的信息并在它们完成时加入它们,而不是按照它们开始的顺序?我最初认为我会在线程中使用某种回调或其他东西,但我不确定是否有更可接受的解决方案。

感谢您的帮助。

编辑:澄清一下,我的评估函数不受 CPU 限制,我并没有尝试在线程之间分配文档以尽快完成,每个线程都有固定的大约偶数个作业。

【问题讨论】:

  • 为什么您的问题有问题?一个已完成但尚未加入的线程浪费了很少的资源(基本上,在内核或用户空间某处由操作系统维护的表中的一个小条目)。
  • 我想这不完全是一个问题,但它似乎是一个非常不优雅的解决方案,如果空闲线程不是等待加入的问题,我想我不会担心它。
  • 附注:如果您的“评估”操作受 CPU 限制,那么在此应用程序中使用线程可能不会给您带来太多好处。了解 CPython 的全局解释器锁 (GIL)。
  • @NedBatchelder:对此+1。如果数据块复制起来很便宜(或者可以由线程本身生成而不是传入),那么使用多处理而不是线程可能会更好。这也意味着默认情况下数据是不共享的(所以如果你搞砸了,它既更明显也更不那么灾难性)。并且 multiprocessing 有一个内置的 Pool 类,有很好的方法来简化最常见的习惯用法。
  • 他们甚至不是空闲线程;它们是完整的线程。尽快加入他们很好,但这意味着您需要某种方式来知道需要先加入哪个。 (或者你需要一个与 Win32 的 WaitForMultipleObjects 等效的跨平台,而你没有……尽管一个像样的线程组/池库可能会使用每个平台上可用的最佳实现。)

标签: python multithreading join spawn


【解决方案1】:

对于您的主要问题:

如果你正在做比这更复杂的事情——或者,特别是,如果你重复这样做——你可能需要一个“线程组”类。有几十个是预制的,但如果你不喜欢其中任何一个,自己写一个就很简单了。

然后,而不是这个:

threadList = []
for argchunk in splitIntoChunks(values, 10):
  threadList.append(threading.Thread(target=myThreadFunc, args=argchunk))
...
someTotal = 0
for th in threadList:
  th.join()
  someTotal += th.resultsAttribute

你可以这样做:

threadGroup = ThreadGroup.ThreadGroup()
for argchunk in splitIntoChunks(values, 10):
  threadGroup.newThread(myThreadFunc, argchunk)
threadGroup.join()
someTotal = sum(th.resultsAttribute for th in threadGroup)

或者,也许更好,一个完整的线程池库,所以你可以这样做:

pool = ThreadPool(10)
for argchunk in splitIntoChunks(values, 100):
  pool.putRequest(myThreadFunc, argchunk)
pool.wait()

这里的优点是您可以轻松地在 10 个线程上适当地安排 100 个作业,而不是每个线程一个 10 个作业,而无需维护队列等所有工作。缺点是您不能只是迭代线程来获取返回值,你必须迭代作业——理想情况下,你不想让作业一直存活到最后,这样你就可以迭代它们。

这将我们带到您的第二个问题,即如何从线程(或作业)中获取值。有很多很多方法可以做到这一点。

你所做的工作。你甚至不需要任何锁定。

按照您的建议,使用回调也可以。但请记住,回调将在工作线程上运行,而不是在主线程上运行,因此如果它正在访问某个全局​​对象,您将需要某种同步。

如果您仍然要进行同步,则回调可能没有任何好处。例如,如果您只想对一堆值求和,您可以设置total=[0],并让每个线程在锁内执行total[0] += myValue。 (当然,在这种情况下,只在主线程中进行求和并避免锁定可能更有意义,但如果合并结果的工作量更大,那么选择可能就不那么简单了。)

您也可以使用某种原子对象,而不是显式锁定。例如,标准的 Queue.Queue 和 collections.deque 都是原子的,所以每个线程只需设置q = Queue.Queue(),然后每个线程通过执行q.push(myValue) 推送其结果,然后加入后您只需迭代并总结队列的值.

事实上,如果每个线程只向队列推送一次,您只需在队列本身上执行 10 次阻塞获取,之后您就知道 group.join()pool.wait() 或其他任何会很快返回。

或者您甚至可以将回调作为作业推送到队列中。同样,您可以在队列上执行 10 次阻塞获取,每次都执行结果。

如果每个线程可以返回多个对象,他们可以在完成后将一个标记值或回调推送到队列中,并且您的主线程会不断弹出,直到它读取 10 个标记。

【讨论】:

    【解决方案2】:

    一旦信息可用,就使用队列将线程中的信息推送出去:

    假设这是您的主题:

    class myThread(threading.Thread):
       def __init__(self, results_queue):
           self.results_queue = results_queue
           #other init code here
    
    
       def run(self):
           #thread code here
    
           self.results_queue.put(result) #result is the information you want from the thread
    

    这是你的主要代码:

    import Queue #or "import queue" in Python 3.x
    results_queue = Queue()
    
    #thread init code here
    
    for i in xrange(num_threads_running):
        data = results_queue.get() # queue.get() blocks until some item is available
        #process data as it is made available
    
    #at this point, there is no need to .join(), since all the threads terminate as soon as they put data to the queue.
    

    【讨论】:

    • 我相信只有当你显式加入线程或者线程对象被销毁时,底层系统资源才会被释放,这意味着需要调用join(),除非你要退出,或者你有一个 with 块或等效块来管理 Thread 对象。
    • @abarnert:你有链接吗?我在文档中找不到任何关于此的内容。它在Thread.join() 下所说的只是它“阻塞调用线程,直到调用 join 方法的线程终止。”
    • @abarnert:来自文档,“其他线程可以调用线程的 join() 方法。这会阻塞调用线程,直到调用 join() 方法的线程终止。”这意味着线程终止独立于被调用的join() 方法发生。
    • 可以,但是线程终止仅仅意味着线程不再被调度,而不是底层资源已经被回收。使Thread.join() 执行本机pthread_join 或等效项(正如Jython 似乎所做的那样),或者根据需要自动加入线程并为Thread.join() 提供一些其他方式阻止(正如 POSIX 系统上的 CPython 2.7.3 似乎所做的那样)。
    • @abarnert:我在Lib\threading.py 下的Thread.join()Thread._delete() (在Python 3 中)svn.python.org/projects/python/branches/py3k/Lib/threading.py 下看不到任何东西
    猜你喜欢
    • 2012-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-09-26
    • 1970-01-01
    • 2012-03-17
    • 2011-10-01
    • 1970-01-01
    相关资源
    最近更新 更多