【问题标题】:Python Twisted - running multiple callbacksPython Twisted - 运行多个回调
【发布时间】:2016-04-07 22:27:42
【问题描述】:

我对 twisted 很陌生,我真的需要一件事 - 运行任意数量的函数(从同一个函数开始),收集所有函数的结果并进行一些处理。

这是我所拥有的:

from twisted.internet import defer
import time


# slow computing query
def process_data(num, data):
    time.sleep(5)
    array = []
    # mock the results obtained from processed data
    for i in range(0, 5):
        array.append(num)
    return array

def process_results(arrays):
    # this should collect return arrays of all callbacks 
    print arrays

data = []
callbacks_refs = []
for i in range(0, 5):
   d=defer.Deferred()
   d.addCallback(process_data)
   callbacks_refs.append(d)

callbacks = defer.DeferredList(callbacks_refs)
callbacks.addCallback(process_results)

for i, d in enumerate(callbacks_refs):
    d.callback(i, data)

我希望最后一个 for 循环将开始异步执行所有回调(就像通常使用 Promises 一样),所有结果都将传递给 process_results 函数,该函数将在 callbacks_refs 的所有回调完成后执行,但我觉得我大错特错了。

【问题讨论】:

    标签: python asynchronous callback twisted


    【解决方案1】:

    我不知道您的示例与您的实际代码有多接近,但示例代码显示了对 Twisted 所做工作的一些误解。 Twisted 不会神奇地使您的同步代码异步。您正在阻止time.sleep 中的事件循环。如果您正在做一些受 CPU 限制(而不是 I/O 限制)的事情,您可以使用多个线程或进程。

    我会假设process_data是一个阻塞调用,并为您提供基于多线程的解决方案:

    import time
    from twisted.internet import defer, task, threads
    
    # slow computing query
    def process_data(num):
        time.sleep(5)
        array = []
        # mock the results obtained from processed data
        for i in range(0, 5):
            array.append(num)
        return array
    
    def process_results(arrays):
        # this should collect return arrays of all callbacks
        print arrays
    
    def main(_):
        callbacks_refs = []
        for i in range(0, 5):
            callbacks_refs.append(threads.deferToThread(process_data, i))
        callbacks = defer.DeferredList(callbacks_refs)
        callbacks.addCallback(process_results)
        return callbacks
    
    task.react(main)
    

    我还将给你一个关于 Twisted 编程的一般建议 - 如果你发现自己在输入 d = defer.Deferred(),那么你的设计可能有问题。

    【讨论】:

      【解决方案2】:

      我不知道这是否有解决方法,但是根据您制作 defer.callback() 的方式,您将错误的参数传递给您的回调。

      如果您要附加一个 errback 和一个回调,您可能会发现您只是收到大量失败的结果......所以它可以正常工作,但不能按预期工作。

      我看到了两个修复。

      from functools import partial
      for i in range(0, 5):
          d=defer.Deferred()
          d.addCallback(partial(process_data,i,data[i]))
          # This partial is still kinda crooked, but hopefully I have made my point
          callbacks_refs.append(d)
      

      或更改在回调函数中传递数据的方式

      # slow computing query
      def process_data(data_dict):
          #data_dict['num']
          #data_dict['data']
      
      #...and further down
      d.callback({'num':4,'data':(1,2,3)})
      

      对不起,我对 deferredlist 不太熟悉,但我认为一旦你修复了 deferreds,deferredlist 可能会自动工作。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多