【发布时间】:2020-10-07 10:14:11
【问题描述】:
我一次读取数千个文件,对于每个文件,我需要在从每个文件中产生行之前对其执行操作。为了提高性能,我想我可以使用 asyncio 在等待读取新文件的同时对文件执行操作(并生成行)。
但是从打印语句中我可以看到所有文件都已打开并收集,然后每个文件都被迭代(与没有 asyncio 的情况相同)。
我觉得我在这里遗漏了一些非常明显的东西,它使我的异步尝试同步。
import asyncio
async def open_files(file):
with open(file) as file:
# do stuff
print('opening files')
return x
async def async_generator():
file_outputs = await asyncio.gather(*[open_files(file) for file in files])
for file_output in file_ouputs:
print('using open file')
for row in file_output:
# Do stuff to row
yield row
async def main():
async for yield_value in async_generator():
pass
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
opening files
opening files
.
.
.
using open file
using open file
编辑
使用@user4815162342 提供的代码,我注意到,虽然它快了 3 倍,但生成器生成的行集与不使用并发生成的行集略有不同。我还不确定这是因为每个文件都遗漏了一些产量,或者文件是否以某种方式重新排序。于是我从user4815162342引入了如下代码改动,并在pool.submit()中输入了一个锁
我应该在第一次询问时提到,每个文件中的行和文件本身的顺序是必需的。
import concurrent.futures
def open_files(file):
with open(file) as file:
# do stuff
print('opening files')
return x
def generator():
m = multiprocessing.Manager()
lock = m.Lock()
pool = concurrent.futures.ThreadPoolExecutor()
file_output_futures = [pool.submit(open_files, file, lock) for file in files]
for fut in concurrent.futures.as_completed(file_output_futures):
file_output = fut.result()
print('using open file')
for row in file_output:
# Do stuff to row
yield row
def main():
for yield_value in generator():
pass
if __name__ == '__main__':
main()
这样我的非并发和并发方法每次都会产生相同的值,但是我失去了使用并发获得的所有速度。
【问题讨论】:
标签: python io python-asyncio