【发布时间】:2020-12-03 18:50:13
【问题描述】:
最近,我问了一个关于如何在部署的 API 中跟踪 for 循环的进度的问题。这是link。
对我有用的solution code 是,
from fastapi import FastAPI, UploadFile
from typing import List
import asyncio
import uuid
context = {'jobs': {}}
app = FastAPI()
async def do_work(job_key, files=None):
iter_over = files if files else range(100)
for file, file_number in enumerate(iter_over):
jobs = context['jobs']
job_info = jobs[job_key]
job_info['iteration'] = file_number
job_info['status'] = 'inprogress'
await asyncio.sleep(1)
jobs[job_key]['status'] = 'done'
@app.get('/')
async def get_testing():
identifier = str(uuid.uuid4())
context['jobs'][identifier] = {}
asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())
return {"identifier": identifier}
@app.get('/status/{identifier}')
async def status(identifier):
return {
"status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
}
这样,我可以通过调用status method 使用标识符跟踪do_work 内for 循环的进度
现在,我正在寻找一种方法来并行化 do_work 方法中的 for 循环。
但是如果我使用joblib那么我不知道如何跟踪正在处理的每个文件,迭代计数将毫无意义,因为所有文件都将并行处理。
注意:我只是举了一个joblib的例子,因为我对其他库不是很熟悉。对文件的处理是基于 CPU 的繁重工作。我正在预处理文件并加载 4 个 tensorflow 模型并在文件上进行预测并写入 sql 数据库。
如果有人知道我可以做到的任何方法,请分享并帮助我。
【问题讨论】:
-
是否有理由需要使用 joblib 而不是
asyncio.run_coroutine_threadsafe运行它?有了这样的功能,您可以共享变量,在我看来,这可能是一个伟大而简单的想法 -
不,我只是举了一个
joblib的例子,因为我熟悉那个库,我正在寻找的只是让我的 for 循环执行并行,而不会丢失继续跟踪哪个迭代的功能继续,所以我也跟踪进度 -
根据
joblib的文档,看起来你可以通过共享变量来实现。这可能会导致竞争条件,但如果该函数是唯一正在处理id并且ids 保证是 uniuqe 的人,则情况不应该如此。见joblib.readthedocs.io/en/latest/auto_examples/… -
您能否与我上面给出的代码分享一个工作示例,说明您将如何做到这一点?
id是什么意思?你是说状态标识符吗?
标签: python python-3.x python-asyncio fastapi joblib