【问题标题】:How to interrupt Tornado coroutine如何中断 Tornado 协程
【发布时间】:2015-07-02 22:58:16
【问题描述】:

假设我有两个这样工作的函数:

@tornado.gen.coroutine
def f():
    for i in range(4):
        print("f", i)
        yield tornado.gen.sleep(0.5)

@tornado.gen.coroutine
def g():
    yield tornado.gen.sleep(1)
    print("Let's raise RuntimeError")
    raise RuntimeError

一般来说,函数f 可能包含无限循环并且永远不会返回(例如它可以处理一些队列)。

我想做的是能够随时中断它。

最明显的方法行不通。异常仅在函数 f 退出后引发(如果它是无穷无尽的,它显然永远不会发生)。

@tornado.gen.coroutine
def main():
    try:
        yield [f(), g()]
    except Exception as e:
        print("Caught", repr(e))

    while True:
        yield tornado.gen.sleep(10)

if __name__ == "__main__":
    tornado.ioloop.IOLoop.instance().run_sync(main)

输出:

f 0
f 1
Let's raise RuntimeError
f 2
f 3
Traceback (most recent call last):
  File "/tmp/test/lib/python3.4/site-packages/tornado/gen.py", line 812, in run
    yielded = self.gen.send(value)
StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  <...>
  File "test.py", line 16, in g
    raise RuntimeError
RuntimeError

也就是说,只有当两个协程都返回(两个期货都解析)时才会引发异常。

tornado.gen.WaitIterator 部分解决了这个问题,但它有问题 (unless I'm mistaken)。但这不是重点。

它仍然没有解决中断现有协程的问题。即使启动协程的函数退出,协程也会继续运行。

编辑:似乎 Tornado 并不真正支持协程取消,这与 Python 的 asyncio 不同,您可以轻松地在每个屈服点抛出 CancelledError

【问题讨论】:

  • 普通的generator.throw 函数不能与 Tornado 协程一起工作吗?而且,如果没有,是否没有记录在案的替代品呢? (对于基于yield from 的协程,如在stdlib 的asyncio 中,有一些棘手的问题,但对于基于yield 的蹦床协程不应该是一个问题。)
  • tornado.gen.coroutine 装饰器使函数返回未来而不是生成器,并且底层生成器似乎没有暴露。 Tornado 期货也不支持cancel() 方法(它的实现总是返回False)。
  • 哦,好吧,我想我对 Tornado 的了解还不够,无法在这里提供帮助。希望别人这样做。 :)

标签: python exception tornado yield coroutine


【解决方案1】:

如果您 use WaitIterator according to the instructions,并使用 toro.Event 在协程之间发出信号,它会按预期工作:

from datetime import timedelta
import tornado.gen
import tornado.ioloop

import toro

stop = toro.Event()


@tornado.gen.coroutine
def f():
    for i in range(4):
        print("f", i)

        # wait raises Timeout if not set before the deadline.
        try:
            yield stop.wait(timedelta(seconds=0.5))
            print("f done")
            return
        except toro.Timeout:
            print("f continuing")


@tornado.gen.coroutine
def g():
    yield tornado.gen.sleep(1)
    print("Let's raise RuntimeError")
    raise RuntimeError


@tornado.gen.coroutine
def main():
    wait_iterator = tornado.gen.WaitIterator(f(), g())
    while not wait_iterator.done():
        try:
            result = yield wait_iterator.next()
        except Exception as e:
            print("Error {} from {}".format(e, wait_iterator.current_future))
            stop.set()
        else:
            print("Result {} received from {} at {}".format(
                result, wait_iterator.current_future,
                wait_iterator.current_index))


if __name__ == "__main__":
    tornado.ioloop.IOLoop.instance().run_sync(main)

现在,pip install toro 获取 Event 类。 Tornado 4.2 将包括事件,see the changelog

【讨论】:

  • 那行得通,但这基本上意味着我必须手动放置中断点(除了屈服点),创建事件对象并以某种方式将其与协程相关联等。不是很方便。不过,我相信您的解决方案可能会通过一些装饰器和辅助函数得到改进。
