【问题标题】:Python threading, non-blocking productionPython线程,非阻塞生产
【发布时间】:2013-12-11 09:49:22
【问题描述】:

ThreadedWorkerQueue.add_worker() 方法会阻塞,直到 Worker 被使用。是否有一个不错的设计允许向ThreadedWorkerQueue 添加新工作人员而不会阻塞调用~.add_worker() 的线程,但仍然可以使用条件?

这是一个简短的 SSCCE:

import time
import threading

class Worker(object):

    def work(self):
        pass

class TimeWorker(Worker):

    def __init__(self, seconds):
        super(TimeWorker, self).__init__()
        self.seconds = seconds

    def work(self):
        for i in xrange(self.seconds):
            print "Working ... (%d)" % i
            time.sleep(1)

class ThreadedWorkerQueue(threading.Thread):

    def __init__(self):
        super(ThreadedWorkerQueue, self).__init__()
        self.condition = threading.Condition()
        self.workers = []
        self.running = False

    def add_worker(self, worker):
        with self.condition:
            self.workers.append(worker)
            self.condition.notify()

    def stop(self):
        with self.condition:
            self.running = False
            self.condition.notify()
        self.join()

    def consume(self):
        if self.workers:
            worker = self.workers.pop(0)
            worker.work()

    def run(self):
        self.running = True
        while True:
            with self.condition:
                if not self.running:
                    break

                self.condition.wait()
                self.consume()

def main():
    queue = ThreadedWorkerQueue()
    queue.start()

    queue.add_worker(TimeWorker(3))
    time.sleep(1)

    tstart = time.time()
    queue.add_worker(TimeWorker(2))
    print "Blocked", time.time() - tstart, "seconds until worker was added."

    queue.stop()

main()

编辑

好的,所以我最初的想法是当线程 可以继续消费Workers。这是基本原理 生产者/消费者设计,跳过连续轮询,真的只做 有工作要做的时候工作。

刚才,我有了一个使用默认获取的锁的想法,并且是 当新的工人可以被消耗时释放。但我不确定这是否是 做这件事的好方法。有人能发现问题(例如潜在的死锁)吗?

完整代码在 GitHub 上:https://github.com/NiklasRosenstein/async/blob/73828ecaa2990a71b63caf93c32f9cce5ec11d27/async.py#L686-L750

class ThreadedWorkerQueue(WorkerQueue, threading.Thread):
    r""" This class implements the consumer design, introducing :class:`Worker`
    objects to start working as soon as there are new workers available. Every
    object adding Workers to this queue are the producers. """

    def __init__(self):
        WorkerQueue.__init__(self)
        threading.Thread.__init__(self)
        self.lock = threading.Lock()

    def __del__(self):
        if self.running:
            self.stop()
            warnings.warn("ThreadedWorkerQueue not stopped before its lost.")

    def notify(self):
        r""" Notify the ThreadedWorkerQueue that processing can be continued.
        It is usually not necessary to call this method manually. """

        try:
            self.lock.release()
        except (thread.error, RuntimeError):
            pass

    def stop(self, join=True, clear=False):
        r""" Interrupt the thread in its doing, pausing the threads actions
        until :meth:`start` is called again. All remaining workers are kept
        alive unless *clear* is specified True. """

        if clear: self.workers.clear()
        self.running = False
        self.notify()
        if join: self.join()

    def consume(self):
        r""" Just like :meth:`WorkerQueue.work_off`, but doesn't override
        the value of :prop:`running`. """

        while self.workers:
            worker = self.workers.popleft()
            self.current = worker
            worker.work()
        self.current = None

    # WorkerQueue

    def add_worker(self, worker):
        super(ThreadedWorkerQueue, self).add_worker(worker)
        self.notify()

    # threading.Thread

    def start(self):
        r""" Overrides :meth:`threading.Thread.start` to allow starting
        the ThreadedWorkerQueue multiple times. """

        threading.Thread.__init__(self)
        return threading.Thread.start(self)

    def run(self):
        self.running = True
        while True:
            self.consume()
            if not self.running: break
            self.lock.acquire()

【问题讨论】:

  • 你的多线程组件使用 CPython 吗?如果是这样,请考虑以下几点:CPython 配备了全局解释器锁 (GIL),它一次只允许 Python 字节码的一个操作码执行,而不管给定 Python 进程中可能运行多少线程...这意味着,在所有条件相同的情况下,单个进程,即使是线程化的,在 128 CPU 机器上的运行速度不会比单 CPU 机器快。 (取自oreillynet.com/onlamp/blog/2005/10/…
  • @Jon 这是为了 UI 异步,而不是试图让事情运行得更快

标签: python conditional-statements producer-consumer locks thread-synchronization


【解决方案1】:

...但仍在使用条件?

你不需要条件。 Python 已经为此提供了完美的机制:Queue.Queue。将ThreadedWorkerQueue.workers从列表更改为Queue,您无需担心条件、锁定、通知等。它将大大简化您的代码。

你需要更换:

  • 带有Queue 的工人列表
  • 附加+通知Queue.put
  • 等待+弹出Queue.get

并摆脱with self.condition: ...

此外,从stop() 内部调用self.join() 也不是一个好习惯。把它留给调用线程。如果您有多个线程需要停止,您需要先将它们全部停止,然后再将它们全部加入。

【讨论】:

  • 好的,我应该说清楚,不知何故我认为“使用条件”暗示了这一点。无论如何,我最初的想法是保持条件,以便只有在消费者真正有事情要处理时才能唤醒消费者。当您使用一个简单的队列时,您需要不断地轮询它,而 Condition 只是简单地唤醒!简单地让线程休眠几毫秒并重新检查队列可能没有太大区别,但它仍然不如条件方法有效。但是,我在一小时前想到了一些事情,请您查看我编辑过的问题吗?
  • @NiklasR “当你使用一个简单的队列时,你需要不断地轮询它,而一个条件只是简单地唤醒!”嗯?不,你没有。 Queue.get() 正在阻塞。
  • 就像 millimoose 所说,Queue.get() 正在阻塞。 Queue 非常适合您的用例。它就是为此而写的。
  • @millimoose 啊谢谢,我不知道!现在我记得,我尝试使用 Queue 类,但我还需要遍历 Queue.Queue 类不支持的元素。
  • @NiklasR 还有collections.deque() 支持绑定和非锁定修改。 (不知道你将如何实现阻塞。)老实说,在你的情况下,我会考虑找出一种方法来绕过迭代队列内容 - 考虑到用例,它真的没有太大意义它针对它进行了优化。 (即以线程安全的方式从 A 点获取数据到 B 点。)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-07-06
  • 1970-01-01
  • 1970-01-01
  • 2013-02-01
  • 2019-02-12
  • 2013-05-24
相关资源
最近更新 更多