【问题标题】:asyncio task was destroyed but it is pendingasyncio 任务已被销毁,但它处于挂起状态
【发布时间】:2020-10-22 08:04:54
【问题描述】:

我正在编写一个示例程序,它以块的形式从数据源(csv 或 rdbms)中读取数据,进行一些转换并通过套接字将其发送到服务器。

但是因为 csv 非常大,为了测试目的,我想在几块之后打破阅读。 不幸的是,出了点问题,我不知道是什么以及如何解决它。可能我必须取消一些,但现在确定在哪里以及如何取消。我收到以下错误:

Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>

示例代码为:

import asyncio
import json

async def readChunks():
  # this is basically a dummy alternative for reading csv in chunks
  df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
  for chunk in df:
    await asyncio.sleep(0.001)
    yield chunk

async def send(row):
    j = json.dumps(row)
    print(f"to be sent: {j}")
    await asyncio.sleep(0.001)


async def main():
    i = 0
    async for chunk in readChunks():
        for k, v in chunk.items():
            await asyncio.gather(send({k:v}))
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")
    

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

【问题讨论】:

  • Unfortunately something goes wrong... - 出了什么问题?行为不端如何?你知道你的协程是否按照你想要的方式工作吗?您是否尝试通过检查中间结果来追踪错误?
  • 你为什么不使用高级api - asyncio.run()

标签: python asynchronous python-asyncio coroutine event-loop


【解决方案1】:

这行得通...

import asyncio
import json
import logging

logging.basicConfig(format='%(asctime)s.%(msecs)03d %(message)s',
                    datefmt='%S')
root = logging.getLogger()
root.setLevel(logging.INFO)

async def readChunks():
  # this is basically a dummy alternative for reading csv in chunks
  df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
  for chunk in df:
    await asyncio.sleep(0.002)
    root.info('readChunks: next chunk coming')
    yield chunk

async def send(row):
    j = json.dumps(row)
    root.info(f"to be sent: {j}")
    await asyncio.sleep(0.002)


async def main():
    i = 0
    root.info('main: starting to read chunks')
    async for chunk in readChunks():
        for k, v in chunk.items():
            root.info(f'main: sending an item')
            #await asyncio.gather(send({k:v}))
            stuff = await send({k:v})
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")

##loop = asyncio.get_event_loop()
##loop.run_until_complete(main())
##loop.close()

if __name__ == '__main__':

    asyncio.run(main())

...至少它会运行并完成。


bugs.python.org/issue38013 中描述了通过退出 async for 循环来停止异步生成器的问题,并且看起来它已在 3.7.5 中修复。

但是,使用

loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()

我得到一个调试错误,但在 Python 3.8 中没有异常。

Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<<async_generator_athrow without __name__>()>>

使用更高级别的 API asyncio.run(main()) with debugging ON 我确实没有收到调试消息。如果您打算尝试升级到 Python 3.7.5-9,您可能仍应使用 asyncio.run()

【讨论】:

  • 你有调试吗? - 让我试试 - 我正在使用 3,8。
  • 在调试模式开启的情况下没有任何报告 - 不确定我是否可以在我的计算机上没有 3.7 的情况下跟踪该问题,我将查看文档。
  • @MisterMiyagi - 看起来是 - bugs.python.org/issue38013 - 它在 3.7.1 中出现,然后在 3.7.5 中修复。跨度>
  • 忘了提及我使用的是 3.7,因为使用 3.8 我无法安装一些可用 C 代码提供的库。
  • @spyder - 看起来它已在 3.7.5 中修复 - 如果它不会破坏您的库,请升级到 3.7.5-3.7.9。
【解决方案2】:

许多async 资源,例如生成器,需要借助事件循环进行清理。当async for 循环通过break 停止迭代异步生成器时,生成器仅由垃圾收集器清理。这意味着任务处于挂起状态(等待事件循环)但被销毁(被垃圾收集器)。

最直接的解决方法是显式地aclose 生成器:

async def main():
    i = 0
    aiter = readChunks()      # name iterator in order to ...
    try:
        async for chunk in aiter:
            ...
            i += 1
            if i > 5:
                break
    finally:
        await aiter.aclose()  # ... clean it up when done

可以使用asyncstdlib 简化这些模式(免责声明:我维护这个库)。 asyncstdlib.islice 允许在干净地关闭生成器之前获取固定数量的项目:

import asyncstdlib as a

async def main():
    async for chunk in a.islice(readChunks(), 5):
        ...

如果break 条件是动态的,scoping the iterator 在任何情况下都保证清理:

import asyncstdlib as a

async def main():
    async with a.scoped_iter(readChunks()) as aiter:
        async for idx, chunk in a.enumerate(aiter):
            ...
            if idx >= 5:
                break

【讨论】:

  • 我不知道 asyncstdlib,干得好 Miyagi 先生!
  • 谢谢!认为要取消某些事情并将某些事情转换为任务。不知道是什么以及如何。您是否知道任何详细但逐步的指南,它会越来越深入并引导我穿越异步世界?
  • 还发现这似乎在没有“最终”的情况下工作:await asyncio.gather(send({k:v}), return_exceptions = True)
  • 我可以把它放在我的代码或每个有async for循环的任务中吗?
【解决方案3】:

您的 readChunks 正在异步运行并且您的循环。如果没有完成程序,你就是在破坏它。

这就是为什么它给asyncio task was destroyed but it is pending

简而言之,异步任务在后台执行其工作,但您通过中断循环(停止程序)将其杀死。

【讨论】:

    【解决方案4】:

    问题很简单。您确实提前退出了循环,但异步生成器尚未用尽(待处理):

    ...
    if i > 5:
        break
    ...
    

    【讨论】:

    • @7u5h4r 和 Alex:是的,我的意图是打破迭代。但我的做法是……至少很糟糕。我正在寻找合适的教授。大大地。所以想找方法在哪里,怎么取消,哪一部分。
    猜你喜欢
    • 2023-03-29
    • 1970-01-01
    • 2020-12-14
    • 2022-10-14
    • 2017-12-17
    • 2016-02-03
    • 2011-06-26
    • 2018-05-30
    • 2015-06-01
    相关资源
    最近更新 更多