【发布时间】:2021-09-09 05:08:48
【问题描述】:
我想批量处理协程,例如:
import asyncio
async def test(i):
print(f"Start Task {i}")
await asyncio.sleep(0.1)
print(f"Finished Task {i}")
async def main():
for i in range(10):
await asyncio.gather(*[test(10*i+j) for j in range(10)])
asyncio.run(main())
有没有办法使用 Python 内置函数或库来做到这一点,这样我就不必单独创建批次?
很遗憾
async with asyncio.Semaphore(10):
await asyncio.gather(*[test(i) for i in range(100)])
没有按预期处理协程: 协同程序是一次性创建的。只有执行是有限的。我不想一次创建所有任务。任务要分批创建。
【问题讨论】:
-
协程究竟是如何被意外处理的?我测试了这段代码,确实注意到打印返回的顺序是无序的,但奇怪的是不是随机顺序。我认为这只是由于在 python 中如何编写并发,但如果这是问题所在,那么我建议您将输出基于函数的返回值,方法是将 asyncio.gather 设置为一个变量然后打印它。跨度>
-
如果说“没有按预期处理协程”是指按顺序返回结果,这是因为
gather返回结果的顺序与传递给函数的顺序相同,所以您将首先看到任务 1,其次是任务 2,以此类推。任务实际上以异步批处理 10 次,尝试更改为等待,您会看到结果是随机的,因为等待不保留结果顺序。 -
抱歉不准确。我更新了我的问题。
-
gather的原理是同时运行协程...因此您观察到的行为是正确的...我不认为使用for循环创建您的批次....否则尝试阅读有关线程:docs.python.org/3/library/asyncio-task.html#id10 -
@McDizzy 这是一个合理的解释。一年前我回答了一个类似的问题,它也使用了信号量:stackoverflow.com/a/61778481/5378816
标签: python asynchronous semaphore coroutine gather