【问题标题】:Cancel nested coroutines in python asyncio取消python asyncio中的嵌套协程
【发布时间】:2016-12-06 20:06:22
【问题描述】:

在我的应用程序中,我有一个协程,它可能等待其他几个协程,并且每个如果这个协程,可能等待另一个协程,依此类推。 如果这样的协程之一失败,则无需执行所有其他尚未执行的协程。 (在我的情况下,这甚至是有害的,我想启动几个回滚协程来代替)。 那么,如何取消所有嵌套协程的执行呢?这是我现在所拥有的:

import asyncio

async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo2 done')

async def bar():
    await asyncio.gather(bar1(), bar2())


async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')


async def bar2():
    for i in range(5):
        print('Bar2', i)
        await asyncio.sleep(0.5)
    print('Bar2 done')


async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)

async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        await asyncio.gather(task_foo, task_bar)
    except Exception:
        print('One task failed. Canceling all')
        task_foo.cancel()
        task_bar.cancel()
    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

这显然行不通。如您所见,foo 协程如我所愿被取消,但bar2 仍在运行:

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4

所以,我肯定做错了什么。这里的正确方法是什么?

【问题讨论】:

标签: python asynchronous async-await python-asyncio


【解决方案1】:

当您调用task_bar.cancel() 时,任务已经完成,因此没有任何效果。作为gather docs state

如果return_exceptions为真,任务中的异常被视为成功的结果,并聚集在结果列表中; 否则,第一个引发的异常将立即传播到返回的未来。

这正是正在发生的事情,将您的 task_bar 协程稍微修改为:

async def bar():
    try:
        await asyncio.gather(bar1(), bar2())
    except Exception:
        print("Got a generic exception on bar")
        raise

输出:

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Got a generic exception on bar
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4

我也在task_bar.cancel()调用之前打印task_bar,注意它已经完成了,所以调用cancel没有任何效果。

就解决方案而言,我认为生成协程需要处理取消它安排的协程,因为一旦协程完成,我就找不到检索它们的方法(除了滥用Task.all_tasks,这听起来是错误的)。

话虽如此,我必须使用 wait 而不是 gather 并返回第一个异常,这是一个完整的示例:

import asyncio


async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo done')


async def bar():
    done, pending = await asyncio.wait(
        [bar1(), bar2()], return_when=asyncio.FIRST_EXCEPTION)

    for task in pending:
        task.cancel()

    for task in done:
        task.result()  # needed to raise the exception if it happened


async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')


async def bar2():
    for i in range(5):
        print('Bar2', i)
        await asyncio.sleep(0.5)
    print('Bar2 done')


async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)


async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        await asyncio.gather(task_foo, task_bar)
    except Exception:
        print('One task failed. Canceling all')
        print(task_bar)
        task_foo.cancel()
        task_bar.cancel()

    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

哪些输出:

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines_2.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4

这不是很好,但它有效。

【讨论】:

    【解决方案2】:

    据我所知,在取消协程本身时,不可能自动取消协程的所有子任务。所以你必须手动清理子任务。 当等待 asyncio.gather 未来时抛出异常时,您可以通过 Gathering_future 对象的_children 属性访问剩余的任务。 你的例子工作:

    import asyncio
    
    async def foo():
        for i in range(5):
            print('Foo', i)
            await asyncio.sleep(0.5)
        print('Foo2 done')
    
    async def bar():
        gathering = asyncio.gather(bar1(), bar2())
        try:
            await gathering
        except Exception:
            # cancel all subtasks of this coroutine
            [task.cancel() for task in gathering._children]
            raise
    
    async def bar1():
        await asyncio.sleep(1)
        raise Exception('Boom!')
    
    async def bar2():
        for i in range(5):
            print('Bar2', i)
            try:
                await asyncio.sleep(0.5)
            except asyncio.CancelledError:
                # you can cleanup here
                print("Bar2 cancelled")
                break
        else:
            print('Bar2 done')
    
    async def baz():
        for i in range(5):
            print('Baz', i)
            await asyncio.sleep(0.5)
    
    async def main():
        task_foo = asyncio.Task(foo())
        task_bar = asyncio.Task(bar())
        try:
            task = asyncio.gather(task_foo, task_bar)
            await task
        except Exception:
            print('One task failed. Canceling all')
            task_foo.cancel()
            task_bar.cancel()
        print('Now we want baz')
        await baz()
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()
    

    返回

    Foo 0
    Bar2 0
    Foo 1
    Bar2 1
    Foo 2
    Bar2 2
    Bar2 cancelled
    One task failed. Canceling all
    Now we want baz
    Baz 0
    Baz 1
    Baz 2
    Baz 3
    Baz 4
    

    【讨论】:

    • 不幸的是,这不是我想要的。您所做的实际上只是将代码从main 复制到bar。我认为这本身就是反对 DRY 的。但是,如果我正在等待来自 3rd 方库的协程,这也无法完成。此外,如果您查看以下内容:repl.it/EhTf/0,您可能会看到foo 子任务仍被取消。但如果其中一个子任务引发异常,它不会取消相邻子任务。
    猜你喜欢
    • 2020-12-29
    • 1970-01-01
    • 2018-02-25
    • 2019-03-16
    • 2021-12-29
    • 2020-04-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多