【发布时间】:2021-03-18 10:25:49
【问题描述】:
我正在尝试使用 asyncio 和 futures 在单独的线程上运行一个函数。我有一个装饰器,它异步获取长时间运行的函数及其参数并输出其值。不幸的是,这些进程似乎没有异步工作。
def multiprocess(self, function, executor=None, *args, **kwargs):
async def run_task(function, *args, **kwargs):
@functools.wraps(function)
async def wrap(*args, **kwargs):
while True:
execution_runner = executor or self._DEFAULT_POOL_
executed_job = execution_runner.submit(function, *args, **kwargs)
print(
f"Pending {function.__name__}:",
execution_runner._work_queue.qsize(),
"jobs",
)
print(
f"Threads: {function.__name__}:", len(execution_runner._threads)
)
future = await asyncio.wrap_future(executed_job)
return future
return wrap
return asyncio.run(run_task(function, *args, **kwargs))
要调用装饰器,我有两个函数_async_task 和task_function。 _async_task 包含一个循环,为每个需要处理的文档运行 task_function。
@staticmethod
def _async_task(documents):
processed_docs = asyncio.run(task_function(documents))
return processed_docs
task_function处理文档中的每个文档如下,
@multiprocess
async def task_function(documents):
processed_documents = None
try:
for doc in documents:
processed_documents = process_document(doc)
print(processed_documents)
except Exception as err:
print(err)
return processed_documents
这不能异步工作的线索是我对多线程装饰器的诊断会打印以下内容。
Pending summarise_news: 0 jobs
Threads: summarise_news: 2
由于没有待处理的作业,并且整个过程所花费的时间与同步运行的时间一样长,因此它正在同步运行。
【问题讨论】:
-
具体来说,您希望在这里异步运行什么?
_async_task只调用一次task_function。task_function确实在工作池线程中运行。但是您在_async_task中使用asyncio.run,这意味着它将阻塞直到task_function完成。任何后续的_async_task调用将在前一个调用完成之前运行。 -
@dano 查看下面的答案。似乎我误解了异步在 python 中的工作方式。基本上我会
task_function异步工作并将所有异步运行的输出收集到数据结构中,以便下游同步函数可以处理它
标签: python python-3.x python-asyncio python-multithreading concurrent.futures