【发布时间】:2020-12-23 15:42:17
【问题描述】:
我有一个简单的 FastAPI webapp 正在运行,我希望能够在启动时检查数据库连接(如果失败则重试连接)
我有以下代码,但感觉不对
# main.py
import uvicorn
from backend.app import app
if __name__ == "__main__":
uvicorn.run(app, port=8001)
# app.py
# ... omitted for brevity
from backend.database import notes, tags
# ... omitted for brevity
# database.py
from motor.motor_asyncio import AsyncIOMotorClient
from asyncio import get_event_loop
client = AsyncIOMotorClient("localhost", 27027)
loop = get_event_loop()
data = loop.run_until_complete(client.server_info())
db = client.notes_db
notes = db.notes
tags = db.tags
如果没有get_event_loop() 和随后的loop.run_until_complete() 调用,它不会测试数据库连接,直到您实际尝试访问/写入它。
我的目标是能够停止启动过程,直到它可以成功连接到数据库,是否有任何干净的方法可以使用 Python 和 motor.io(https://motor.readthedocs.io/,抱歉没有标签)?
【问题讨论】:
标签: python-3.x mongodb fastapi