【问题标题】:Utilizing asyncio generators and asyncio.as_completed利用 asyncio 生成器和 asyncio.as_completed
【发布时间】:2019-10-01 20:18:21
【问题描述】:

我有一些代码用于抓取 url,解析信息,然后使用 SQLAlchemy 将其放入数据库。我正在尝试异步执行此操作,同时限制同时请求的最大数量。

这是我的代码:

async def get_url(aiohttp_session, url1, url2):
    async with session.get(url1) as r_url1:
       if r_url1.status == 200:
          async with session.get(url2) as r_url2:
             if r_url2.status == 200:
                return await r_url1.json(), await r_url2.json()

async def url_generator(formatted_start_date, formatted_end_date, machine_id, interval):
    interval_start = formatted_start_date
    interval_end = formatted_start_date + interval

    while interval_end <= formatted_end_date:
        yield (f"https://example.org/start={interval_start}"
               f"Start={datetime.strftime(interval_start, DATETIME_FORMAT)}"
               f"&End={datetime.strftime(interval_end, DATETIME_FORMAT)}"
               f"&machines={machine_id}",
               f"https://example.org/start={interval_start}"
               f"Start={datetime.strftime(interval_start, DATETIME_FORMAT)}"
               f"&End={datetime.strftime(interval_end, DATETIME_FORMAT)}"
               f"&machines={machine_id}&groupby=Job"
               )
        interval_start += interval
        interval_end += interval

async def parse(database, url1_json, url2_json):
    """ Do some parsing and save it using credentials stored in the database object """


def main(database, formatted_start_date, formatted_end_date, machine_id, interval):
    async for url1_json, url2_json in asyncio.as_completed(url_generator(formatted_start_date, formatted_end_date, machine_id, interval)):
         parse(database, url1_json, url2_json)

我收到错误yield from should be used as context manager expression

我已经尝试查看 documentation here 以及同步原语,但仍然对我出错的地方以及我应该如何从我的生成器创建任务感到困惑。

【问题讨论】:

    标签: python python-asyncio python-3.7


    【解决方案1】:

    贴出的代码有几个问题:

    • 您正在尝试将as_completed 用作异步迭代器,并使用async for 对其结果进行迭代。但是,as_completed 不会返回异步迭代器(至少 not yet),必须使用常规的 for 进行迭代,并像 shown in the docs 一样显式等待每个生成的对象。

    • 您将异步迭代器传递给as_completed,而它接受普通容器或(常规)可迭代器。

    • 您在未使用async def 定义的函数中使用async for,这应该是语法错误。另外,parse() 被定义为协程,你不用等待它。

    好消息是,由于url_generator 已经是一个生成器,所以您根本不需要as_completed,您应该能够对其进行迭代:

    async def main(database, formatted_start_date, formatted_end_date,
                   machine_id, interval):
        async for url1_json, url2_json in url_generator(
                formatted_start_date, formatted_end_date,
                machine_id, interval)):
            await parse(database, url1_json, url2_json)
    

    但是请注意,async for 不会自动并行化迭代,它只会允许其他协程与迭代的协程并行运行。要并行化迭代,您需要调用create_task 来并行提交任务,并使用asyncio.Semaphore 来限制并行任务的数量。例如:

    async def parse(database, url1_json, url2_json, limit):
        # async with applied to a semaphore ensures that no more than N
        # coroutines that use the same semaphore enter the "with" block
        # in parallel
        async with limit:
            ... code goes here ...
    
    async def main(database, formatted_start_date, formatted_end_date,
                   machine_id, interval):
        limit = asyncio.Semaphore(10)
    
        # create all coroutines in advance using create_task
        # and run them in parallel, relying on the semaphore
        # limit the number of simultaneous requests
        tasks = []
        async for url1_json, url2_json in url_generator(
                formatted_start_date, formatted_end_date,
                machine_id, interval)):
            # this create_task just creates the task - it will
            # start running when we return to the event loop
            tasks.append(asyncio.create_task(parse(database, url1_json, url2_json, limit))
    
        # suspend to the event loop, resuming this coroutine only after
        # all the tasks have finished (or any of them raises)
        await asyncio.gather(*tasks)
    

    请注意,url_generator 不需要异步,因为它不需要 await 任何东西。您可以使用def 定义它并使用for 对其进行迭代。

    【讨论】:

    • 感谢您的深入回答。我让url_generator 成为生成器的部分原因是我不想同时将所有任务保存在内存中。任务列表可能有数百万。当信号量调用release() 时,您在底部的方法是否仅调用next(),还是同时将所有任务保存在内存中?
    • @TMarks 答案中的代码同时将所有任务保存在内存中,尽管它们中的大多数将是空的,因为它们会在信号量上等待。尽管如此,如果您正在处理数百万个项目,您可能希望避免提前创建数百万个任务,并让它们“争夺”信号量。 (战斗在算法上应该仍然有效,但你会浪费内存。)
    • @TMarks 您可以创建一个有界队列,创建固定数量的协程(尽可能多的同时访问限制)来排空队列/联系数据库,然后填充来自main() 的队列。有关示例,请参见 this answer
    • 感谢@user4815162342,感谢您为回答所做的所有努力。我会去查看您发布的链接并尝试实施类似的解决方案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-11-08
    • 2016-09-13
    • 2012-09-11
    • 2016-08-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多