【Question Title】: Combining asyncio with a multi-worker ProcessPoolExecutor and async for
【Posted】: 2019-05-06 18:26:02
【Question】:

My question is very similar to Combining asyncio with a multi-worker ProcessPoolExecutor, but with a slight change (I believe it is the async for) that makes the excellent answer there unusable for me.

I am trying the following MWE:

import concurrent.futures
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay+1)
    return('EXECUTOR: Completed blocking task number ' + str(delay+1))

async def non_blocking(loop):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        async for i in mygen():
            print('MASTER: Sending to executor blocking task number ' + str(i+1))
            result = await loop.run_in_executor(executor, blocking, i)
            print(result)
            print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i+1))

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))

As expected, the resulting output is not asynchronous:

MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2 
EXECUTOR: Completed blocking task number 2 
MASTER: Well done executor - you seem to have completed blocking task number 2

I would like to adapt the code so that the tasks run in two concurrent processes, and so that output is printed as it becomes available. The desired output is:

MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2

I understand from Combining asyncio with a multi-worker ProcessPoolExecutor that, as it stands, my await loop.run_in_executor() syntax is blocking. I don't know how to replace it in a way that allows async for to move on to the next generated value while waiting for the executor to finish its work. Note that I am not using asyncio.gather as their example does.

【Comments】:

    Tags: python python-asyncio concurrent.futures


    【Solution 1】:

    If you want at most two processes running your tasks, the simplest way to achieve that is to create the executor with max_workers=2. Then you can submit tasks as fast as possible, i.e. proceed with the next iteration of async for without waiting for the previous task to finish. At the end, you gather the results of all tasks, which ensures that no exception goes unnoticed (and possibly obtains the actual results).

    The following code produces the expected output:

    from concurrent.futures import ProcessPoolExecutor
    import asyncio
    import time
    
    async def mygen(u: int = 2):
        i = 0
        while i < u:
            yield i
            i += 1
    
    def blocking(delay):
        time.sleep(delay+1)
        return('EXECUTOR: Completed blocking task number ' + str(delay+1))
    
    async def run_blocking(executor, task_no, delay):
        print('MASTER: Sending to executor blocking task number '
              + str(task_no))
        result = await loop.run_in_executor(executor, blocking, delay)
        print(result)
        print('MASTER: Well done executor - you seem to have completed '
              'blocking task number ' + str(task_no))
    
    async def non_blocking(loop):
        tasks = []
        with ProcessPoolExecutor(max_workers=2) as executor:
            async for i in mygen():
                # spawn the task and let it run in the background
                tasks.append(asyncio.create_task(
                    run_blocking(executor, i + 1, i)))
            # if there was an exception, retrieve it now
            await asyncio.gather(*tasks)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(non_blocking(loop))
    

    【Discussion】:

    • How would you restructure this when mygen yields from a queue that never stops yielding? (producer/consumer pattern). I can't think of a good way to have the process pool executor take what it needs from the queue, limited by max_workers
    • @John You could create a Semaphore, pass it to run_blocking(), and acquire it around the await.
    • Ah, I see, so effectively throttling the calls to the ProcessPoolExecutor in asyncio, rather than relying on the pool to do it? Thanks for the reply!
    • @John Come to think of it, this also won't work if mygen() generates items faster than the executor can execute them. I suppose for that you'd need a queue feeding a pool of async workers.