【发布时间】:2019-07-17 02:43:55
【问题描述】:
我对此进行了大量研究,包括尝试像this 这样的答案。看来 Celery 无法访问我的 Flask 应用程序的上下文。
我非常了解我的 celery 对象,我的任务将由什么来装饰,必须能够访问我的 Flask 应用程序的上下文。我确实相信它应该,因为我按照this 指南创建了我的芹菜对象。我不确定混淆是否在于我使用 Flask-HTTPAuth 的事实。
这是我拥有的一些东西。
def make_celery(app):
celery = Celery(app.import_name, backend=app.config["CELERY_RESULT_BACKEND"], broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
auth = HTTPBasicAuth()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flask_app.db"
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379"
celery = make_celery(app)
db = SQLAlchemy(app)
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self):
items = g.user.items
for item in items:
print(item)
不过,使用 Flask 运行这个任务是不行的。我尝试通过点击服务器来启动这个功能(在授权的情况下!)。
@app.route("/item_loop")
@auth.login_required
def item_loop():
result = loop.delay()
return "It's running."
但是 Celery 工人告诉我任务 raised unexpected: AttributeError("'_AppCtxGlobals' object has no attribute 'user'",),我相信这意味着,如上所述,我的 celery 对象没有应用程序上下文,即使我使用了推荐的工厂模式。
【问题讨论】:
标签: python flask celery flask-sqlalchemy flask-httpauth