【问题标题】:How to create an async generator in Python?如何在 Python 中创建异步生成器?
【发布时间】:2017-05-12 13:51:33
【问题描述】:

我正在尝试将此 Python2.7 代码重写为新的异步世界秩序:

def get_api_results(func, iterable):
    pool = multiprocessing.Pool(5)
    for res in pool.map(func, iterable):
        yield res

map() 阻塞,直到计算出所有结果,所以我试图将其重写为异步实现,一旦它们准备好就会产生结果。与map() 一样,返回值的返回顺序必须与iterable 相同。我试过这个(我需要requests,因为旧的身份验证要求):

import requests

def get(i):
    r = requests.get('https://example.com/api/items/%s' % i)
    return i, r.json()

async def get_api_results():
    loop = asyncio.get_event_loop()
    futures = []
    for n in range(1, 11):
        futures.append(loop.run_in_executor(None, get, n))
    async for f in futures:
        k, v = await f
        yield k, v

for r in get_api_results():
    print(r)

但是使用 Python 3.6 我得到了:

  File "scratch.py", line 16, in <module>
    for r in get_api_results():
TypeError: 'async_generator' object is not iterable

我怎样才能做到这一点?

【问题讨论】:

  • 不要将事件循环放在异步代码块中,异步代码必须由事件循环运行,而不是相反。
  • 谢谢!当然,我在这里遗漏了一些东西。我见过的所有事件循环示例都使用 loop.run_until_complete(get_api_results()) 在我的理解中,这会使调用阻塞并丢失结果。
  • 你通常会有更多的协程来处理结果,而事件循环会驱动这些。
  • 另外,requests.get() 是一个阻塞调用,不是你可以等待的。
  • 是的,这就是为什么我按照stackoverflow.com/questions/22190403/… 中的建议将它包装在loop.run_in_executor()

标签: python asynchronous async-await generator python-asyncio


【解决方案1】:

关于您的旧 (2.7) 代码 - 多处理被认为是更简单的线程模块的强大替代品,用于并发处理 CPU 密集型任务,其中线程不能很好地工作。您的代码可能不受 CPU 限制 - 因为它只需要发出 HTTP 请求 - 并且线程可能足以解决您的问题。

然而,Python 3+ 没有直接使用threading,而是有一个名为concurrent.futures 的漂亮模块,它通过酷炫的Executor 类提供更简洁的API。这个模块也可用于 python 2.7 作为external package

以下代码适用于 python 2 和 python 3:

# For python 2, first run:
#
#    pip install futures
#
from __future__ import print_function

import requests
from concurrent import futures

URLS = [
    'http://httpbin.org/delay/1',
    'http://httpbin.org/delay/3',
    'http://httpbin.org/delay/6',
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.coooom/',
]


def fetch(url):
    r = requests.get(url)
    r.raise_for_status()
    return r.content


def fetch_all(urls):
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(fetch, url): url for url in urls}
        print("All URLs submitted.")
        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            if future.exception() is None:
                yield url, future.result()
            else:
                # print('%r generated an exception: %s' % (
                # url, future.exception()))
                yield url, None


for url, s in fetch_all(URLS):
    status = "{:,.0f} bytes".format(len(s)) if s is not None else "Failed"
    print('{}: {}'.format(url, status))

此代码使用futures.ThreadPoolExecutor,基于线程。很多魔法都在as_completed() 这里使用。

你上面的python 3.6代码,使用run_in_executor()创建一个futures.ProcessPoolExecutor(),并没有真正使用异步IO!!

如果你真的想继续使用 asyncio,你需要使用支持 asyncio 的 HTTP 客户端,例如 aiohttp。这是一个示例代码:

import asyncio

import aiohttp


async def fetch(session, url):
    print("Getting {}...".format(url))
    async with session.get(url) as resp:
        text = await resp.text()
    return "{}: Got {} bytes".format(url, len(text))


async def fetch_all():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay))
                 for delay in (1, 1, 2, 3, 3)]
        for task in asyncio.as_completed(tasks):
            print(await task)
    return "Done."


loop = asyncio.get_event_loop()
resp = loop.run_until_complete(fetch_all())
print(resp)
loop.close()

如您所见,asyncio 也有一个as_completed(),现在使用真正的异步 IO,仅在一个进程上使用一个线程。

【讨论】:

  • Since coroutines are generators, it is not possible to use simple "yield"s in them. 有可能。 stackoverflow.com/a/37550568/2908138
  • @im7mortal:谢谢,我已经从答案中删除了这部分。
【解决方案2】:

您将事件循环放在另一个协同程序中。不要那样做。事件循环是异步代码的最外层“驱动程序”,应该同步运行。

如果您需要处理获取的结果,请编写更多执行此操作的协程。他们可以从队列中获取数据,也可以直接驱动获取。

你可以有一个获取和处理结果的主函数,例如:

async def main(loop): 
    for n in range(1, 11):
        future = loop.run_in_executor(None, get, n)
        k, v = await future
        # do something with the result

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

我也会使用 aiohttp 之类的异步库使 get() 函数正确异步,因此您根本不必使用执行程序。

【讨论】:

    猜你喜欢
    • 2018-12-18
    • 2019-02-25
    • 2019-11-16
    • 2019-04-12
    • 2021-11-18
    • 2021-11-19
    • 2020-11-20
    • 1970-01-01
    • 2010-12-20
    相关资源
    最近更新 更多