【发布时间】:2015-11-04 18:47:18
【问题描述】:
有时异步任务没有有意义的终止条件 - 例如,在下面的程序中,“rate_limiter”任务在队列中以固定速率永远生成令牌流。
import asyncio
import sys
@asyncio.coroutine
def rate_limiter(queue, rate):
"""Push tokens to QUEUE at a rate of RATE per second."""
delay = 1/rate
while True:
yield from asyncio.sleep(delay)
yield from queue.put(None)
@asyncio.coroutine
def do_work(n, rate):
for i in range(n):
yield from rate.get()
sys.stdout.write("job {}\n".format(i))
def main():
loop = asyncio.get_event_loop()
rate = asyncio.Queue()
rltask = loop.create_task(rate_limiter(rate, 10))
wtask = loop.create_task(do_work(20, rate))
loop.run_until_complete(wtask)
main()
这个程序完美地运行除了,异步库认为这是一个编程错误,当没有任何东西可以限制速率时丢弃rltask;你会收到类似的投诉
...
job 18
job 19
Task was destroyed but it is pending!
task: <Task pending coro=<rate_limiter() running at rl.py:9>
wait_for=<Future pending cb=[Task._wakeup()]>>
(无论是否处于调试模式)。
我可以解决这个问题,比如一个告诉rate_limiter 协程跳出循环的事件,但这感觉像是额外的代码,没有真正的好处。 应该在使用 asyncio 时如何处理这种情况?
编辑:我不清楚:我正在寻找的是类似于线程上的 daemon 标志的东西:这样我就不必等待特定的任务, 理想地表示为任务本身或其协程的注释。我也会接受一个证明不存在这种机制的答案。我已经知道解决方法了。
【问题讨论】:
-
不相关:在多线程情况下,daemon thread is used to produce tokens (see
RatedSemaphore)(它在程序退出时死亡) -
@J.F.Sebastian 对,这是相同的算法,转换为异步任务。据我所知,asyncio 没有等效的守护线程。
-
您可以使用
asyncio.Task.all_tasks()获取待处理任务并调用cancel()或仅调用rltask.cancel()并在CancelledError中中断rate_limiter() -
@J.F.Sebastian 这涉及的额外代码量与事件大致相同。
-
while not stopped.wait(delay)wherestoppedisasyncio.Event可以模拟asyncio.sleep(delay)+CancelledError但它不会模拟更复杂的代码,您需要在退出时进行清理(例如,使用 @ 987654338@-statements 虽然明确的协议,如server.close(),server.wait_closed()如果您需要彻底关闭会更好)。