【问题标题】:python multithreading wait till all threads finishedpython多线程等待所有线程完成
【发布时间】:2022-05-10 09:20:07
【问题描述】:

这可能是在类似的情况下被问到的,但我在搜索大约 20 分钟后找不到答案,所以我会问。

我写了一个 Python 脚本(比如说:scriptA.py)和一个脚本(比如说 scriptB.py)

在 scriptB 中,我想用不同的参数多次调用 scriptA,每次运行大约需要一个小时,(它是一个巨大的脚本,做了很多事情......别担心),我希望能够同时使用所有不同的参数运行 scriptA,但我需要等到所有参数都完成后再继续;我的代码:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

我想同时运行所有subprocess.call(),然后等到它们都完成,我该怎么做?

我尝试使用像示例here 一样的线程:

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

但我认为这是不对的。

在转到我的do_finish() 之前,我怎么知道他们都跑完了?

【问题讨论】:

    标签: python multithreading


    【解决方案1】:

    将线程放在一个列表中,然后使用Join method

     threads = []
    
     t = Thread(...)
     threads.append(t)
    
     ...repeat as often as necessary...
    
     # Start all threads
     for x in threads:
         x.start()
    
     # Wait for all of them to finish
     for x in threads:
         x.join()
    

    【讨论】:

    • 是的,这可行,但更难理解。您应该始终尝试在紧凑的代码和“可读性”之间找到平衡。记住:代码写一次,读多次。所以更重要的是易于理解。
    • “工厂模式”不是我能用一句话解释的。谷歌搜索它并搜索 stackoverflow.com。有很多例子和解释。简而言之:你编写的代码为你构建了一些复杂的东西。就像一个真正的工厂:您下订单并取回成品。
    • 我不喜欢使用列表推导来解决它的副作用并且不对结果列表做任何有用的事情。一个简单的 for 循环会更干净,即使它分布两行...
    • @Aaron DIgull 我明白这一点。我的意思是我只会做一个for x in threads: x.join() 而不是使用列表理解
    • @IoanAlexandruCucu:我还在想是否有更易读、更高效的解决方案:stackoverflow.com/questions/21428602/…
    【解决方案2】:

    您需要在脚本末尾使用Thread 对象的join 方法。

    t1 = Thread(target=call_script, args=(scriptA + argumentsA))
    t2 = Thread(target=call_script, args=(scriptA + argumentsB))
    t3 = Thread(target=call_script, args=(scriptA + argumentsC))
    
    t1.start()
    t2.start()
    t3.start()
    
    t1.join()
    t2.join()
    t3.join()
    

    因此主线程将等待t1t2t3 完成执行。

    【讨论】:

    • hmmm - 无法理解某些东西,这不是先运行 t1,等到它完成,然后再转到 t2..etc 等吗?如何让这一切同时发生?我看不出这将如何同时运行它们?
    • join 的调用会阻塞,直到线程完成执行。无论如何,您将不得不等待所有线程。如果t1 先完成,您将开始等待t2(可能已经完成,您将立即继续等待t3)。如果t1 执行时间最长,则当您从它返回时,t1t2 将立即返回而不会阻塞。
    • 你不明白我的问题——如果我将上面的代码复制到我的代码中——它会起作用吗?还是我错过了什么?
    • 好的,我明白了。现在我明白了,对此有点困惑,但我想我明白了,join 有点将当前进程附加到线程并等待它完成,如果 t2 在 t1 之前完成,那么当 t1 完成时它将检查 t2完成后看到它是,然后检查 t3..etc..etc.. 然后只有当一切都完成后它才会继续。太棒了。
    • 说 t1 耗时最长,但 t2 有一个例外。那会发生什么?你能捕捉到那个异常或检查 t2 是否完成了吗?
    【解决方案3】:

    在 Python3 中,从 Python 3.2 开始有一种新的方法可以达到相同的结果,我个人更喜欢传统的线程创建/启动/加入,包concurrent.futures:https://docs.python.org/3/library/concurrent.futures.html

    使用ThreadPoolExecutor 代码将是:

    from concurrent.futures.thread import ThreadPoolExecutor
    import time
    
    def call_script(ordinal, arg):
        print('Thread', ordinal, 'argument:', arg)
        time.sleep(2)
        print('Thread', ordinal, 'Finished')
    
    args = ['argumentsA', 'argumentsB', 'argumentsC']
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        ordinal = 1
        for arg in args:
            executor.submit(call_script, ordinal, arg)
            ordinal += 1
    print('All tasks has been finished')
    

    前面代码的输出是这样的:

    Thread 1 argument: argumentsA
    Thread 2 argument: argumentsB
    Thread 1 Finished
    Thread 2 Finished
    Thread 3 argument: argumentsC
    Thread 3 Finished
    All tasks has been finished
    

    其中一个优点是您可以控制设置最大并发工作人员的吞吐量。

    【讨论】:

    • 但是如何判断线程池中的所有线程何时完成?
    • 如你所见,with 语句后面的代码在所有任务完成后执行。
    • 这不起作用。尝试在线程中做一些很长的事情。您的打印语句将在线程完成之前执行
    • @Pranalee,该代码有效,我已更新代码以添加输出行。在所有线程完成之前,您看不到“所有任务...”,这就是 with 语句在这种情况下的设计工作方式。无论如何,您总是可以在 SO 中打开一个新问题并发布您的代码,以便我们帮助您了解您的情况。
    • @PrimeByDesign 你可以使用concurrent.futures.wait函数,你可以看到一个real example here官方文档:docs.python.org/3/library/…
    【解决方案4】:

    我更喜欢使用基于输入列表的列表推导:

    inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
    threads = [Thread(target=call_script, args=(i)) for i in inputs]
    [t.start() for t in threads]
    [t.join() for t in threads]
    

    【讨论】:

    • 检查过的答案解释得很好,但这个更短,不需要难看的重复。只是一个不错的答案。 :)
    • 仅针对副作用的列表理解通常会被贬低*。但在这个用例中,这似乎是一个好主意。 *stackoverflow.com/questions/5753597/…
    • @VinayakKaniyarakkal for t in threads:t.start() 不是更好吗?
    【解决方案5】:

    您可以拥有类似下面的类,您可以从中添加“n”个您想要并行执行的函数或控制台脚本,然后开始执行并等待所有作业完成..

    from multiprocessing import Process
    
    class ProcessParallel(object):
        """
        To Process the  functions parallely
    
        """    
        def __init__(self, *jobs):
            """
            """
            self.jobs = jobs
            self.processes = []
    
        def fork_processes(self):
            """
            Creates the process objects for given function deligates
            """
            for job in self.jobs:
                proc  = Process(target=job)
                self.processes.append(proc)
    
        def start_all(self):
            """
            Starts the functions process all together.
            """
            for proc in self.processes:
                proc.start()
    
        def join_all(self):
            """
            Waits untill all the functions executed.
            """
            for proc in self.processes:
                proc.join()
    
    
    def two_sum(a=2, b=2):
        return a + b
    
    def multiply(a=2, b=2):
        return a * b
    
    
    #How to run:
    if __name__ == '__main__':
        #note: two_sum, multiply can be replace with any python console scripts which
        #you wanted to run parallel..
        procs =  ProcessParallel(two_sum, multiply)
        #Add all the process in list
        procs.fork_processes()
        #starts  process execution 
        procs.start_all()
        #wait until all the process got executed
        procs.join_all()
    

    【讨论】:

    【解决方案6】:

    我刚刚遇到了同样的问题,我需要等待使用 for 循环创建的所有线程。我刚刚尝试了以下代码。它可能不是完美的解决方案,但我认为它会是一个简单的测试解决方案:

    for t in threading.enumerate():
        try:
            t.join()
        except RuntimeError as err:
            if 'cannot join current thread' in err:
                continue
            else:
                raise
    

    【讨论】:

      【解决方案7】:

      来自threadingmodule documentation

      有一个“主线程”对象;这对应于初始 Python 程序中的控制线程。它不是守护线程。

      可能会创建“虚拟线程对象”。 这些是对应于“外来线程”的线程对象,它们是 控制线程在线程模块之外启动,例如 直接来自C代码。虚拟线程对象的功能有限; 他们总是被认为是活着的和守护进程的,不能是join()ed。 它们永远不会被删除,因为无法检测到 终止外来线程。

      所以,当您对保留您创建的线程列表不感兴趣时​​,要捕获这两种情况:

      import threading as thrd
      
      
      def alter_data(data, index):
          data[index] *= 2
      
      
      data = [0, 2, 6, 20]
      
      for i, value in enumerate(data):
          thrd.Thread(target=alter_data, args=[data, i]).start()
      
      for thread in thrd.enumerate():
          if thread.daemon:
              continue
          try:
              thread.join()
          except RuntimeError as err:
              if 'cannot join current thread' in err.args[0]:
                  # catchs main thread
                  continue
              else:
                  raise
      

      于是:

      >>> print(data)
      [0, 4, 12, 40]
      

      【讨论】:

        【解决方案8】:

        也许是这样的

        for t in threading.enumerate():
            if t.daemon:
                t.join()
        

        【讨论】:

        • 我已经尝试过这段代码,但不确定它的工作原理,因为我的代码的最后一条指令是在这个 for 循环之后打印的,但进程仍然没有终止。
        【解决方案9】:

        仅使用连接会导致假阳性与线程交互。就像文档中所说的:

        当 timeout 参数存在而不是 None 时,它​​应该是 指定操作超时的浮点数 秒(或其分数)。由于 join() 总是返回 None,你 必须在 join() 之后调用 isAlive() 来决定是否发生超时 – 如果线程还活着,则 join() 调用超时。

        和说明性的代码:

        threads = []
        for name in some_data:
            new = threading.Thread(
                target=self.some_func,
                args=(name,)
            )
            threads.append(new)
            new.start()
            
        over_threads = iter(threads)
        curr_th = next(over_threads)
        while True:
            curr_th.join()
            if curr_th.is_alive():
                continue
            try:
                curr_th = next(over_threads)
            except StopIteration:
                break
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2011-05-10
          • 2010-09-20
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-03-30
          • 1970-01-01
          • 2012-04-09
          相关资源
          最近更新 更多