【解决方案2】:

警告:这不是一个有效的解决方案。看评论。不过,如果您是新手(如我自己),此示例可以显示逻辑流程。谢谢@nathaniel-j-smith 和@wgh

使用更原始的东西有什么区别,例如全局变量?

import asyncio


event = asyncio.Event()
aflag = False


async def short():
    while not aflag:
        print('short repeat')
        await asyncio.sleep(1)
    print('short end')


async def long():
    global aflag

    print('LONG START')
    await asyncio.sleep(3)

    aflag = True
    print('LONG END')


async def main():
    await asyncio.gather(long(), short())

if __name__ == '__main__':
    asyncio.run(main())

它适用于 asyncio,但我想这个想法保持不变。这是一个半问题(为什么 Event 会更好?)。然而,解决方案会产生作者需要的准确结果:

LONG START
short repeat
short repeat
short repeat
LONG END
short end

更新: 这个slides 可能对理解问题的核心很有帮助。

【讨论】:

  • 这需要协程不断轮询全局变量,这不是一个选项,因为协程可能会被其他库的网络 IO 阻塞。
  • 我不明白。他们使用相同的共享内存。然而,一个线程会出现问题。您能否分享任何有关它的博客文章或简要描述一下,为什么在一个进程的共享内存中使用非常轻量级的原语会出现问题?使用 ContextVars 是否更可取,或者在概念上没有区别?谢谢@WGH
  • 想象一下,如果不是await asyncio.sleep(1),而是await asyncio.sleep(1000),或者await fetch("http://slow.example.com/"),会发生什么。
  • 问题是,如果只在每次操作完成后检查变量,那么函数可能需要很长时间才能注意到全局变量已设置。如果你现在想打断它,那么等待 1000 秒并不是那么好:-)。另一方面,如果你想让它被快速中断,那么函数必须不断地以高频率轮询全局变量,这是低效的。
  • 噢噢,现在我明白了。谢谢@NathanielJ.Smith。我担心我不理解协程的概念,尽管这是操作系统或网络的一些问题。但这只是一个简单的逻辑流程!
【解决方案3】:

从版本 5 开始,Tornado runs on asyncio event loop

在 Python 3 上,IOLoop 始终是 asyncio 事件循环的包装器,使用 asyncio.Futureasyncio.Task 代替 Tornado 对应项。

因此您可以使用asyncio 任务取消,即asyncio.Task.cancel

您的示例带有读取 while-true 循环的队列,可能如下所示。

import logging
from asyncio import CancelledError

from tornado import ioloop, gen


async def read_off_a_queue():
    while True:
        try:
            await gen.sleep(1)
        except CancelledError:
            logging.debug('Reader cancelled')
            break
        else:
            logging.debug('Pretend a task is consumed')

async def do_some_work():
    await gen.sleep(5)
    logging.debug('do_some_work is raising')
    raise RuntimeError                     

async def main():
    logging.debug('Starting queue reader in background')
    reader_task = gen.convert_yielded(read_off_a_queue())    
    try:
        await do_some_work()
    except RuntimeError:
        logging.debug('do_some_work failed, cancelling reader')
        reader_task.cancel()
        # give the task a chance to clean up, in case it
        # catches CancelledError and awaits something
        try:
            await reader_task            
        except CancelledError:
            pass


if __name__ == '__main__':
    logging.basicConfig(level='DEBUG')        
    ioloop.IOLoop.instance().run_sync(main)

如果你运行它,你应该会看到:

DEBUG:asyncio:Using selector: EpollSelector
DEBUG:root:Starting queue reader in background
DEBUG:root:Pretend a task is consumed
DEBUG:root:Pretend a task is consumed
DEBUG:root:Pretend a task is consumed
DEBUG:root:Pretend a task is consumed
DEBUG:root:do_some_work is raising
DEBUG:root:do_some_work failed, cancelling reader
DEBUG:root:Reader cancelled

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-26
    • 2018-11-30
    • 1970-01-01
    • 2016-03-29
    • 2020-10-16
    相关资源
    最近更新 更多