【问题标题】:How can I know that all Futures are resolved in Tornado?我怎么知道 Tornado 中的所有 Futures 都已解决?
【发布时间】:2017-09-07 01:25:57
【问题描述】:

我有一些解析应用程序,它基本上执行以下操作

  • start() 方法添加到 IOLoop 作为回调以在下一次迭代中调用
  • start() 调用另一个函数,我们称之为get_input()
  • get_input() 是一个协程,它通过网络获取一些数据,然后调度另一个协程 process_input(),将其添加为 start() 是在第一步中添加的。
  • get_input() 还会检查一些取决于获取的数据的条件,并可能使用调整后的参数自行安排

现在,在此条件呈现 False 之后,我知道不会有任何新的输入项要处理。
但是我怎么知道get_input()process_input() 的Futures 仍然没有解决?

我想这可以通过实现一种计数器来解决,每次调用process_input() 时它都会递增,并在解决后递减
但是如果有一系列不同的协程呢?如何跟踪它们的状态,以便在我停止 IOLoop 后,在它们得到解决之前不会有任何任务死亡?

也许,应该有某种分层计数...

编辑:

2 @dano 好的,我现在明白了……我不专心。你真的不会阻塞,因为它自己的调用在这个列表中
但是!

  1. 这样的组织要求只能使用yield 构造,不能使用add_callback,否则我们会失去“等待”的概念
  2. 递归级别增加了.. 嗯,不知道是不是太糟糕了

我今天想出的是“元未来”
我创建了一个裸 Future() 对象。
我用我的装饰器装饰每个启用@coroutine 的方法,这会增加“元未来”中的计数器字段并为他们的未来添加一个自定义完成的回调,这应该减少它。

当它达到零时,“元未来”通过调用 set_result(None) 来解决 还有一个 IOLoop 回调,可以准确地产生那个元未来:

    @coroutine
    def wait_for_complete(self):
      yield self._input_data_collected
      yield self._all_futures_resolved
      self.complete()

因此,在那之后我们知道没有未决的期货。这就像手动实现引用计数一样困难,但它也涵盖了 IOLoop.add_callback() 添加任务的方式

【问题讨论】:

  • 您是否总是希望在完成处理后停止 IOLoop?或者是在带外停止 IOLoop 的请求,您希望它等到工作完成后再实际停止?
  • @dano,据我了解,如果我致电 IOLoop.stop() 时仍有一些未解决的期货,它们将不会被处理。所以我假设我应该手动跟踪所有处理完成的时刻,然后stop()
  • 确实如此; IOLoop.stop() 不会等待出色的工作完成。但是,您可以组织您的应用程序,使其在完成工作后自动退出。不过,如果不真正了解您的程序的结构,就很难说。

标签: python tornado future


【解决方案1】:

您可以编写您的方法,以便在所有工作完成之前它们不会返回,而不是安排回调。然后你可以直接调用IOLoop.run_sync(start),在所有处理完成之前调用不会返回:

from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def start():
    yield get_input()

@gen.coroutine
def get_input(*args, **kwargs):
    data = yield fetch_data_over_net()
    futs = []  # list of Future objects
    futs.append(process_input(data))
    if should_call_myself(data):
        futs.append(get_input(newargs, newkwargs))
    yield futs # This will wait for all Future objects in the list to complete.

@gen.coroutine
def process_input(data):
    # do stuff

if __name__ == "__main__":
    IOLoop.instance().run_sync(start)

我们利用协程返回 Futures 和 Tornado 支持 waiting for multiple Futures in parallel 的事实,以便我们可以尽可能多地同时运行,而实际上之前从未从 get_input(因此 start)返回所有依赖的工作都完成了。

【讨论】:

  • 我知道Tornado的这个功能,其实我用在别的地方。但是,如果我使用 yield [futures]get_input() 的执行将阻塞直到它们完成,这会影响性能,因为我可以立即运行另一个 get_input() - 无需等待 process_input()。这就是为什么我使用add_callback() 来安排处理
  • @Ojomio 你是如何触发get_input() 的,除了在start() 内部调用一次,然后在get_input() 内部递归调用它?我在示例中介绍了递归调用,它没有被process_input 阻止;它们并行运行。如果您在问题中添加一些示例代码来准确显示您在做什么,这可能会有所帮助。
  • 好的,我现在明白了……我不专心。你真的不会阻塞,因为它自己的调用在这个列表中
  • yield fetch_data_over_net() 是阻塞调用吗?
  • @user1045085 它不会阻塞 IO 循环,因为它是我们 yielding 的协程,但它是“阻塞”的,因为执行流程将等待它完成在继续下一行之前。
猜你喜欢
  • 2022-10-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-03
  • 2018-07-22
  • 2019-05-21
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多