【发布时间】:2021-01-27 00:13:52
【问题描述】:
我有以下代码从数据库(read_db)读取数据并将数据写入镶木地板文件(data.to_parquet)。两个 I/O 操作都需要一段时间才能运行。
def main():
while id < 1000:
logging.info(f'reading - id: {id}')
data = read_db(id) # returns a dataframe
logging.info(f'saving - id: {id}')
data.to_parquet(f'{id}.parquet')
logging.info(f'saved - id: {id}')
id += 1
它很慢,所以我希望 read_db(n+1) 和 to_parquet(n) 同时运行。我需要保持id 的每个步骤按顺序完成(read_db(n+1) 需要在read_db(n) 之后运行,data.to_parquet(n+1) 在data.to_parquet(n) 之后运行。)。这是异步版本
def async_wrap(f):
@wraps(f)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
p = partial(f, *args, **kwargs)
return await loop.run_in_executor(executor, p)
return run
async def main():
read_db_async = async_wrap(read_db)
while id < 1000:
logging.info(f'reading - id: {id}')
data = await read_db_async(id) # returns a dataframe
logging.info(f'saving - id: {id}')
to_parquet_async = async_wrap(data.to_parquet)
await data.to_parquet(f'{id}.parquet')
logging.info(f'saved - id: {id}')
id += 1
asyncio.get_event_loop().run_until_complete(main())
我只是看到一些乱序的日志:
reading - id: 1
saving - id: 1 (saving 1 and reading 2 run in parallel)
reading - id: 2
saved - id: 1
saving - id: 2
reading - id: 3
saved - id: 2
.....
但是,实际上日志和同步代码是一样的吗?
reading - id: 1
saving - id: 1
saved - id: 1
reading - id: 2
saving - id: 2
saved - id: 2
reading - id: 3
.....
【问题讨论】:
-
这能回答你的问题吗? How to run tasks concurrently in asyncio?
-
不,答案是并行运行所有任务,这是我需要避免的。我只需要并行运行一些步骤。
-
您可以并行运行任意数量的任务。不一定是全部。
-
您的解决方案中只有一个协程在运行,即以
main()开头的协程。await data.to_parquet(f'{id}.parquet')表示当前协程将 sleep 直到to_parquet完成,因此在此之前它不会开始下一次迭代。查看this question 获取基本示例 -
编辑:我说“只有一个协程在运行”,但这并不完全准确,因为异步调用会创建新的协程。问题是您正在等待那些新的协程完成以恢复原来的协程。
标签: python python-asyncio