【发布时间】:2020-03-16 02:31:41
【问题描述】:
我想使用 Celery 从 Flask 执行 cron 作业,但我有一个关于 celery 节拍计划的问题,因为我的任务似乎没有加载,我不知道如何检查问题出在哪里。
这是我在 views.py 中定义我的 Flask 应用程序的地方:
from celery.schedules import crontab
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379',
CELERY_BEAT_SCHEDULE={
'task-number-one': {
'task': 'app.tasks.test',
'schedule': crontab(minute="*"),
}
},
CELERY_IMPORTS = ('app.tasks'),
CELERY_TASK_RESULT_EXPIRES = 30,
CELERY_TIMEZONE = 'UTC',
)
我的 celery 对象是在 tasks.py 中创建的:
from celery import Celery
from app.views import app
from celery import shared_task
def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery_app = make_celery(app)
@celery_app.task()
def test():
logger = test.get_logger()
logger.info("Hello")
views.py 和 tasks.py 在同一个目录下,叫做 app
这是我在启动 celery worker 时所拥有的(这里一切正常):
但这是我在启动 celery beat 时所拥有的,似乎我的任务从未按我的日程安排发送,但我不知道在哪里检查:
你能帮我解决这个问题吗? 最好的
【问题讨论】:
标签: flask celery celerybeat