【发布时间】:2017-09-07 01:25:57
【问题描述】:
我有一些解析应用程序,它基本上执行以下操作
- 将
start()方法添加到 IOLoop 作为回调以在下一次迭代中调用 -
start()调用另一个函数,我们称之为get_input() -
get_input()是一个协程,它通过网络获取一些数据,然后调度另一个协程process_input(),将其添加为start()是在第一步中添加的。 -
get_input()还会检查一些取决于获取的数据的条件,并可能使用调整后的参数自行安排
现在,在此条件呈现 False 之后,我知道不会有任何新的输入项要处理。
但是我怎么知道get_input() 和process_input() 的Futures 仍然没有解决?
我想这可以通过实现一种计数器来解决,每次调用process_input() 时它都会递增,并在解决后递减
但是如果有一系列不同的协程呢?如何跟踪它们的状态,以便在我停止 IOLoop 后,在它们得到解决之前不会有任何任务死亡?
也许,应该有某种分层计数...
编辑:
2 @dano
好的,我现在明白了……我不专心。你真的不会阻塞,因为它自己的调用在这个列表中
但是!
- 这样的组织要求只能使用
yield构造,不能使用add_callback,否则我们会失去“等待”的概念 - 递归级别增加了.. 嗯,不知道是不是太糟糕了
我今天想出的是“元未来”
我创建了一个裸 Future() 对象。
我用我的装饰器装饰每个启用@coroutine 的方法,这会增加“元未来”中的计数器字段并为他们的未来添加一个自定义完成的回调,这应该减少它。
当它达到零时,“元未来”通过调用 set_result(None) 来解决
还有一个 IOLoop 回调,可以准确地产生那个元未来:
@coroutine
def wait_for_complete(self):
yield self._input_data_collected
yield self._all_futures_resolved
self.complete()
因此,在那之后我们知道没有未决的期货。这就像手动实现引用计数一样困难,但它也涵盖了 IOLoop.add_callback() 添加任务的方式
【问题讨论】:
-
您是否总是希望在完成处理后停止 IOLoop?或者是在带外停止 IOLoop 的请求,您希望它等到工作完成后再实际停止?
-
@dano,据我了解,如果我致电
IOLoop.stop()时仍有一些未解决的期货,它们将不会被处理。所以我假设我应该手动跟踪所有处理完成的时刻,然后stop()它 -
确实如此;
IOLoop.stop()不会等待出色的工作完成。但是,您可以组织您的应用程序,使其在完成工作后自动退出。不过,如果不真正了解您的程序的结构,就很难说。