一个可能对此有所帮助的现成选项是twisted.internet.defer.DeferredSemaphore。这是正常(计数)信号量的异步版本,如果您进行过大量线程编程,您可能已经知道。
(计数)信号量很像互斥体(锁)。但是,在一个互斥体在相应的发布之前只能获取一次的情况下,可以配置一个(计数)信号量以允许任意(但指定的)数量的获取在需要任何相应的发布之前成功。
这是一个使用DeferredSemaphore 运行十个异步操作的示例,但一次最多运行三个:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore 也有明确的acquire 和release 方法,但是run 方法非常方便,几乎总是你想要的。它调用acquire 方法,该方法返回Deferred。对于第一个Deferred,它添加了一个回调,该回调调用您传入的函数(以及任何位置或关键字参数)。如果该函数返回一个Deferred,则在第二个Deferred 中添加一个回调,该回调调用release 方法。
同步情况也可以通过立即调用release 来处理。错误也得到处理,允许它们传播,但确保完成必要的release 以使DeferredSemaphore 保持一致状态。传递给run 的函数的结果(或它返回的Deferred 的结果)成为run 返回的Deferred 的结果。
另一种可能的方法可能基于DeferredQueue 和cooperate。 DeferredQueue 大多类似于普通队列,但其get 方法返回Deferred。如果在调用时队列中没有项目,Deferred 在添加项目之前不会触发。
这是一个例子:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
请注意,async 工作函数与第一个示例中的相同。但是,这一次,还有一个 worker 函数,它显式地将作业从 DeferredQueue 中拉出并使用 async 处理它们(通过添加 async 作为回调到 get 返回的 Deferred) . worker 生成器由cooperate 驱动,在每个Deferred 产生火灾后迭代一次。然后,主循环启动其中三个工作生成器,以便在任何给定时间进行三个作业。
这种方法比DeferredSemaphore 方法涉及更多的代码,但有一些可能很有趣的好处。首先,cooperate 返回一个CooperativeTask 实例,该实例具有pause、resume 等有用的方法。此外,分配给同一合作者的所有作业将在调度中相互合作,以免事件循环过载(这就是 API 的名称)。在DeferredQueue 方面,还可以对待处理的项目数量设置限制,这样您就可以避免服务器完全过载(例如,如果您的图像处理器卡住并停止完成任务)。如果调用put 的代码处理了队列溢出异常,您可以将此作为压力来尝试停止接受新作业(可能将它们分流到另一台服务器,或提醒管理员)。用DeferredSemaphore 做类似的事情有点棘手,因为没有办法限制有多少工作正在等待获取信号量。