[Posted]: 2013-12-11 09:49:22
[Problem description]:
The ThreadedWorkerQueue.add_worker() method blocks until the Worker has been consumed. Is there a nice design that allows adding new workers to the ThreadedWorkerQueue without blocking the thread that calls ~.add_worker(), while still using a Condition?
Here is a short SSCCE:
import time
import threading

class Worker(object):
    def work(self):
        pass

class TimeWorker(Worker):
    def __init__(self, seconds):
        super(TimeWorker, self).__init__()
        self.seconds = seconds

    def work(self):
        for i in xrange(self.seconds):
            print "Working ... (%d)" % i
            time.sleep(1)

class ThreadedWorkerQueue(threading.Thread):
    def __init__(self):
        super(ThreadedWorkerQueue, self).__init__()
        self.condition = threading.Condition()
        self.workers = []
        self.running = False

    def add_worker(self, worker):
        with self.condition:
            self.workers.append(worker)
            self.condition.notify()

    def stop(self):
        with self.condition:
            self.running = False
            self.condition.notify()
        self.join()

    def consume(self):
        if self.workers:
            worker = self.workers.pop(0)
            worker.work()

    def run(self):
        self.running = True
        while True:
            with self.condition:
                if not self.running:
                    break
                self.condition.wait()
                # This is the blocking point: worker.work() runs while the
                # condition's lock is still held, so add_worker() must wait.
                self.consume()

def main():
    queue = ThreadedWorkerQueue()
    queue.start()
    queue.add_worker(TimeWorker(3))
    time.sleep(1)

    tstart = time.time()
    queue.add_worker(TimeWorker(2))
    print "Blocked", time.time() - tstart, "seconds until worker was added."

    queue.stop()

main()
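One common fix (not from the original post) is to keep the Condition, but pop the worker while holding it and call work() only after the lock has been released. That way add_worker() never waits for a long-running work() call. A minimal sketch, in Python 3 syntax with hypothetical class names:

```python
import threading
import time

class WorkerThread(threading.Thread):
    """Consumer thread: producers return immediately from add_worker(),
    because work() is invoked after the condition lock is released."""

    def __init__(self):
        super().__init__()
        self.condition = threading.Condition()
        self.workers = []
        self.running = True

    def add_worker(self, worker):
        # Only appends and notifies under the lock; never blocks on work().
        with self.condition:
            self.workers.append(worker)
            self.condition.notify()

    def stop(self):
        with self.condition:
            self.running = False
            self.condition.notify()
        self.join()

    def run(self):
        while True:
            with self.condition:
                # Wait in a loop to guard against spurious wakeups.
                while self.running and not self.workers:
                    self.condition.wait()
                # Drain remaining workers before shutting down.
                if not self.running and not self.workers:
                    break
                worker = self.workers.pop(0)
            # Lock is released here, so producers can add more workers
            # while this one is busy working.
            worker.work()

results = []

class Recorder:
    def work(self):
        results.append("done")

t = WorkerThread()
t.start()
t.add_worker(Recorder())
t.stop()
print(results)  # -> ['done']
```

The key design choice is that the critical section covers only the shared list, not the (potentially slow) work itself.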
Edit
Okay, so my original idea was to notify the thread when it can continue consuming Workers. That is the basic rationale of the producer/consumer design: skip continuous polling and really only do work when there is work to be done.
Just now I had the idea of using a lock that is acquired by default and released when a new worker can be consumed. But I'm not sure whether this is a good way to do it. Can anyone spot a problem (e.g. a potential deadlock)?
The full code is on GitHub: https://github.com/NiklasRosenstein/async/blob/73828ecaa2990a71b63caf93c32f9cce5ec11d27/async.py#L686-L750
class ThreadedWorkerQueue(WorkerQueue, threading.Thread):
    r""" This class implements the consumer design, introducing :class:`Worker`
    objects to start working as soon as there are new workers available. Every
    object adding Workers to this queue is a producer. """

    def __init__(self):
        WorkerQueue.__init__(self)
        threading.Thread.__init__(self)
        self.lock = threading.Lock()

    def __del__(self):
        if self.running:
            self.stop()
            warnings.warn("ThreadedWorkerQueue not stopped before it is lost.")

    def notify(self):
        r""" Notify the ThreadedWorkerQueue that processing can be continued.
        It is usually not necessary to call this method manually. """
        try:
            self.lock.release()
        except (thread.error, RuntimeError):
            pass

    def stop(self, join=True, clear=False):
        r""" Interrupt the thread in its doing, pausing the thread's actions
        until :meth:`start` is called again. All remaining workers are kept
        alive unless *clear* is True. """
        if clear:
            self.workers.clear()
        self.running = False
        self.notify()
        if join:
            self.join()

    def consume(self):
        r""" Just like :meth:`WorkerQueue.work_off`, but doesn't override
        the value of :prop:`running`. """
        while self.workers:
            worker = self.workers.popleft()
            self.current = worker
            worker.work()
            self.current = None

    # WorkerQueue

    def add_worker(self, worker):
        super(ThreadedWorkerQueue, self).add_worker(worker)
        self.notify()

    # threading.Thread

    def start(self):
        r""" Overrides :meth:`threading.Thread.start` to allow starting
        the ThreadedWorkerQueue multiple times. """
        threading.Thread.__init__(self)
        return threading.Thread.start(self)

    def run(self):
        self.running = True
        while True:
            self.consume()
            if not self.running:
                break
            self.lock.acquire()
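For comparison, the standard library's queue.Queue already implements this producer/consumer handshake internally (it is built on Conditions), so the whole lock-juggling above can be avoided. A minimal sketch with hypothetical names, in Python 3 syntax, using a sentinel object to request shutdown:

```python
import queue
import threading

_SENTINEL = object()  # hypothetical shutdown marker

class QueueWorkerThread(threading.Thread):
    """Consumer thread backed by queue.Queue; producers never block,
    since put() on an unbounded Queue returns immediately."""

    def __init__(self):
        super().__init__()
        self.tasks = queue.Queue()

    def add_worker(self, worker):
        self.tasks.put(worker)  # non-blocking for an unbounded Queue

    def stop(self):
        # FIFO order guarantees all previously added workers run first.
        self.tasks.put(_SENTINEL)
        self.join()

    def run(self):
        while True:
            worker = self.tasks.get()  # blocks until an item is available
            if worker is _SENTINEL:
                break
            worker.work()

log = []

class Appender:
    def work(self):
        log.append("worked")

t = QueueWorkerThread()
t.start()
t.add_worker(Appender())
t.stop()
print(log)  # -> ['worked']
```

This sidesteps the question the edit raises: because Queue owns its lock and only holds it while moving items in and out, there is no release-a-lock-you-may-not-hold pattern and no obvious lost-wakeup or deadlock to reason about.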
[Comments]:
-
Does your multithreaded component use CPython? If so, consider the following: CPython comes with a Global Interpreter Lock (GIL), which allows only one opcode of Python bytecode to execute at a time, no matter how many threads are running in a given Python process... This means that, all else being equal, a single process, even a threaded one, will run no faster on a 128-CPU machine than on a single-CPU machine. (taken from oreillynet.com/onlamp/blog/2005/10/…)
-
@Jon This is for UI asynchronicity, not about making things run faster.
Tags: python conditional-statements producer-consumer locks thread-synchronization