【问题标题】:Threading queue working example [duplicate]线程队列工作示例
【发布时间】:2016-05-11 16:02:15
【问题描述】:

如何在以下代码中将打开线程的最大值限制为 20?我知道过去曾有人问过一些类似的问题,但我特别想知道如何最好地使用队列和工作示例(如果可能的话)。

    # b is a list with 10000 items
    threads = [threading.Thread(target=targetFunction, args=(ptf,anotherarg)) for ptf in b]
    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

【问题讨论】:

  • 你试过b[:20]吗?或者你想用最多 20 个线程处理整个范围?你的问题不太清楚。
  • # b 是一个包含 10000 个项目的列表,不能更改
  • 所以使用线程池并让targetFunction 从队列中拉取工作?
  • 例如,您希望每个循环处理 20 个线程吗?还是什么?
  • targetFunction 正在从服务器下载信息,我一次不能打开超过 20 个连接。这就是为什么我想将打开线程的最大值限制为 20

标签: python multithreading


【解决方案1】:

执行此操作的简单方法是使用queue.Queue 进行工作,并使用for _ in range(MAXTHREADS): threading.Thread(target=f, args=(the_queue,)).start() 启动线程。然而,我发现通过子类化Thread 更容易阅读。您的里程可能会有所不同。

import threading
import queue

class Worker(threading.Thread):
    def __init__(self, q, other_arg, *args, **kwargs):
        self.q = q
        self.other_arg = other_arg
        super().__init__(*args, **kwargs)
    def run(self):
        while True:
            try:
                work = self.q.get(timeout=3)  # 3s timeout
            except queue.Empty:
                return
            # do whatever work you have to do on work
            self.q.task_done()

q = queue.Queue()
for ptf in b:
    q.put_nowait(ptf)
for _ in range(20):
    Worker(q, otherarg).start()
q.join()  # blocks until the queue is empty.

如果您坚持使用某个函数,我建议您将targetFunction 包装为知道如何从队列中获取的东西。

def wrapper_targetFunc(f, q, somearg):
    while True:
        try:
            work = q.get(timeout=3)  # or whatever
        except queue.Empty:
            return
        f(work, somearg)
        q.task_done()

q = queue.Queue()
for ptf in b:
    q.put_nowait(ptf)
for _ in range(20):
    threading.Thread(target=wrapper_targetFunc,
                     args=(targetFunction, q, otherarg)).start()
q.join()

【讨论】:

  • 这看起来很不错,但是如果我想把它放在一个类中,它会改变我的代码中的一些变化。有没有办法只调用一个函数?
  • @Nicolas 检查我的编辑。我发现这更难阅读,但正如我在问题顶部所说:YMMV
  • 如果两个线程同时试图从队列中拉出,一个线程会阻塞,直到另一个线程完成其get。由于 GIL,这在 CPython 中是不可能的,但我无法知道使用不会运行并发线程的 Python 版本。 3 是任意的。 timeout=1 应该同样工作。
  • @Nicolas 我猜你的意思是daemon=True。不,不是——当run 返回时,线程会死掉,当q.get 抛出queue.Empty 异常时,就会在这段代码中发生这种情况。
  • @shivarajkarki q.put 如果队列已满,将永远阻塞,等待队列被清空到足以将新项目放入其中。 q.put_nowait 在尝试将新项目入队时,如果队列已满,将引发异常。