【发布时间】:2020-10-22 08:04:54
【问题描述】:
我正在编写一个示例程序,它以块的形式从数据源(csv 或 rdbms)中读取数据,进行一些转换并通过套接字将其发送到服务器。
但是因为 csv 非常大,为了测试目的,我想在几块之后打破阅读。 不幸的是,出了点问题,我不知道是什么以及如何解决它。可能我必须取消一些,但现在确定在哪里以及如何取消。我收到以下错误:
Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>
示例代码为:
import asyncio
import json
async def readChunks():
# this is basically a dummy alternative for reading csv in chunks
df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
for chunk in df:
await asyncio.sleep(0.001)
yield chunk
async def send(row):
j = json.dumps(row)
print(f"to be sent: {j}")
await asyncio.sleep(0.001)
async def main():
i = 0
async for chunk in readChunks():
for k, v in chunk.items():
await asyncio.gather(send({k:v}))
i += 1
if i > 5:
break
#print(f"item in main via async generator is {chunk}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
【问题讨论】:
-
Unfortunately something goes wrong...- 出了什么问题?行为不端如何?你知道你的协程是否按照你想要的方式工作吗?您是否尝试通过检查中间结果来追踪错误? -
你为什么不使用高级api -
asyncio.run()?
标签: python asynchronous python-asyncio coroutine event-loop