【问题标题】:Run Celery task locking Flask运行 Celery 任务锁定 Flask
【发布时间】:2020-04-17 12:05:27
【问题描述】:

我正在尝试为 Flask+Celery 应用程序组织项目结构。 当我在单个文件中工作时,一切正常。 但是当我在模块中分发代码时,调用 test_task.apply_async() 是锁定烧瓶。 我的项目结构:

web_spider/
 app/
  __init__.py
  rest/
   __init__.py
   views/
    __init__.py
    test_view.py
   flask_app.py
  task_runner/
   __init__.py
   celery_app.py
   tasks.py
 requirements.txt

test_view.py

import flask
from app.task_runner.tasks import test_task

api_test_view = flask.Blueprint('api_test_view', __name__)

@api_test_view.route('/')
def test_view():
 test_task.apply_async() #lock there
 return 'Hello, World!'

flask_app.py

import flask
from app.rest.views.api_test_view import test_view

flask_app = flask.Flask(__name__)
flask_app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
flask_app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
flask_app.register_blueprint(test_view)

if __name__ == '__main__':
flask_app.run(debug=True)

celery_app.py

from app.rest.flask_app import flask_app
import celery

celery_app = celery.Celery(flask_app.name, broker=flask_app.config['CELERY_BROKER_URL'])

celery_app.conf.update(flask_app.config)

tasks.py

from celery import shared_task

@shared_task
def test_task():
 return 1 + 1

【问题讨论】:

    标签: python flask celery


    【解决方案1】:

    很可能,您没有加载 celery_app 并且 shared_task 找不到要使用的 Celery 应用程序。添加到您的 web_spider/app/__init__.py 或 web_spider/app/task_runner/__init__.py:

    from app.task_runner.celery_app import celery_app
    
    __all__ = ('celery_app',)
    

    记录在https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#django-first-steps 搜索shared_task

    【讨论】:

    • 我将此添加到 web_spider/app/task_runner/__init__.py 并没有帮助
    • 只是为了科学在你的 tasks.py 中尝试下一个:from app.task_runner.celery_app import celery_app; @celery_app.task 而不是你的。它为您的任务设置了适当的 Celery 应用程序,并且应该可以工作。
    • 谢谢。它有帮助。
    • 所以,诀窍是在导入用shared_task装饰的任务之前配置celery_app