【问题标题】:Python CancelledError with asyncio queuePython CancelledError 与异步队列
【发布时间】:2021-05-16 22:58:53
【问题描述】:

我使用来自this 答案的代码,但是当队列为空时得到asyncio.exceptions.CancelledError。在实际项目中,我将任务从消费者添加到队列中,这就是我使用while True 语句的原因

我压缩该代码以使调试更容易:

import asyncio
import traceback


async def consumer(queue: asyncio.Queue):
    try:
        while True:
            number = await queue.get()  # here is exception
            queue.task_done()
            print(f'consumed {number}')
    except BaseException:
        traceback.print_exc()


async def main():
    queue = asyncio.Queue()
    for i in range(3):
        await queue.put(i)
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(1)]
    await queue.join()
    for c in consumers:
        c.cancel()


asyncio.run(main())

还有错误:

consumed 0
consumed 1
consumed 2
Traceback (most recent call last):
  File "/Users/abionics/Downloads/BaseAsyncScraper/ttt.py", line 8, in consumer
    number = await queue.get()
  File "/usr/local/Cellar/python@3.9/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/queues.py", line 166, in get
    await getter
asyncio.exceptions.CancelledError

顺便说一句,queue.get() 的文档说If queue is empty, wait until an item is available。这个错误的真正原因是什么?也许有更好的解决方案?

【问题讨论】:

    标签: python queue python-asyncio producer-consumer


    【解决方案1】:

    原因是因为你取消了任务:

    Task.cancel:

    请求取消任务。

    这安排了一个 CancelledError 异常被抛出到 在事件循环的下一个循环中包装协程。

    您有几个选择来处理这个问题:

    1.使用asyncio.gather

    如果 return_exceptions 为 True,则异常被视为与 成功的结果,并汇总在结果列表中。

    await queue.join()
    
    for c in consumers:
        c.cancel()
    
    await asyncio.gather(*consumers, return_exceptions=True)
    

    2。在消费者中捕获异常

    async def consumer(q):
        while True:
            num = await q.get()
    
            try:                
                print(f"Working on: {num}")
            except asyncio.CancelledError:
                print(f"Exiting...")
                break
            finally:
                q.task_done()
    

    3.抑制异常

    form contextlib import suppress
    
    async def consumer(q):
        with suppress(asyncio.CancelledError):
            while True:
                num = await q.get()
                print(f"Working on: {num}")
                q.task_done()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-06
      • 2012-12-31
      • 2013-09-10
      相关资源
      最近更新 更多