【发布时间】:2017-02-10 20:06:17
【问题描述】:
我有一个用于实例化 celery 实例的 Flask Celery 应用程序。 我知道从 .py 文件的角度来看,我可以将普通的 Flask 路由添加到同一个 .py 文件,并且我需要运行相同的代码两次:
-
运行工人:
% celery worker -A app.celery ...
-
运行与普通 Flask 应用相同的代码:
% python app.py ...
我的问题是:如果普通的 Flask 应用程序是真正独立于 Celery 应用程序的进程,那么我如何从 Flask 路由中操作正在运行的 celery 实例来执行类似的操作:
celery.control.purge()
celery.control.inspect() etc ???
这是我的代码:
import os
import random
import time
from flask import Flask, request, render_template, session, flash, redirect, \
url_for, jsonify
from celery import Celery
app = Flask(__name__)
# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def send_async_email(msg):
"""Background task to send an email with Flask-Mail."""
with app.app_context():
mail.send(msg)
@app.route('/purge', methods=['GET', 'POST'])
def purge_tasks():
## want to do stuffs with the running celery instance, e.g:
## doing:
## celery.control.purge()
## celery.control.inspect()
##
## BUT HOW??
if __name__ == '__main__':
app.run(debug=True)
我一直在互联网上寻找答案,但没有一个答案能具体回答这个问题。
非常感谢您的任何帮助/指点。
【问题讨论】:
-
不是 100% 确定,但是如果你的烧瓶应用是一个 URI,你不能从 celery 任务中调用来消费它吗?
-
感谢您的评论。但是,目标是能够通过 Flask 路由引用活动的 celery 实例来对任务进行一些操作(列出、检查)。即:我认为我应该通过引用这个 celery 实例来做到这一点:'celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])' 但是,我如何从 Flask 路由中引用该实例(与
celery worker -A ...实例不同的进程? ?也许我认为这是错误的方向,但是我们如何从 Flask 路由检查当前活动的芹菜队列? -
如果有帮助,请接受答案,如果您不介意。
-
如上所述,您无法将任何任务排队:您唯一的路线是“purge_tasks()”,您想在其中检查/消除队列中的任务。您调用“send_async_email.delay(msg)”的第二条路线将允许用户将任务排入队列。