【问题标题】:Python async-generator not asyncPython异步生成器不是异步的
【发布时间】:2019-11-16 00:53:58
【问题描述】:

我的代码如下。我希望两个 sleep 可以共享相同的时间框架并花费 1+2*3=7 秒来运行脚本。 但是好像出了点问题,所以还是需要 3*(1+2) 秒。

有没有办法修改代码?

import asyncio

async def g():
    for i in range(3):
        await asyncio.sleep(1)
        yield i

async def main():
    async for x in g():
        print(x)
        await asyncio.sleep(2)

loop = asyncio.get_event_loop()
res = loop.run_until_complete(main())
loop.close()

【问题讨论】:

  • 在屈服之前,您仍然在g 中运行sleep,因此在main 中的睡眠之前。 async/await 语法的存在是为了同时执行多个任务,而不是同时执行一个任务。你只有 一个 任务,所以没有什么可以同时运行。
  • 您的预期结果是什么?你期望g 在睡觉前产生i 吗?您是否希望maing 休眠时接收x?您是否希望 gmain 睡觉时准备下一个 i
  • 我假设您的sleep 是您真实代码中的一些实际工作?是计算还是 I/O?
  • 是的,我希望gmain 睡觉时准备下一个isleep 是我真实代码中的一些实际工作吗?

标签: python asynchronous generator


【解决方案1】:

作为使用队列执行此操作的替代方法,此解决方案将 Futures 链接在一起,以便 Future 的结果是当前项目,另一个 Future 检索下一个项目(有点像链表,可以这么说):

from asyncio import sleep, get_event_loop, run, create_task

async def aiter(fut, async_generator):
    try:
        async for item in async_generator:
            fut, prev_fut = get_event_loop().create_future(), fut
            prev_fut.set_result((item, fut))
        else:
            fut.set_exception(StopAsyncIteration())
    except Exception as e:
        fut.set_exception(e)


async def concurrent(async_generator):
    fut = get_event_loop().create_future()
    create_task(aiter(fut, async_generator))

    try:
        while True:
            item, fut = await fut
            yield item
    except StopAsyncIteration as e:
        return

作为一个额外的好处,此解决方案将通过重新引发 main() 方法中的异常并使用对调试有用的回溯来正确处理 g() 中发生的异常。

【讨论】:

  • 非常好,因为它保证了恒定的内存使用!
【解决方案2】:

async/await 的重点是交错任务,而不是函数/生成器。例如,当您await asyncio.sleep(1) 时,您当前的协程会随着睡眠而延迟。同样,async for 会延迟其协程,直到下一个项目准备好。

为了运行您的单独功能,您必须将每个部分创建为单独的任务。使用Queue 在他们之间交换物品 - 任务只会延迟到他们交换了物品。

from asyncio import Queue, sleep, run, gather


# the original async generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


async def producer(queue: Queue):
    async for i in g():
        print('send', i)
        await queue.put(i)  # resume once item is fetched
    await queue.put(None)


async def consumer(queue: Queue):
    x = await queue.get()  # resume once item is fetched
    while x is not None:
        print('got', x)
        await sleep(2)
        x = await queue.get()


async def main():
    queue = Queue()
    # tasks only share the queue
    await gather(
        producer(queue),
        consumer(queue),
    )


run(main())

如果您经常需要此功能,您还可以将其放入一个包装异步可迭代对象的辅助对象中。助手封装了队列和单独的任务。您可以在 async for 语句中将帮助程序直接应用于异步迭代。

from asyncio import Queue, sleep, run, ensure_future


# helper to consume iterable as concurrent task
async def _enqueue_items(async_iterable, queue: Queue, sentinel):
    async for item in async_iterable:
        await queue.put(item)
    await queue.put(sentinel)


async def concurrent(async_iterable):
    """Concurrently fetch items from ``async_iterable``"""
    queue = Queue()
    sentinel = object()
    consumer = ensure_future(  # concurrently fetch items for the iterable
        _enqueue_items(async_iterable, queue, sentinel)
    )
    try:
        item = await queue.get()
        while item is not sentinel:
            yield item
            item = await queue.get()
    finally:
        consumer.cancel()


# the original generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


# the original main - modified with `concurrent`
async def main():
    async for x in concurrent(g()):
        print(x)
        await sleep(2)


run(main())

【讨论】:

  • @cjs840312 由于这是一种经常发生的模式,我现在添加了一个示例帮助程序,它可以围绕现有的异步生成器进行包装。
猜你喜欢
  • 1970-01-01
  • 2010-12-20
  • 2017-03-17
  • 1970-01-01
  • 2020-11-20
  • 1970-01-01
  • 2021-08-24
  • 1970-01-01
  • 2015-08-08
相关资源
最近更新 更多