【问题标题】:How to use Flask-SQLAlchemy in a Celery task如何在 Celery 任务中使用 Flask-SQLAlchemy
【发布时间】:2012-08-16 04:52:45
【问题描述】:

我最近切换到 Celery 3.0。在此之前,我使用Flask-Celery 将 Celery 与 Flask 集成。虽然它有很多问题,比如隐藏一些强大的 Celery 功能,但它允许我使用 Flask 应用程序的完整上下文,尤其是 Flask-SQLAlchemy。

在我的后台任务中,我正在处理数据和 SQLAlchemy ORM 来存储数据。 Flask-Celery 的维护者已经放弃了对该插件的支持。该插件在任务中腌制 Flask 实例,因此我可以完全访问 SQLAlchemy。

我试图在我的 tasks.py 文件中复制此行为,但没有成功。您对如何实现这一点有任何提示吗?

【问题讨论】:

    标签: python flask celery flask-sqlalchemy


    【解决方案1】:

    更新:我们已经开始使用更好的方法来处理应用程序的拆卸和设置,基于in the more recent flask documentation 描述的模式。

    extensions.py

    import flask
    from flask.ext.sqlalchemy import SQLAlchemy
    from celery import Celery
    
    class FlaskCelery(Celery):
    
        def __init__(self, *args, **kwargs):
    
            super(FlaskCelery, self).__init__(*args, **kwargs)
            self.patch_task()
    
            if 'app' in kwargs:
                self.init_app(kwargs['app'])
    
        def patch_task(self):
            TaskBase = self.Task
            _celery = self
    
            class ContextTask(TaskBase):
                abstract = True
    
                def __call__(self, *args, **kwargs):
                    if flask.has_app_context():
                        return TaskBase.__call__(self, *args, **kwargs)
                    else:
                        with _celery.app.app_context():
                            return TaskBase.__call__(self, *args, **kwargs)
    
            self.Task = ContextTask
    
        def init_app(self, app):
            self.app = app
            self.config_from_object(app.config)
    
    
    celery = FlaskCelery()
    db = SQLAlchemy()
    

    app.py

    from flask import Flask
    from extensions import celery, db
    
    def create_app():
        app = Flask()
        
        #configure/initialize all your extensions
        db.init_app(app)
        celery.init_app(app)
    
        return app
    

    一旦您以这种方式设置应用程序,您就可以运行和使用 celery,而无需在应用程序上下文中显式运行它,因为您的所有任务将在必要时自动在应用程序上下文中运行,而您不需要'不必明确担心任务后拆解,这是一个需要管理的重要问题(请参阅下面的其他回复)。

    疑难解答

    那些不断收到with _celery.app.app_context(): AttributeError: 'FlaskCelery' object has no attribute 'app' 的人请确保:

    1. celery 导入保持在app.py 文件级别。避免:

    app.py

    from flask import Flask
    
    def create_app():
        app = Flask()
    
        initiliaze_extensions(app)
    
        return app
    
    def initiliaze_extensions(app):
        from extensions import celery, db # DOOMED! Keep celery import at the FILE level
        
        db.init_app(app)
        celery.init_app(app)
    
    1. flask run 之前开始你的芹菜工人并使用
    celery worker -A app:celery -l info -f celery.log
    

    注意app:celery,即从app.py加载。

    您仍然可以从扩展导入来装饰任务,即from extensions import celery

    下面的旧答案,仍然有效,但不是一个干净的解决方案

    我更喜欢在应用程序上下文中运行所有 celery,方法是创建一个单独的文件,该文件在应用程序的上下文中调用 celery.start()。这意味着您的任务文件不必到处都是上下文设置和拆卸。它也很适合烧瓶“应用程序工厂”模式。

    extensions.py

    from from flask.ext.sqlalchemy import SQLAlchemy
    from celery import Celery
    
    db = SQLAlchemy()
    celery = Celery()
    

    tasks.py

    from extensions import celery, db
    from flask.globals import current_app
    from celery.signals import task_postrun
    
    @celery.task
    def do_some_stuff():
        current_app.logger.info("I have the application context")
        #you can now use the db object from extensions
    
    @task_postrun.connect
    def close_session(*args, **kwargs):
        # Flask SQLAlchemy will automatically create new sessions for you from 
        # a scoped session factory, given that we are maintaining the same app
        # context, this ensures tasks have a fresh session (e.g. session errors 
        # won't propagate across tasks)
        db.session.remove()
    

    app.py

    from extensions import celery, db
    
    def create_app():
        app = Flask()
        
        #configure/initialize all your extensions
        db.init_app(app)
        celery.config_from_object(app.config)
    
        return app
    

    RunCelery.py

    from app import create_app
    from extensions import celery
    
    app = create_app()
    
    if __name__ == '__main__':
        with app.app_context():
            celery.start()
    

    【讨论】:

    • 我不明白新的更新。任务在哪里,在 app.py 或 tasks.py 中?如果在tasks.py中,你从哪里导入芹菜,扩展名?你还在使用 runcelery.py 吗?如果是这样,您还必须使用 create_app() 吗?
    • 我在 extensions.py 中附加了“add_together”任务:@celery.task() def add_together(a, b): return a + b,当我使用celery -A app.extensions.celery worker -l debug 运行worker 后导入它并像result = add_together.delay(5, 11) 一样调用它时,我收到此错误:AttributeError:' FlaskCelery' 对象没有属性 'app'。但是,如果我从 extensions.py (它是一个 FlaskCelery 实例)导入 celery,它具有正确的 app 价值。请问,我错过了什么?
    • 在新更新中出现此错误:with _celery.app.app_context(): AttributeError: 'FlaskCelery' object has no attribute 'app'
    • 此示例中存在潜在问题。如果你用--concurrency=N(N>1) 或--autoscale=M,N(N>1) 启动 celery worker,你可能会得到一些 MySQL2006/2014 错误或 sqlalchemy.exc.ResourceClosedError。由于 db.session 是一个非线程安全的对象,我们必须在每个 worker 初始化之前初始化 db 实例,就像 @Robpol86 所做的那样。
    • 想提一下,我需要这种集成的原因只是为了正确地让current_app.config['key'] 调用工作,不幸的是,在尝试在 celery 任务中执行此操作时,我仍然收到外部上下文警告。
    【解决方案2】:

    我使用Paul Gibbs' answer 有两个不同之处。我使用了worker_process_init 而不是task_postrun。而不是 .remove() 我使用了 db.session.expire_all()。

    我不是 100% 确定,但据我了解,它的工作方式是当 Celery 创建一个工作进程时,所有继承/共享的数据库会话都将过期,并且 SQLAlchemy 将按需创建新的会话唯一到那个工作进程。

    到目前为止,它似乎已经解决了我的问题。使用 Paul 的解决方案,当一名工作人员完成并删除会话时,使用同一会话的另一名工作人员仍在运行其查询,因此 db.session.remove() 在使用时关闭了连接,给我一个“与 MySQL 的连接丢失查询期间的服务器”异常。

    感谢 Paul 引导我朝着正确的方向前进!

    没关系。如果 Celery 调用它,我最终在我的 Flask 应用程序工厂中有一个参数不运行 db.init_app(app) 。相反,在 Celery 分叉后,工人们会调用它。我现在在我的 MySQL 进程列表中看到了几个连接。

    from extensions import db
    from celery.signals import worker_process_init
    from flask import current_app
    
    @worker_process_init.connect
    def celery_worker_init_db(**_):
        db.init_app(current_app)
    

    【讨论】:

      【解决方案3】:

      在您的 tasks.py 文件中执行以下操作:

      from main import create_app
      app = create_app()
      
      celery = Celery(__name__)
      celery.add_defaults(lambda: app.config)
      
      @celery.task
      def create_facet(project_id, **kwargs):
          with app.test_request_context():
             # your code
      

      【讨论】:

      • 迟到的评论,但我认为这很重要。我不认为在这里使用 test_request_context 是一个好主意,因为它适用于测试环境,而不是生产环境。
      【解决方案4】:
      from flask import Flask
      from werkzeug.utils import import_string
      from celery.signals import worker_process_init, celeryd_init
      from flask_celery import Celery
      from src.app import config_from_env, create_app
      
      celery = Celery()
      
      def get_celery_conf():
          config = import_string('src.settings')
          config = {k: getattr(config, k) for k in dir(config) if k.isupper()}
          config['BROKER_URL'] = config['CELERY_BROKER_URL']
          return config
      
      @celeryd_init.connect
      def init_celeryd(conf=None, **kwargs):
          conf.update(get_celery_conf())
      
      @worker_process_init.connect
      def init_celery_flask_app(**kwargs):
          app = create_app()
          app.app_context().push()
      
      • 在 celeryd 初始化时更新 celery 配置
      • 使用您的烧瓶应用工厂来初始化所有烧瓶扩展,包括 SQLAlchemy 扩展。

      通过这样做,我们能够维护每个工作人员的数据库连接。

      如果你想在flask上下文下运行你的任务,你可以继承Task.__call__:

      class SmartTask(Task):
      
          abstract = True
      
          def __call__(self, *_args, **_kwargs):
              with self.app.flask_app.app_context():
                  with self.app.flask_app.test_request_context():
                      result = super(SmartTask, self).__call__(*_args, **_kwargs)
                  return result
      
      class SmartCelery(Celery):
      
          def init_app(self, app):
              super(SmartCelery, self).init_app(app)
              self.Task = SmartTask
      

      【讨论】:

      • 环顾了大约 18 个小时后,我终于找到了一些有用的东西。在 create app 或 manger.py 之外调用 app=create_app 和 app_context 对我来说很奇怪,但确实可以
      猜你喜欢
      • 1970-01-01
      • 2016-10-14
      • 2019-05-25
      • 2021-05-16
      • 1970-01-01
      • 1970-01-01
      • 2015-09-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多