【发布时间】:2023-10-17 12:12:01
【问题描述】:
我尝试了所有能找到的方法,包括:
堆栈溢出
How to dynamically add / remove periodic tasks to Celery (celerybeat)
Can celery celerybeat dynamically add/remove tasks in runtime?
Github 问题
How to dynamically add or remove tasks to celerybeat?
我从上面得到的是,如果我只使用 celery 和 celery beat,我必须在添加/删除任务后重新启动 celery beat。但是如果我结合 django-celery-beat,我就不必重新启动它。
我一步步跟着docs:
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks')
app.config_from_object('celeryconfig')
app.conf.timezone = 'UTC'
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
我的芹菜配置
BROKER_URL = 'amqp://rabbit'
CELERY_RESULT_BACKEND = 'rpc://rabbit'
CELERY_RESULT_PERSISTENT = True
# CELERY_ACKS_LATE = True
CELERY_DEFAULT_DELIVERY_MODE = 2
CELERY_TASK_RESULT_EXPIRES = 3600
CELERYBEAT_SCHEDULER ="django_celery_beat.schedulers:DatabaseScheduler"
我的 celery beat 运行命令
celery -A tasks beat -l info -S django
这很好,任务按预期运行。之后,我写了一个脚本来在运行时添加任务
import django
django.setup()
from tasks import app, setup_periodic_tasks
from django_celery_beat.models import PeriodicTask, CrontabSchedule
crontab = CrontabSchedule.objects.create(
minute='*/1',
hour='*',
day_of_week='*',
)
period = PeriodicTask.objects.create(
name='testfasd',
kwargs={},
crontab=crontab,
task='tasks.test',
)
setup_periodic_tasks(app)
当我查看数据库时,我得到了我的预期,新记录以及 last_update 字段已更新。而 celery beat 中的日志也证明了这一点
[2016-12-20 17:37:21,796: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:21,840: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
[2016-12-20 17:37:31,848: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2016-12-20 17:37:31,851: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:31,930: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
我的问题是虽然 celery beat 知道数据库发生了变化,但它仍然发送旧任务并且不将新任务发送给工作人员。有什么想法吗?
更新
我在我的项目中使用 docker,也许是相关的。