【问题标题】:FastAPI `run_in_threadpool` getting stuckFastAPI `run_in_threadpool` 卡住了
【发布时间】:2022-02-01 00:27:23
【问题描述】:

我已经使用异步实现了我的所有路由。并遵循 FastAPI 文档中的所有准则。

每条路由都有多个 DB 调用,不支持异步,所以它们是这样的正常功能

def db_fetch(query):
    # I take a few seconds to respond
    return 

为了避免阻塞我的事件循环,我使用fastapi.concurrancy.run_in_threadpool

现在的问题是,当大量请求到来时,我的新请求会被阻止。即使我关闭浏览器选项卡(取消请求),整个应用程序也会卡住,直到处理较旧的请求。

我在这里做错了什么?

我使用uvicorn 作为我的 ASGI 服务器。我在具有 2 个副本的 Kubernetes 集群中运行。

很少有人怀疑:我是否产生了太多线程?它是 uvicron 中的一些错误吗?不太确定!

【问题讨论】:

  • 确实可能是线程数。您可以尝试使用定义的线程数创建一个专用的 ThreadPool,然后使用预定义的执行程序使用 asyncio run_in_executor。
  • 有道理。最初我打算这样做,但后来有人建议我使用 fastapi 的 run_in_threadpool,因为它使用 AnyIO 来管理线程并负责管理。
  • 这就是 fastapi 或 starlett 在后台使用的 anyio.readthedocs.io/en/stable/threads.html,他们明确指出线程过多可能是个问题
  • 如果我创建一个专用的线程池,会不会导致同样的问题?
  • 我认为你可以限制工作线程的数量docs.python.org/3/library/… 所以如果你将那个池定义一次作为全局变量并使用它,我认为你可以达到这个限制。

标签: python-asyncio fastapi uvicorn starlette anyio


【解决方案1】:

正如您所说的线程过多的问题。在后台,fastapi 使用 starlette,而后者又使用 anyio 的to_thread.run_sync。如here 所述,线程过多可能会导致问题,您可以使用信号量来屏蔽它们,以设置创建的最大线程数的上限。在代码中,大概读起来像

# Core Library
from typing import TypeVar, Callable
from typing_extensions import ParamSpec
# Third party
from anyio import Semaphore
from starlette.concurrency import run_in_threadpool

# To not have too many threads running (which could happen on too many concurrent
# requests, we limit it with a semaphore.
MAX_CONCURRENT_THREADS = 10
MAX_THREADS_GUARD = Semaphore(MAX_CONCURRENT_THREADS)
T = TypeVar("T")
P = ParamSpec("P")


async def run_async(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
    async with MAX_THREADS_GUARD:
        return await run_in_threadpool(func, args, kwargs)

【讨论】:

  • 添加这个后,我的其他请求会怎样?现在线程受到限制,它们都处于等待状态,对吗?还是 starlette 会产生一个新的线程池?
  • 最后一行不应该是:return await run_in_threadpool(func, *args, **kwargs) 吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-06-23
  • 2021-10-08
  • 2016-11-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多