【问题标题】:SQLAlchemy listen to connection/disconnection eventsSQLAlchemy 监听连接/断开事件
【发布时间】:2021-10-14 22:11:35
【问题描述】:

TL;DR:是否可以使用 SQLAlchemy ORM 监听 DB Connect 和 Disconnect 的事件

背景:

我有一个使用SQLAlchemy ORM 连接到 Postgres 数据库的 Flask 应用程序。 (Flask-SQLAlchemy)。一切正常,以下代码没有错误

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://<username>:<password>@<host>:<port>/<db>'
db = SQLAlchemy(app)

由于网络问题,与数据库的连接失败或断开。访问数据库时会产生错误。按设计 SQLAlchemy 在后台处理所有连接,并且在处理连接断开和重新建立时是无缝的。

要求

要监控和处理这些连接中断,我需要触发连接和断开连接的事件

通读文档我发现connectcloseengine_connect 事件可以绑定到引擎

def connect(dbapi_connection, connection_record):
    print('DB Connected')


def disconnect(dbapi_connection, connection_record):
    print('DB Disconnected')


event.listen(db.engine, 'connect', connect)
event.listen(db.engine, 'engine_connect', connect)
event.listen(db.engine, 'close', connect)

同样,还有close_detachedfirst_connect等其他事件。

请注意,SQLAlchemy 建立了一个有点“惰性连接”,即仅当调用数据库时 第一次建立实际连接,因此db.engine.connect() 需要调用connect 事件才能触发。

预期行为

  1. 无论是在应用程序启动时还是在 生命周期。应该触发回调函数connect()
  2. 每当由于数据库宕机或网络问题导致连接丢失时,应触发回调函数disconnect()

当前行为

  1. 在第一次建立连接时调用connect()。无论连接是否断开并重新连接connect() 都不会再次调用
  2. 在连接丢失时,disconnect() 永远不会被调用,我可能正在监听错误的事件。

为了测试,我在本地安装了 PG 服务器,我正在启动和停止服务。并使用psql cli 验证连接


问题

我查看了 SQLAlchemy Core 以找到合适的 events 和轮询机制 但还没有找到任何可行的方法。

SQLAlchemy 在不使用时不会查看连接,因此似乎需要轮询。这是否意味着 我需要每 n 秒运行一次查询,因为在我的情况下,db.connection() 在连接时不会引发错误 中途迷路了。

我了解 Flask 在单个进程中运行的局限性,本质上不支持异步调用。 想知道就算不是Flask怎么实现,需要什么样的轮询

我还没有准备好接受像SQLAlchemy这样成熟的工具不支持这个,但是我花了几天时间并没有找到 一个可行的解决方案。任何帮助或指导表示赞赏。

额外问题:

在 Flask 启动时检查与 DB 的连接是否处于活动状态的适当方法是什么。 目前,我在初始化后调用db.engine.connect(),如果未连接数据库,则会引发错误。

相关SO问题:How to verify SqlAlchemy engine object

有关 SQLAlchemy 如何处理断开连接的更多信息 - Dealing with Disconnects

【问题讨论】:

    标签: python postgresql flask sqlalchemy flask-sqlalchemy


    【解决方案1】:

    嗯,我正在使用 Sqlalchemy 的连接事件来设置time zone。当我手动重新启动数据库时,我可以确认连接事件可靠地工作。我将 sqlalchemy 1.3.24 与 flask_sqlalchemy 2.5.1 和 MariaDB 10.3.17 一起使用。

    我的代码基本上是这样写的:

    app.config.update(SQLALCHEMY_DATABASE_URI="mysql+pymysql://..."))
    db = SQLAlchemy(app)
    event.listen(db.engine, 'connect', set_time_zone)
    

    如果我现在启动数据库、启动应用程序并执行请求,则会调用我的连接回调。如果我现在重新启动数据库,下一个请求将失败(如预期):

    sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1927, '连接被终止')

    之后,下一个请求会再次调用connect回调,应用正常工作。

    我必须非常小心的唯一一件事是在引发 OperationalError 时回滚会话。如果我不这样做,则需要重新启动烧瓶。

    所以每个 api 端点都是这样的:

    def whatever():
        from .. import db
        session = db.session
    
        try:
            # do your job
        except Exception:
            logger.error("Error: Doing whatever failed")
            print_exc()
            session.rollback()
    

    【讨论】:

    • 您描述的行为是当前行为。如果您阅读了我的预期行为,我希望该事件触发我是否对数据库执行操作。不是在断开连接和回滚时抛出错误。
    猜你喜欢
    • 2013-01-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-26
    • 2013-08-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多