【Question title】: SQLAlchemy returns stale data in Celery tasks
【Posted】: 2015-06-15 10:51:57
【Question】:

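My current setup includes

Flask, Flask-SQLAlchemy, Celery

The problem is that database queries inside Celery tasks sometimes return stale data. For example, when I ask for the last record in a table I get the second-to-last one, even though the last entry was inserted 10-15 minutes before the query ran. I also sometimes see exceptions like this: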

OperationalError("(OperationalError) (2006, \'MySQL server has gone away\')",)'

followed by this traceback:

  File "/home/sys_user/repo/my_app/app/tasks/reminders.py", line 63, in run
    config = Reminder.query.filter_by(id=reminder_id).first()
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2341, in first
    ret = list(self[0:1])
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2208, in __getitem__
    return list(res)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2412, in __iter__
    return self._execute_and_instances(context)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2427, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 729, in execute
    return meth(self, multiparams, params)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 321, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 826, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 958, in _execute_context
    context)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1160, in _handle_dbapi_exception
    exc_info
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 951, in _execute_context
    context)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 436, in do_execute
    cursor.execute(statement, parameters)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/MySQLdb/cursors.py", line 205, in execute
    self.errorhandler(self, exc, value)
  File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
    raise errorclass, errorvalue

Below is my celery_app.py:

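from celery import Celery
from celery.signals import worker_process_init

import celeryconfig  # project-local Celery settings module; the import was missing from the post
from app import app as flask_app, db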

def make_celery(app=None):
    celery = Celery()
    celery.conf.update(flask_app.config)
    celery.config_from_object(celeryconfig)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with flask_app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    db.init_app(flask_app)
    celery.Task = ContextTask
    return celery


celery_instance = make_celery()


@worker_process_init.connect
def celery_worker_init_db(**_):
    db.init_app(flask_app)

One of the defined tasks:

class ReminderTask(Task):
    ignore_result = True

    def run(self, data):
        a = Reminder.query.filter_by(id=int(data['reminder_id'])).first()
        a.send_reminders()

【Comments】:

    Tags: python flask sqlalchemy celery flask-sqlalchemy


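    【Answer 1】:

    The same question was also answered on the SQLAlchemy Google group.

    Based on that, I ended up implementing the solution mentioned by ask here. I have not seen these problems since.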
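
A minimal sketch of one common way to address both symptoms, not necessarily the linked solution. The reasoning: a session left open between tasks keeps its transaction open, and under MySQL's default REPEATABLE READ isolation later reads in that transaction see the same old snapshot. A connection that sits idle past the server timeout produces "MySQL server has gone away". The sketch assumes plain SQLAlchemy with an in-memory SQLite database standing in for MySQL. The decorator name `with_session_cleanup` and the function `latest_reminder_id` are hypothetical. With Flask-SQLAlchemy the equivalent call would be `db.session.remove()`, and in a Celery worker it could live in a base task's `after_return` hook.

```python
import functools

from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite stands in for the MySQL server.
# pool_recycle replaces pooled connections older than the given number of
# seconds, so a connection the server may have dropped is not reused.
engine = create_engine("sqlite://", pool_recycle=3600)
Session = scoped_session(sessionmaker(bind=engine))


def with_session_cleanup(func):
    """Hypothetical decorator: discard the scoped session after every call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        finally:
            # Ends the open transaction and releases the connection, so the
            # next call starts a fresh transaction and sees current data.
            Session.remove()
    return wrapper


@with_session_cleanup
def latest_reminder_id():
    """Stand-in for a task body that reads the newest row."""
    return Session.execute(text("SELECT MAX(id) FROM reminder")).scalar()


# Minimal schema and data so the function above has something to read.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE reminder (id INTEGER PRIMARY KEY)"))
    conn.execute(text("INSERT INTO reminder (id) VALUES (1)"))
```

Calling `latest_reminder_id()` runs the query and then removes the session, so no transaction stays open between calls. Doing the cleanup in one wrapper keeps individual task bodies free of session bookkeeping.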

    【Comments】:
