【问题标题】:celery beat using flask task issuecelery beat 使用烧瓶任务问题
【发布时间】:2020-03-16 02:31:41
【问题描述】:

我想使用 Celery 从 Flask 执行 cron 作业,但我有一个关于 celery 节拍计划的问题,因为我的任务似乎没有加载,我不知道如何检查问题出在哪里。

这是我在 views.py 中定义我的 Flask 应用程序的地方:

from celery.schedules import crontab

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379',
    CELERY_BEAT_SCHEDULE={
        'task-number-one': {
            'task': 'app.tasks.test',
            'schedule': crontab(minute="*"),
        }
    },
    CELERY_IMPORTS = ('app.tasks'),
    CELERY_TASK_RESULT_EXPIRES = 30,
    CELERY_TIMEZONE = 'UTC',
)

我的 celery 对象是在 tasks.py 中创建的:

from celery import Celery
from app.views import app
from celery import shared_task


def make_celery(app):
    celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

celery_app = make_celery(app)

@celery_app.task()
def test():
    logger = test.get_logger()
    logger.info("Hello")

views.py 和 tasks.py 在同一个目录下,叫做 app

这是我在启动 celery worker 时所拥有的(这里一切正常):

但这是我在启动 celery beat 时所拥有的,似乎我的任务从未按我的日程安排发送,但我不知道在哪里检查:

你能帮我解决这个问题吗? 最好的

【问题讨论】:

    标签: flask celery celerybeat


    【解决方案1】:

    我相信至少在发送@app.on_after_configure.connect 信号之后需要配置 Celery-Beat 任务。您应该能够在 tasks.py 文件中执行以下操作。

    celery_app.conf.CELERYBEAT_SCHEDULE = {
        "test-every-minue": {
        "task": "tasks.test",
        "schedule": crontab(minute="*"),
         },
    }
    

    如果您的任务与 celery 应用程序实例在同一个文件中定义,您也可以使用此装饰器语法。

    @celery_app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        sender.add_periodic_task(5 , test_two.s(), name='test-every-5')
    

    如果您的任务是在单独的模块中定义的,您可以在导入任务后使用 @app.on_after_finalize.connect 装饰器。

    @app.on_after_finalize.connect
    def setup_periodic_tasks(sender, **kwargs):
        from app.tasks import test
        sender.add_periodic_task(10.0, test.s(), name='test-every-10')
    

    Celery Beat Entry Docs

    【讨论】: