【问题标题】:Why is my async function running synchronously Python3.9?为什么我的异步函数在 Python3.9 同步运行?
【发布时间】:2021-03-18 10:25:49
【问题描述】:

我正在尝试使用 asynciofutures 在单独的线程上运行一个函数。我有一个装饰器,它异步获取长时间运行的函数及其参数并输出其值。不幸的是,这些进程似乎没有异步工作。

def multiprocess(self, function, executor=None, *args, **kwargs):
    async def run_task(function, *args, **kwargs):
        @functools.wraps(function)
        async def wrap(*args, **kwargs):
            while True:
                execution_runner = executor or self._DEFAULT_POOL_
                executed_job = execution_runner.submit(function, *args, **kwargs)
                print(
                    f"Pending {function.__name__}:",
                    execution_runner._work_queue.qsize(),
                    "jobs",
                )
                print(
                    f"Threads: {function.__name__}:", len(execution_runner._threads)
                )
                future = await asyncio.wrap_future(executed_job)
                return future

        return wrap

    return asyncio.run(run_task(function, *args, **kwargs))

要调用装饰器,我有两个函数_async_tasktask_function_async_task 包含一个循环,为每个需要处理的文档运行 task_function

@staticmethod
def _async_task(documents):
    processed_docs = asyncio.run(task_function(documents))
    return processed_docs

task_function处理文档中的每个文档如下,

@multiprocess
async def task_function(documents):
    processed_documents = None
    try:
        for doc in documents:
            processed_documents = process_document(doc)
            print(processed_documents)
    except Exception as err:
        print(err)
    return processed_documents

这不能异步工作的线索是我对多线程装饰器的诊断会打印以下内容。

Pending summarise_news: 0 jobs
Threads: summarise_news: 2

由于没有待处理的作业,并且整个过程所花费的时间与同步运行的时间一样长,因此它正在同步运行。

【问题讨论】:

  • 具体来说,您希望在这里异步运行什么? _async_task 只调用一次task_functiontask_function 确实在工作池线程中运行。但是您在_async_task 中使用asyncio.run,这意味着它将阻塞直到task_function 完成。任何后续的 _async_task 调用将在前一个调用完成之前运行。
  • @dano 查看下面的答案。似乎我误解了异步在 python 中的工作方式。基本上我会 task_function 异步工作并将所有异步运行的输出收集到数据结构中,以便下游同步函数可以处理它

标签: python python-3.x python-asyncio python-multithreading concurrent.futures


【解决方案1】:

我在设置您的代码时遇到了一些问题,但我想我已经找到了答案。

首先,正如@dano 在他的评论中提到的,asyncio.run 阻塞直到协程运行完成。因此,您不会从使用这种方法中获得任何加速。

我使用了一个稍微修改过的multiprocess装饰器

def multiprocess(executor=None, *args, **kwargs):
    def run_task(function, *args, **kwargs):
        def wrap(*args, **kwargs):

            execution_runner = executor or DEFAULT_EXECUTOR
            executed_job = execution_runner.submit(function, *args, **kwargs)
            print(
                f"Pending {function.__name__}:",
                execution_runner._work_queue.qsize(),
                "jobs",
            )
            print(
                f"Threads: {function.__name__}:", len(execution_runner._threads)
            )
            future = asyncio.wrap_future(executed_job)

            return future

        return wrap
    return run_task

如您所见,这里没有asyncio.run,并且装饰器和内部包装器都是同步的,因为asyncio.wrap_future 不需要await

更新的multiprocess 装饰器现在与process_document 函数一起使用。这样做的原因是,并行化按顺序调用阻塞函数的函数不会有任何好处。您必须将您的阻塞函数转换为可在执行程序中运行。

注意这个虚拟的 process_document 与我描述的完全一样 - 完全阻塞和同步。

@multiprocess()
def process_document(doc):
    print(f"Processing doc: {doc}...")
    time.sleep(2)
    print(f"Doc {doc} done.")

现在,到最后一点。我们已经通过将process_document 转换为可在执行程序中运行来实现异步,但您如何调用它仍然很重要。

考虑以下示例:

for doc in documents:
    result = await process_document(doc)
results = await asyncio.gather(*[process_document(doc) for doc in documents])

在前一个中,我们将顺序等待协程,必须等到一个完成后才能启动另一个。 在后一个示例中,它们将并行执行,因此它实际上取决于您调用协程执行的准确程度

这是我使用的完整代码:

import asyncio
import concurrent.futures
import time


DEFAULT_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def multiprocess(executor=None, *args, **kwargs):
    def run_task(function, *args, **kwargs):
        def wrap(*args, **kwargs):

            execution_runner = executor or DEFAULT_EXECUTOR
            executed_job = execution_runner.submit(function, *args, **kwargs)
            print(
                f"Pending {function.__name__}:",
                execution_runner._work_queue.qsize(),
                "jobs",
            )
            print(
                f"Threads: {function.__name__}:", len(execution_runner._threads)
            )
            future = asyncio.wrap_future(executed_job)

            return future

        return wrap
    return run_task


@multiprocess()
def process_document(doc):
    print(f"Processing doc: {doc}...")
    time.sleep(2)
    print(f"Doc {doc} done.")


async def task_function_sequential(documents):
    start = time.time()
    for doc in documents:
        await process_document(doc)

    end = time.time()
    print(f"task_function_sequential took: {end-start}s")


async def task_function_parallel(documents):
    start = time.time()

    jobs = [process_document(doc) for doc in documents]
    await asyncio.gather(*jobs)

    end = time.time()
    print(f"task_function_parallel took: {end-start}s")


async def main():
    documents = [i for i in range(5)]
    await task_function_sequential(documents)
    await task_function_parallel(documents)


asyncio.run(main())

注意task_function_parallel 的例子仍然需要大约 4 秒,而不是 2 秒,因为线程池限制为 4 个 worker,并且作业数为 5,所以最后一个作业将等待一些 worker可用。

【讨论】:

  • 非常感谢所有的细节。这对我理解 python3.7+ 中的异步行为有很大帮助
猜你喜欢
  • 2021-09-23
  • 2021-01-15
  • 1970-01-01
  • 2019-02-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多