【问题标题】:Can celery,celerybeat and django-celery-beat dynamically add/remove tasks in runtime without restart celerybeat?celery、celerybeat 和 django-celery-beat 可以在运行时动态添加/删除任务而不重启 celerybeat?
【发布时间】:2023-10-17 12:12:01
【问题描述】:

我尝试了所有能找到的方法,包括:

堆栈溢出

How to dynamically add / remove periodic tasks to Celery (celerybeat)

Can celery celerybeat dynamically add/remove tasks in runtime?

Github 问题

How to dynamically add or remove tasks to celerybeat?

我从上面得到的是,如果我只使用 celery 和 celery beat,我必须在添加/删除任务后重新启动 celery beat。但是如果我结合 django-celery-beat,我就不必重新启动它。

我一步步跟着docs

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')
app.config_from_object('celeryconfig')
app.conf.timezone = 'UTC'

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

我的芹菜配置

BROKER_URL = 'amqp://rabbit'
CELERY_RESULT_BACKEND = 'rpc://rabbit'
CELERY_RESULT_PERSISTENT = True
# CELERY_ACKS_LATE = True
CELERY_DEFAULT_DELIVERY_MODE = 2
CELERY_TASK_RESULT_EXPIRES = 3600
CELERYBEAT_SCHEDULER ="django_celery_beat.schedulers:DatabaseScheduler"

我的 celery beat 运行命令

celery -A tasks beat -l info -S django

这很好,任务按预期运行。之后,我写了一个脚本来在运行时添加任务

import django
django.setup()
from tasks import app, setup_periodic_tasks
from django_celery_beat.models import PeriodicTask, CrontabSchedule


crontab = CrontabSchedule.objects.create(
       minute='*/1',
       hour='*',
       day_of_week='*',
   )

period = PeriodicTask.objects.create(
       name='testfasd',
       kwargs={},
       crontab=crontab,
       task='tasks.test',
   )

setup_periodic_tasks(app)

当我查看数据库时,我得到了我的预期,新记录以及 last_update 字段已更新。而 celery beat 中的日志也证明了这一点

[2016-12-20 17:37:21,796: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:21,840: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
[2016-12-20 17:37:31,848: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2016-12-20 17:37:31,851: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:31,930: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)

我的问题是虽然 celery beat 知道数据库发生了变化,但它仍然发送旧任务并且不将新任务发送给工作人员。有什么想法吗?

更新

我在我的项目中使用 docker,也许是相关的。

标签: python django celery


【解决方案1】:

This github issue

[celerybeat中不能添加或删除任务]目前需要重启beat。

没有。为了刷新 celery[beat] 内部的任务或任务计时,您必须重新启动 celery[beat] 实例。任务在运行时加载到内存中。要更改/添加任务,您必须刷新实例。

您可以考虑使用自重复任务、使用自定义计时和条件执行。示例:

from datetime import timedelta
from celery import shared_task

@shared_task
def check_conditions():
    # Do some db-level code
    if condition:
        check_conditions.apply_async(eta=timedelta(hours=6))

我在生产中使用它,它表现良好。

如果您需要重新加载任务,只需以编程方式重新启动 celery[beat]:

@shared_task
def autoreload():
    if condition:
        execute_shell_code_to_restart_celery()

我没有使用过,无法保证它的可用性,但理论上应该可以。

This github issue

我必须重新加载节拍才能在工作人员上更新此更改 ... 使用 django-celery-beat ... 这个问题在 4.0.2 和 master 上仍然存在,测试 [2016 年 12 月 21 日]。

【讨论】:

    【解决方案2】:

    警告

    如果您更改 Django TIME_ZONE 设置,您的定期任务计划仍将基于旧时区。

    要解决这个问题,您必须为每个定期任务重置“上次运行时间”:

     from django_celery_beat.models import PeriodicTask, PeriodicTasks
     PeriodicTask.objects.all().update(last_run_at=None)
     for task in PeriodicTask.objects.all():
         PeriodicTasks.changed(task)
    

    【讨论】: