async/await 的重点是交错任务,而不是函数/生成器。例如,当您await asyncio.sleep(1) 时,您当前的协程会随着睡眠而延迟。同样,async for 会延迟其协程,直到下一个项目准备好。
为了运行您的单独功能,您必须将每个部分创建为单独的任务。使用Queue 在他们之间交换物品 - 任务只会延迟到他们交换了物品。
from asyncio import Queue, sleep, run, gather
# the original async generator
async def g():
for i in range(3):
await sleep(1)
yield i
async def producer(queue: Queue):
async for i in g():
print('send', i)
await queue.put(i) # resume once item is fetched
await queue.put(None)
async def consumer(queue: Queue):
x = await queue.get() # resume once item is fetched
while x is not None:
print('got', x)
await sleep(2)
x = await queue.get()
async def main():
queue = Queue()
# tasks only share the queue
await gather(
producer(queue),
consumer(queue),
)
run(main())
如果您经常需要此功能,您还可以将其放入一个包装异步可迭代对象的辅助对象中。助手封装了队列和单独的任务。您可以在 async for 语句中将帮助程序直接应用于异步迭代。
from asyncio import Queue, sleep, run, ensure_future
# helper to consume iterable as concurrent task
async def _enqueue_items(async_iterable, queue: Queue, sentinel):
async for item in async_iterable:
await queue.put(item)
await queue.put(sentinel)
async def concurrent(async_iterable):
"""Concurrently fetch items from ``async_iterable``"""
queue = Queue()
sentinel = object()
consumer = ensure_future( # concurrently fetch items for the iterable
_enqueue_items(async_iterable, queue, sentinel)
)
try:
item = await queue.get()
while item is not sentinel:
yield item
item = await queue.get()
finally:
consumer.cancel()
# the original generator
async def g():
for i in range(3):
await sleep(1)
yield i
# the original main - modified with `concurrent`
async def main():
async for x in concurrent(g()):
print(x)
await sleep(2)
run(main())