【问题标题】:How to limit number of concurrent threads in Python?如何限制 Python 中的并发线程数?
【发布时间】:2013-08-21 00:53:16
【问题描述】:

如何限制 Python 中的并发线程数?

例如,我有一个包含许多文件的目录,我想处理所有文件,但一次只能并行处理 4 个。

这是我目前所拥有的:

def process_file(fname):
        # open file and do something                                                                                            

def process_file_thread(queue, fname):
    queue.put(process_file(fname))

def process_all_files(d):
    files=glob.glob(d + '/*')
    q=Queue.Queue()
    for fname in files:
        t=threading.Thread(target=process_file_thread, args=(q, fname))
        t.start()
    q.join()

def main():
    process_all_files('.')
    # Do something after all files have been processed

如何修改代码以便一次只运行 4 个线程?

请注意,我想等待所有文件都处理完毕,然后继续处理已处理的文件。

【问题讨论】:

  • 你试过multiprocessPools吗?在 Python 3 上,您还可以使用 futures
  • 你也可以在 Python 2 中使用futures,你只需要安装 backport。
  • concurrent.futures 确实是最好的方法
  • 您可以使用multiprocessing.pool.ThreadPool 轻松限制线程数,如this answer 中的另一个问题所示。

标签: python multithreading


【解决方案1】:

例如,我有一个包含许多文件的目录,我想处理所有文件,但一次只能并行处理 4 个。

这正是线程池的作用:您创建作业,并且池一次并行运行 4 个。您可以通过使用执行程序使事情变得更简单,您只需将函数(或其他可调用对象)交给它,它就会将结果交还给您。您可以自己构建所有这些,但您不必这样做。*

stdlib 的concurrent.futures 模块是最简单的方法。 (对于 Python 3.1 和更早版本,请参阅 backport。)事实上,one of the main examples 非常接近您想要做的事情。但让我们根据您的具体用例对其进行调整:

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        concurrent.futures.wait(fs)

如果您希望 process_file 返回某些内容,这几乎同样简单:

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        for f in concurrent.futures.as_completed(fs):
            do_something(f.result())

如果你也想处理异常……好吧,看看这个例子;这只是一个try/except 围绕着对result() 的调用。


* 如果您想自己构建它们,这并不难。 multiprocessing.pool 的源代码写得很好,评论也很好,而且没有那么复杂,而且大多数困难的东西都与线程无关; concurrent.futures 的来源更简单。

【讨论】:

    【解决方案2】:

    这个技巧我用过几次,觉得有点难看:

    import threading
    
    def process_something():
        something = list(get_something)
    
        def worker():
            while something:
                obj = something.pop()
                # do something with obj
    
       threads = [Thread(target=worker) for i in range(4)]
       [t.start() for t in threads]
       [t.join() for t in threads]
    

    【讨论】:

      猜你喜欢
      • 2015-07-17
      • 1970-01-01
      • 2011-09-14
      • 1970-01-01
      • 2013-01-17
      • 1970-01-01
      • 2010-12-19
      • 1970-01-01
      • 2016-10-24
      相关资源
      最近更新 更多