【问题标题】:Queue remote calls to a Python Twisted perspective broker?对 Python Twisted 透视代理的远程调用进行排队?
【发布时间】:2011-02-21 03:44:44
【问题描述】:

Twisted(对于 python)的优势在于它的异步框架(我认为)。我编写了一个通过 Perspective Broker 接受请求的图像处理服务器。只要我一次喂它少于几百张图像,它就可以很好地工作。但是,有时它几乎同时会出现数百张图像。因为它试图同时处理它们,所以服务器崩溃了。

作为一种解决方案,我想在服务器上对 remote_calls 进行排队,这样它一次只能处理约 100 个图像。看起来这可能是 Twisted 已经做过的事情,但我似乎找不到它。关于如何开始实施这个的任何想法?指向正确的方向?谢谢!

【问题讨论】:

    标签: python asynchronous twisted


    【解决方案1】:

    一个可能对此有所帮助的现成选项是twisted.internet.defer.DeferredSemaphore。这是正常(计数)信号量的异步版本,如果您进行过大量线程编程,您可能已经知道。

    (计数)信号量很像互斥体(锁)。但是,在一个互斥体在相应的发布之前只能获取一次的情况下,可以配置一个(计数)信号量以允许任意(但指定的)数量的获取在需要任何相应的发布之前成功。

    这是一个使用DeferredSemaphore 运行十个异步操作的示例,但一次最多运行三个:

    from twisted.internet.defer import DeferredSemaphore, gatherResults
    from twisted.internet.task import deferLater
    from twisted.internet import reactor
    
    
    def async(n):
        print 'Starting job', n
        d = deferLater(reactor, n, lambda: None)
        def cbFinished(ignored):
            print 'Finishing job', n
        d.addCallback(cbFinished)
        return d
    
    
    def main():
        sem = DeferredSemaphore(3)
    
        jobs = []
        for i in range(10):
            jobs.append(sem.run(async, i))
    
        d = gatherResults(jobs)
        d.addCallback(lambda ignored: reactor.stop())
        reactor.run()
    
    
    if __name__ == '__main__':
        main()
    

    DeferredSemaphore 也有明确的acquirerelease 方法,但是run 方法非常方便,几乎总是你想要的。它调用acquire 方法,该方法返回Deferred。对于第一个Deferred,它添加了一个回调,该回调调用您传入的函数(以及任何位置或关键字参数)。如果该函数返回一个Deferred,则在第二个Deferred 中添加一个回调,该回调调用release 方法。

    同步情况也可以通过立即调用release 来处理。错误也得到处理,允许它们传播,但确保完成必要的release 以使DeferredSemaphore 保持一致状态。传递给run 的函数的结果(或它返回的Deferred 的结果)成为run 返回的Deferred 的结果。

    另一种可能的方法可能基于DeferredQueuecooperateDeferredQueue 大多类似于普通队列,但其get 方法返回Deferred。如果在调用时队列中没有项目,Deferred 在添加项目之前不会触发。

    这是一个例子:

    from random import randrange
    
    from twisted.internet.defer import DeferredQueue
    from twisted.internet.task import deferLater, cooperate
    from twisted.internet import reactor
    
    
    def async(n):
        print 'Starting job', n
        d = deferLater(reactor, n, lambda: None)
        def cbFinished(ignored):
            print 'Finishing job', n
        d.addCallback(cbFinished)
        return d
    
    
    def assign(jobs):
        # Create new jobs to be processed
        jobs.put(randrange(10))
        reactor.callLater(randrange(10), assign, jobs)
    
    
    def worker(jobs):
        while True:
            yield jobs.get().addCallback(async)
    
    
    def main():
        jobs = DeferredQueue()
    
        for i in range(10):
            jobs.put(i)
    
        assign(jobs)
    
        for i in range(3):
            cooperate(worker(jobs))
    
        reactor.run()
    
    
    if __name__ == '__main__':
        main()
    

    请注意,async 工作函数与第一个示例中的相同。但是,这一次,还有一个 worker 函数,它显式地将作业从 DeferredQueue 中拉出并使用 async 处理它们(通过添加 async 作为回调到 get 返回的 Deferred) . worker 生成器由cooperate 驱动,在每个Deferred 产生火灾后迭代一次。然后,主循环启动其中三个工作生成器,以便在任何给定时间进行三个作业。

    这种方法比DeferredSemaphore 方法涉及更多的代码,但有一些可能很有趣的好处。首先,cooperate 返回一个CooperativeTask 实例,该实例具有pauseresume 等有用的方法。此外,分配给同一合作者的所有作业将在调度中相互合作,以免事件循环过载(这就是 API 的名称)。在DeferredQueue 方面,还可以对待处理的项目数量设置限制,这样您就可以避免服务器完全过载(例如,如果您的图像处理器卡住并停止完成任务)。如果调用put 的代码处理了队列溢出异常,您可以将此作为压力来尝试停止接受新作业(可能将它们分流到另一台服务器,或提醒管理员)。用DeferredSemaphore 做类似的事情有点棘手,因为没有办法限制有多少工作正在等待获取信号量。

    【讨论】:

    • 酷,我真的很欣赏这些想法。响应使用 DeferredSemaphore 的想法。如果需要完成离散批次的作业,这将非常有用。如果一个批次有太多的工作要做,它只会同时做几个工作,然后当所有的工作都完成后,这个批次就会被收集起来。这样做的缺点是在整个批次完成之前不会返回任何结果,对吗?我认为这个缺点可以通过使用 DeferredQueue...
    • 使用 DeferredQueue 和协作的方法很聪明。就扩展处理器而言,它确实会给我未来更多的控制权。我什至认为它不一定更复杂。谢谢。
    【解决方案2】:

    你可能也喜欢我写的 txRDQ(Resizable Dispatch Queue)。谷歌一下,它在 LaunchPad 的 tx 集合中。抱歉,我没有更多时间回复 - 即将上台。

    特里

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-02-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多