【问题标题】:Python async coroutines in batches批量 Python 异步协程
【发布时间】:2021-09-09 05:08:48
【问题描述】:

我想批量处理协程,例如:

import asyncio

async def test(i):
    print(f"Start Task {i}")
    await asyncio.sleep(0.1)
    print(f"Finished Task {i}")

async def main():
    for i in range(10):
        await asyncio.gather(*[test(10*i+j) for j in range(10)])


asyncio.run(main())

有没有办法使用 Python 内置函数或库来做到这一点,这样我就不必单独创建批次?

很遗憾

async with asyncio.Semaphore(10):
    await asyncio.gather(*[test(i) for i in range(100)])

没有按预期处理协程: 协同程序是一次性创建的。只有执行是有限的。我不想一次创建所有任务。任务要分批创建。

【问题讨论】:

  • 协程究竟是如何被意外处理的?我测试了这段代码,确实注意到打印返回的顺序是无序的,但奇怪的是不是随机顺序。我认为这只是由于在 python 中如何编写并发,但如果这是问题所在,那么我建议您将输出基于函数的返回值,方法是将 asyncio.gather 设置为一个变量然后打印它。跨度>
  • 如果说“没有按预期处理协程”是指按顺序返回结果,这是因为gather 返回结果的顺序与传递给函数的顺序相同,所以您将首先看到任务 1,其次是任务 2,以此类推。任务实际上以异步批处理 10 次,尝试更改为等待,您会看到结果是随机的,因为等待不保留结果顺序。
  • 抱歉不准确。我更新了我的问题。
  • gather 的原理是同时运行协程...因此您观察到的行为是正确的...我不认为使用for 循环创建您的批次....否则尝试阅读有关线程:docs.python.org/3/library/asyncio-task.html#id10
  • @McDizzy 这是一个合理的解释。一年前我回答了一个类似的问题,它也使用了信号量:stackoverflow.com/a/61778481/5378816

标签: python asynchronous semaphore coroutine gather


【解决方案1】:

这就是我要找的东西:

import asyncio

from aiodecorators import Semaphore


@Semaphore(10)
async def test(i):
    print(f"Start Task {i}")
    await asyncio.sleep(0.1)
    print(f"Finished Task {i}")


async def main():
    await asyncio.gather(*[test(i) for i in range(100)])


asyncio.run(main())

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-02-10
    • 1970-01-01
    • 1970-01-01
    • 2023-03-08
    • 2018-01-07
    • 2015-12-02
    • 1970-01-01
    相关资源
    最近更新 更多