【Question title】: Retry failed sqlalchemy queries
【Posted】: 2019-04-16 15:40:04
【Question】:

Every time I restart the MySQL service, my app gets the following error on whatever query runs next:

result = self._query(query)
  File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 516, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 727, in _read_query_result
    result.read()
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 1066, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 656, in _read_packet
    packet_header = self._read_bytes(4)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 702, in _read_bytes
    CR.CR_SERVER_LOST, "Lost connection to MySQL server during query")
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query') [SQL: ...] [parameters: {...}] (Background on this error at: http://sqlalche.me/e/e3q8)

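Every query after that succeeds as usual.

This is just one common case. More generally, I may want to retry any query depending on the error it raises.

Is there a way to catch and retry queries in some low-level SQLAlchemy API? Wrapping calls in try/except or using a custom query method in my own code is not reasonable, because I run queries in far too many places for that to be maintainable.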

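【Comments】:

  • What's wrong with a custom query, or a wrapper around it? Using it as the basis for a try... except... block around every query seems like the most sensible solution here. Do you have a code snippet, in minimal reproducible example form, that demonstrates the problem?

Tags: python mysql sqlalchemy flask-sqlalchemy retry-logic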


【Solution 1】:

It turns out SQLAlchemy has a handy option for customizing the query class, which is exactly what I needed.

Class implementation:

import logging
from flask_sqlalchemy import BaseQuery
from sqlalchemy.exc import OperationalError
from time import sleep

class RetryingQuery(BaseQuery):

    __retry_count__ = 3                 # total attempts, including the first one
    __retry_sleep_interval_sec__ = 0.5  # fixed pause between attempts

    def __iter__(self):
        # Query methods such as all() end up calling __iter__ (see the comments
        # below), so wrapping it intercepts most query executions.
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                # Only retry the "lost connection" case; re-raise anything else.
                if "Lost connection to MySQL server during query" not in str(ex):
                    raise
                if attempts < self.__retry_count__:
                    logging.debug(
                        "MySQL connection lost - sleeping for %.2f sec and will retry (attempt #%d)",
                        self.__retry_sleep_interval_sec__, attempts
                    )
                    sleep(self.__retry_sleep_interval_sec__)
                    continue
                else:
                    raise

Usage:

from flask_sqlalchemy import Model, SQLAlchemy

class BaseModel(Model):
    ...
    query_class = RetryingQuery
    ...

db = SQLAlchemy(model_class=BaseModel, query_class=RetryingQuery)
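
To see why overriding `__iter__` is enough, here is a database-free sketch of the same mechanism. It assumes SQLAlchemy 1.3-era behavior, where `Query.all()` is implemented as `list(self)` and therefore executes inside `__iter__`; newer releases restructured query execution, so check your version's `Query` source before relying on this hook. `FakeQuery` and `RetryingFakeQuery` are hypothetical classes, and the built-in `ConnectionError` stands in for `OperationalError`.

```python
import logging
import time


class FakeQuery:
    """Hypothetical stand-in for a query object: executing it means iterating it."""

    def __init__(self, rows, failures_before_success=0):
        self.rows = rows
        self.failures_left = failures_before_success
        self.executions = 0

    def __iter__(self):
        # "Execute" the query; fail while simulated failures remain.
        self.executions += 1
        if self.failures_left > 0:
            self.failures_left -= 1
            raise ConnectionError("Lost connection to MySQL server during query")
        return iter(self.rows)

    def all(self):
        # Mirrors SQLAlchemy 1.3, where Query.all() is list(self).
        return list(self)


class RetryingFakeQuery(FakeQuery):
    retry_count = 3                 # total attempts, as in Solution 1
    retry_sleep_interval_sec = 0.0  # zero so the demo runs instantly

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except ConnectionError as ex:
                if "Lost connection" not in str(ex) or attempts >= self.retry_count:
                    raise
                logging.debug("retrying after attempt #%d", attempts)
                time.sleep(self.retry_sleep_interval_sec)


q = RetryingFakeQuery([1, 2, 3], failures_before_success=2)
print(q.all(), q.executions)  # [1, 2, 3] 3
```

Because `all()` never talks to the "database" directly, intercepting `__iter__` covers it without touching `all()` itself. That is the same reason the recipe above needs only one override.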

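【Comments】:

  • Could you explain the logic behind it?
  • @Deva By intercepting errors this way, you can handle all queries without wrapping every SQLAlchemy API.
  • I'm a beginner. Could you explain how overriding __iter__ lets us retry the query method?
  • @Deva That requires digging into the SQLAlchemy code. Most query executors eventually call __iter__, which is the lowest-level point they have in common. I found this by reading their source code and by trial and error. It is definitely not fail-proof and may not cover every case.
  • Hi, SQLAlchemy author here. Someone just pointed me to this recipe. I strongly recommend against the pattern above. "Retrying" a connection should only happen at the top level of a transaction, which is what the pool pre_ping feature is for. If you lose the connection in the middle of a transaction, you need to re-run the entire operation. Explicit is better than implicit.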
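
Following the advice in the last comment, here is a sketch of retrying at the top of a unit of work rather than inside the query: roll back, then re-run the whole function. `run_with_retry`, `FakeSession` and the `is_disconnect` parameter are hypothetical names, not SQLAlchemy APIs. The fake session only counts calls, so the example runs without a database.

```python
import time


def run_with_retry(work, session, is_disconnect, max_attempts=3,
                   base_delay=0.5, sleep=time.sleep):
    """Run work(session) and commit as one unit; on a disconnect, roll back
    and re-run the whole unit (hypothetical helper, not a SQLAlchemy API)."""
    for attempt in range(1, max_attempts + 1):
        try:
            result = work(session)
            session.commit()
            return result
        except Exception as ex:
            # Discard the broken transaction before deciding what to do.
            session.rollback()
            if not is_disconnect(ex) or attempt == max_attempts:
                raise
            # Exponential backoff between whole-transaction attempts.
            sleep(base_delay * 2 ** (attempt - 1))


class FakeSession:
    """Hypothetical stand-in for a Session that only counts commits/rollbacks."""

    def __init__(self):
        self.commits = 0
        self.rollbacks = 0

    def commit(self):
        self.commits += 1

    def rollback(self):
        self.rollbacks += 1


# Demo: the first attempt "loses the connection", the second succeeds.
calls = []


def transfer(session):
    calls.append(1)
    if len(calls) == 1:
        raise ConnectionError("connection lost mid-transaction")
    return "done"


session = FakeSession()
delays = []
result = run_with_retry(transfer, session,
                        is_disconnect=lambda e: isinstance(e, ConnectionError),
                        sleep=delays.append)
print(result, session.commits, session.rollbacks, delays)  # done 1 1 [0.5]
```

With a real SQLAlchemy `Session`, `work` would contain all the queries and writes of the operation. `is_disconnect` could check `isinstance(e, DBAPIError) and e.connection_invalidated`, the same attribute the Airflow snippet in Solution 4 relies on. Because the whole unit is re-run, `work` must be safe to execute more than once.
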

【Solution 2】:

Many thanks for this snippet. I had to adapt it a little to work directly with sqlalchemy.orm, in case it's useful to anyone:

import logging
from time import sleep

from sqlalchemy.exc import OperationalError, StatementError
from sqlalchemy.orm.query import Query as _Query


class RetryingQuery(_Query):
    __max_retry_count__ = 3  # retries after the first attempt

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                if "server closed the connection unexpectedly" not in str(ex):
                    raise
                if attempts <= self.__max_retry_count__:
                    sleep_for = 2 ** (attempts - 1)  # exponential backoff: 1s, 2s, 4s
                    logging.error(
                        "/!\\ Database connection error: retrying strategy => sleeping for {}s"
                        " and will retry (attempt #{} of {})\nDetailed query impacted: {}".format(
                            sleep_for, attempts, self.__max_retry_count__, ex)
                    )
                    sleep(sleep_for)
                    continue
                else:
                    raise
            except StatementError as ex:
                if "reconnect until invalid transaction is rolled back" not in str(ex):
                    raise
                # Give up instead of looping forever if the rollback doesn't help.
                if attempts > self.__max_retry_count__:
                    raise
                # Roll back the invalidated transaction, then loop to retry.
                self.session.rollback()

Usage: pass the class to sessionmaker:

sqlalchemy.orm.sessionmaker(bind=engine, query_cls=RetryingQuery)

【Comments】:

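【Solution 3】:

I had to tweak this slightly to make it work with Postgres, which uses a different error message. I know this question is tagged mysql, but I found it through a search (and had exactly the same problem), so this may help someone.

I also had to catch StatementError: (sqlalchemy.exc.InvalidRequestError) Can't reconnect until invalid transaction is rolled back, which blew up Flask before the retry could happen.

Finally, I added exponential backoff, because why not?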

import logging
from flask_sqlalchemy import BaseQuery
from sqlalchemy.exc import OperationalError, StatementError
from time import sleep

class RetryingQuery(BaseQuery):
    __retry_count__ = 3  # total attempts, including the first one

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                # Postgres reports a dropped connection with this message.
                if "server closed the connection unexpectedly" not in str(ex):
                    raise
                if attempts < self.__retry_count__:
                    sleep_for = 2 ** (attempts - 1)  # exponential backoff: 1s, 2s, ...
                    logging.error(
                        "Database connection error: {} - sleeping for {}s"
                        " and will retry (attempt #{} of {})".format(
                            ex, sleep_for, attempts, self.__retry_count__
                        )
                    )
                    sleep(sleep_for)
                    continue
                else:
                    raise
            except StatementError as ex:
                # After a disconnect the session's transaction is still marked
                # invalid; roll it back so the next attempt can reconnect.
                if "reconnect until invalid transaction is rolled back" not in str(ex):
                    raise
                # Give up instead of looping forever if the rollback doesn't help.
                if attempts >= self.__retry_count__:
                    raise
                self.session.rollback()
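
For reference, here are the sleep schedules that the two backoff variants produce. `backoff_schedule` is a hypothetical helper, and its optional `cap` parameter is an addition that neither solution has. With Solution 3's `__retry_count__ = 3` and its `attempts < self.__retry_count__` check, the query is tried three times with two sleeps (1 s, then 2 s), so the worst case adds 3 s before the error is re-raised. Solution 2's `attempts <= self.__max_retry_count__` check allows four tries and three sleeps (1 s, 2 s, 4 s), or 7 s in total.

```python
def backoff_schedule(retries, base=1.0, cap=None):
    """Delays slept before each retry: base * 2**(n-1) for n = 1..retries, optionally capped."""
    delays = []
    for n in range(1, retries + 1):
        delay = base * 2 ** (n - 1)
        delays.append(min(delay, cap) if cap is not None else delay)
    return delays


# Solution 3: __retry_count__ = 3 with "attempts < 3" -> 2 retries.
print(backoff_schedule(2), sum(backoff_schedule(2)))  # [1.0, 2.0] 3.0
# Solution 2: __max_retry_count__ = 3 with "attempts <= 3" -> 3 retries.
print(backoff_schedule(3), sum(backoff_schedule(3)))  # [1.0, 2.0, 4.0] 7.0
```

Because the delay doubles on every retry, raising the retry count quickly makes requests hang. A cap (for example `cap=5`) keeps each individual sleep bounded.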
    

【Comments】:

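【Solution 4】:

SQLAlchemy also lets you listen for the engine_connect event, which fires each time the engine hands out a new Connection, before your code uses it. This makes it possible to implement custom logic for pessimistic disconnect handling.

The snippet below retries with exponential backoff. It is taken from Apache Airflow's SQLAlchemy utilities: http://airflow.apache.org/docs/1.10.3/_modules/airflow/utils/sqlalchemy.html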

import logging
import random
import time

from sqlalchemy import event, exc, select

log = logging.getLogger(__name__)


# Hypothetical wrapper that supplies the settings the snippet reads; Airflow
# takes these values from its own configuration.
# Written against SQLAlchemy 1.x: the "branch" argument and
# should_close_with_result no longer exist in SQLAlchemy 2.0.
def setup_ping_handler(engine, reconnect_timeout_seconds,
                       initial_backoff_seconds, max_backoff_seconds):

    @event.listens_for(engine, "engine_connect")
    def ping_connection(connection, branch):
        """
        Pessimistic SQLAlchemy disconnect handling. Ensures that each
        connection returned from the pool is properly connected to the database.

        http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
        """
        if branch:
            # "branch" refers to a sub-connection of a connection,
            # we don't want to bother pinging on these.
            return

        start = time.time()
        backoff = initial_backoff_seconds

        # turn off "close with result".  This flag is only used with
        # "connectionless" execution, otherwise will be False in any case
        save_should_close_with_result = connection.should_close_with_result

        while True:
            connection.should_close_with_result = False

            try:
                connection.scalar(select([1]))
                # If we made it here then the connection appears to be healthy
                break
            except exc.DBAPIError as err:
                if time.time() - start >= reconnect_timeout_seconds:
                    log.error(
                        "Failed to re-establish DB connection within %s secs: %s",
                        reconnect_timeout_seconds,
                        err)
                    raise
                if err.connection_invalidated:
                    log.warning("DB connection invalidated. Reconnecting...")

                    # Use a truncated binary exponential backoff. Also includes
                    # a jitter to prevent the thundering herd problem of
                    # simultaneous client reconnects
                    backoff += backoff * random.random()
                    time.sleep(min(backoff, max_backoff_seconds))

                    # run the same SELECT again - the connection will re-validate
                    # itself and establish a new connection.  The disconnect detection
                    # here also causes the whole connection pool to be invalidated
                    # so that all stale connections are discarded.
                    continue
                else:
                    log.error(
                        "Unknown database connection error. Not retrying: %s",
                        err)
                    raise
            finally:
                # restore "close with result"
                connection.should_close_with_result = save_should_close_with_result
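
Since SQLAlchemy 1.2, the same pessimistic "ping on checkout" behavior is built in: passing `pool_pre_ping=True` to `create_engine()` makes the pool test each connection as it is checked out and transparently replace it if it has gone stale. Below is a minimal sketch. It uses an in-memory SQLite URL only so that it runs anywhere; for the setup in the question you would pass your own MySQL URL instead. As the SQLAlchemy author notes under Solution 1, this only protects the start of a transaction; a connection that drops mid-transaction still surfaces as an error.

```python
from sqlalchemy import create_engine, text

# pool_pre_ping=True: the pool runs a lightweight "ping" on each checkout and
# transparently reconnects if the pooled connection turns out to be dead.
# The SQLite URL is only for a self-contained demo; use your real MySQL URL.
engine = create_engine("sqlite://", pool_pre_ping=True)

with engine.connect() as conn:
    value = conn.execute(text("SELECT 1")).scalar()

print(value)  # 1
```

In recent Flask-SQLAlchemy versions the option can also be passed through the `SQLALCHEMY_ENGINE_OPTIONS` configuration key. Check the documentation for the version you use.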
      
      

【Comments】:
