【问题标题】:How to use asyncio properly for a generator function?如何为生成器函数正确使用 asyncio?
【发布时间】:2020-10-07 10:14:11
【问题描述】:

我一次读取数千个文件,对于每个文件,我需要在从每个文件中产生行之前对其执行操作。为了提高性能,我想我可以使用 asyncio 在等待读取新文件的同时对文件执行操作(并生成行)。

但是从打印语句中我可以看到所有文件都已打开并收集,然后每个文件都被迭代(与没有 asyncio 的情况相同)。

我觉得我在这里遗漏了一些非常明显的东西,它使我的异步尝试同步。

import asyncio

async def open_files(file):
    with open(file) as file:
        # do stuff
        print('opening files')
        return x

async def async_generator():
    file_outputs = await asyncio.gather(*[open_files(file) for file in files])

    for file_output in file_ouputs:
        print('using open file')
        for row in file_output:
            # Do stuff to row
            yield row

async def main():
    async for yield_value in async_generator():
        pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

输出:

opening files
opening files
.
.
.
using open file
using open file

编辑

使用@user4815162342 提供的代码,我注意到,虽然它快了 3 倍,但生成器生成的行集与不使用并发生成的行集略有不同。我还不确定这是因为每个文件都遗漏了一些产量,或者文件是否以某种方式重新排序。于是我从user4815162342引入了如下代码改动,并在pool.submit()中输入了一个锁

我应该在第一次询问时提到,每个文件中的行和文件本身的顺序是必需的。

import concurrent.futures

def open_files(file):
    with open(file) as file:
        # do stuff
        print('opening files')
        return x

def generator():
    m = multiprocessing.Manager()
    lock = m.Lock()
    pool = concurrent.futures.ThreadPoolExecutor()
    file_output_futures = [pool.submit(open_files, file, lock) for file in files]
    for fut in concurrent.futures.as_completed(file_output_futures):
        file_output = fut.result()
        print('using open file')
        for row in file_output:
            # Do stuff to row
            yield row

def main():
    for yield_value in generator():
        pass

if __name__ == '__main__':
    main()

这样我的非并发和并发方法每次都会产生相同的值,但是我失去了使用并发获得的所有速度。

【问题讨论】:

    标签: python io python-asyncio


    【解决方案1】:

    我觉得我在这里遗漏了一些非常明显的东西,它使我的异步尝试同步。

    您的代码存在两个问题。第一个是 asyncio.gather() 设计为等待 所有 个期货并行完成,然后才返回它们的结果。因此,您在生成器中执行的处理不会像您的意图那样与 open_files 中的 IO 穿插,而是在所有对 open_files 的调用都返回后才开始。要在异步调用完成后处理它们,您应该使用类似 asyncio.as_completed 的东西。

    第二个也是更基本的问题是,与可以并行化同步代码的线程不同,asyncio 要求一切从头开始都是异步的。将async 添加到像open_files 这样的函数以使其异步是不够的。您需要检查代码并将任何阻塞调用(例如对 IO 的调用)替换为等效的异步原语。例如,连接网络端口应该使用open_connection,等等。如果您的 async 函数不等待任何内容,就像 open_files 的情况一样,它将像常规函数一样执行,并且您不会获得 asyncio 的任何好处。

    由于您在常规文件上使用 IO,并且操作系统不会为常规文件公开可移植异步接口,因此您不太可能从 asyncio 中获利。有像 aiofiles 这样的库在后台使用线程,但它们可能会使您的代码变慢而不是加速它,因为它们漂亮的异步 API 涉及大量内部线程同步。为了加速你的代码,你可以使用一个经典的线程池,它是 Python 通过concurrent.futures 模块公开的。例如(未经测试):

    import concurrent.futures
    
    def open_files(file):
        with open(file) as file:
            # do stuff
            print('opening files')
            return x
    
    def generator():
        pool = concurrent.futures.ThreadPoolExecutor()
        file_output_futures = [pool.submit(open_files, file) for file in files]
        for fut in file_output_futures:
            file_output = fut.result()
            print('using open file')
            for row in file_output:
                # Do stuff to row
                yield row
    
    def main():
        for yield_value in generator():
            pass
    
    if __name__ == '__main__':
        main()
    

    【讨论】:

    • 代码工作和超级有用的解释,非常感谢!
    • 我在输出中注意到的一件事是我之前没有想到的,使用concurrent.futures.as_completed,操作的fut会改变订单。我为此使用的解决方案是m = multiprocessing.Manager()lock = m.lock()file_output_futures = [pool.submit(open_files, file, lock) for file in files]。这样可以使所有内容保持正确的顺序,但会消除获得的处理速度。有没有办法在不减慢速度的情况下保持一切井井有条? @user4815162342
    • @dreadblenks 我不确定你指的是什么顺序。显然as_completed 将按照它们完成的顺序返回期货,这可能不是它们提交的顺序。如果您需要尊重顺序,那么您也许不能使用并行性?另外,我不清楚你为什么使用multiprocessing.Manager(因为这段代码根本不使用多处理),或者你把m.lock()放在哪里。我建议您编辑问题以更详细地说明您的要求。
    • 有效!我原以为as_completed 是在等待之前的操作完成时执行不同代码的一个组成部分......非常感谢您的帮助!
    • @dreadblenks as_completed 有不同的用途,而且做得正确。它为您提供完成的未来,即您尽快获得每个已完成的未来。例如,如果未来 5 先完成,您将首先获得它以便能够对其做出反应。
    猜你喜欢
    • 2022-01-18
    • 1970-01-01
    • 1970-01-01
    • 2016-01-09
    • 2021-12-28
    • 1970-01-01
    • 2016-09-12
    • 1970-01-01
    • 2019-12-13
    相关资源
    最近更新 更多