【问题标题】:Handling tasks that never terminate in python3 asyncio处理在 python3 asyncio 中永远不会终止的任务
【发布时间】:2015-11-04 18:47:18
【问题描述】:

有时异步任务没有有意义的终止条件 - 例如,在下面的程序中,“rate_limiter”任务在队列中以固定速率永远生成令牌流。

import asyncio
import sys

@asyncio.coroutine
def rate_limiter(queue, rate):
    """Push tokens to QUEUE at a rate of RATE per second."""
    delay = 1/rate
    while True:
        yield from asyncio.sleep(delay)
        yield from queue.put(None)

@asyncio.coroutine
def do_work(n, rate):
    for i in range(n):
        yield from rate.get()
        sys.stdout.write("job {}\n".format(i))

def main():
    loop   = asyncio.get_event_loop()
    rate   = asyncio.Queue()
    rltask = loop.create_task(rate_limiter(rate, 10))
    wtask  = loop.create_task(do_work(20, rate))
    loop.run_until_complete(wtask)

main()

这个程序完美地运行除了,异步库认为这是一个编程错误,当没有任何东西可以限制速率时丢弃rltask;你会收到类似的投诉

...
job 18
job 19
Task was destroyed but it is pending!
task: <Task pending coro=<rate_limiter() running at rl.py:9>
      wait_for=<Future pending cb=[Task._wakeup()]>>

(无论是否处于调试模式)。

我可以解决这个问题,比如一个告诉rate_limiter 协程跳出循环的事件,但这感觉像是额外的代码,没有真正的好处。 应该在使用 asyncio 时如何处理这种情况?

编辑:我不清楚:我正在寻找的是类似于线程上的 daemon 标志的东西:这样我就不必等待特定的任务, 理想地表示为任务本身或其协程的注释。我也会接受一个证明不存在这种机制的答案。我已经知道解决方法了。

【问题讨论】:

  • 不相关:在多线程情况下,daemon thread is used to produce tokens (see RatedSemaphore)(它在程序退出时死亡)
  • @J.F.Sebastian 对,这是相同的算法,转换为异步任务。据我所知,asyncio 没有等效的守护线程。
  • 您可以使用asyncio.Task.all_tasks() 获取待处理任务并调用cancel() 或仅调用rltask.cancel() 并在CancelledError 中中断rate_limiter()
  • @J.F.Sebastian 这涉及的额外代码量与事件大致相同。
  • while not stopped.wait(delay) where stopped is asyncio.Event 可以模拟 asyncio.sleep(delay) + CancelledError 但它不会模拟更复杂的代码,您需要在退出时进行清理(例如,使用 @ 987654338@-statements 虽然明确的协议,如server.close()server.wait_closed() 如果您需要彻底关闭会更好)。

标签: python-3.x python-asyncio


【解决方案1】:

为避免 “任务已销毁,但它正在等待处理!” 警告,如果您为相应的未来对象设置了一个虚拟结果,您可以将一个永不结束的协程标记为退出程序时完成:

#!/usr/bin/env python3.5
import asyncio
import itertools
from contextlib import closing, contextmanager


@contextmanager
def finishing(coro_or_future, *, loop=None):
    """Mark a never ending coroutine or future as done on __exit__."""
    fut = asyncio.ensure_future(
        coro_or_future, loop=loop)  # start infinite loop
    try:
        yield
    finally:
        if not fut.cancelled():
            fut.set_result(None)  # mark as finished


async def never_ends():
    for c in itertools.cycle('\|/-'):
        print(c, end='\r', flush=True)
        await asyncio.sleep(.3)


with closing(asyncio.get_event_loop()) as loop, \
     finishing(never_ends(), loop=loop):
    loop.run_until_complete(asyncio.sleep(3))  # do something else

它假定您的协程在进程退出之前不需要显式清理。在后一种情况下,为清理定义一个显式过程:提供可以调用的方法(例如,server.close()server.wait_closed()),或者传递调用者应该在关闭或引发时设置的事件(asyncio.Event)异常(例如CancelledError)。

引入finishing() 的好处是检测错误,即您不应忽略警告,除非它被finishing() 调用明确静音。

【讨论】:

    【解决方案2】:

    .cancel()任务然后等待它被取消,在外面抓CancelledError

    # vim: tabstop=4 expandtab
    
    import asyncio
    import sys
    
    @asyncio.coroutine
    def rate_limiter(queue, rate):
        """Push tokens to QUEUE at a rate of RATE per second."""
        delay = 1/rate
        while True:
            yield from asyncio.sleep(delay)
            yield from queue.put(None)
    
    @asyncio.coroutine
    def do_work(n, rate):
        for i in range(n):
            yield from rate.get()
            sys.stdout.write("job {}\n".format(i))
    
    def main():
        loop   = asyncio.get_event_loop()
        rate   = asyncio.Queue()
        rltask = loop.create_task(rate_limiter(rate, 10))
        wtask  = loop.create_task(do_work(20, rate))
        loop.run_until_complete(wtask)
        rltask.cancel()
        try:
            loop.run_until_complete(rltask)
        except asyncio.CancelledError:
            ...
        loop.close()
    
    main()
    

    【讨论】:

    • 正如我在 cmets 中所说的那样,这涉及“没有真正好处的额外代码”,就像使用事件结束循环一样。你是在断言没有办法不必等待rltask
    • 您已经不必等待rltask。那么唯一的问题是asyncio 任务实现logs 作为error。您可以将 private Task._log_destroy_pending 变量设置为 False,或者您可以使用 BaseEventLoop.set_exception_handler 覆盖默认异常处理程序以忽略这些消息。无论哪种方式,都需要额外的代码。
    猜你喜欢
    • 2014-05-29
    • 2014-05-13
    • 2015-08-14
    • 2018-01-11
    • 2018-10-14
    • 1970-01-01
    • 2014-08-08
    • 2021-10-08
    • 1970-01-01
    相关资源
    最近更新 更多