【问题标题】:How to dynamically add a scheduled task to Celery beat如何将计划任务动态添加到 Celery beat
【发布时间】:2023-04-04 20:14:02
【问题描述】:

使用 Celery ver.3.1.23,我正在尝试将计划任务动态添加到 celery beat。我有一个 celery worker 和一个 celery beat 实例正在运行。

运行 task.delay() 触发标准 celery 任务可以正常工作。当我将计划定期任务定义为配置中的设置时,celery beat 会运行它。

但是,我需要的是能够在运行时添加在指定 crontab 上运行的任务。将任务添加到持久调度程序后,celery beat 似乎没有检测到新添加的新任务。我可以看到 celery-schedule 文件确实有一个包含新任务的条目。

代码:

scheduler = PersistentScheduler(app=current_app, schedule_filename='celerybeat-schedule')
scheduler.add(name="adder",
          task="app.tasks.add",
          schedule=crontab(minute='*/1'),
          args=(1,2))
scheduler.close()

当我跑步时:

print(scheduler.schedule)

我明白了:

{'celery.backend_cleanup': <Entry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>,
'adder': <Entry: adder app.tasks.add(1, 2) <crontab: */1 * * * * (m/h/d/dM/MY)>}

注意 app.tasks.add 有 @celery.task 装饰器。

【问题讨论】:

    标签: python python-3.x celery celerybeat


    【解决方案1】:

    您可以通过启用autoreloading来解决您的问题。

    但是我不能 100% 确定它是否适用于您的配置文件,但如果它位于 CELERY_IMPORTS 路径中,它应该可以。

    悬停者请注意,此功能是实验性的,请勿在生产中使用。

    如果您真的想要动态 celerybeat 调度,您可以随时使用another scheduler like the django-celery one 通过 django 管理员在 db 上管理周期性任务。

    【讨论】:

    • 谢谢。 AFAIK 自动重装需要成为 celery beat 中的一个选项,而 beat 不支持自动重装。是的,我确实想要动态调度,而且我不使用 django。看来我超前了,Celery ver 4.0将支持动态任务调度
    • 无论如何,重新启动 celery-beat 的成本应该不会那么高,尤其是如果您有专门的工作人员只做调度。
    • 每次客户端提交可能需要添加新计划的请求时,我都无法重新启动 celery-beat。此外,您将如何以编程方式执行此操作?顺便提一句。我已经切换到 python-crontab。
    • 在您的用例中,很明显基于配置的调度不是正确的方法,只需编写您自己的调度程序,并使用数据库作为后端,或者使用像 github.com/kongluoxing/celerybeatredis 这样的众多调度程序之一。
    【解决方案2】:

    我遇到了类似的问题,我想到的解决方案是预先定义一些通用的周期性任务(每 1 秒、每 5 分钟等),然后让它们从数据库中获取要执行的函数列表. 每次您想添加新任务时,只需在数据库中添加一个条目即可。

    【讨论】:

      【解决方案3】:

      Celery beat 将所有周期性调度的任务存储在模型 PeriodicTask 中。作为节拍任务可以以不同的方式安排,包括 crontab、interval 或 solar。所有这些字段都是 PeriodicTask 模型中的外键。

      为了动态添加定时任务,只需在 celery beat 中填充相关模型,调度器就会检测到变化。当元组的计数发生变化或调用 save() 函数时,就会检测到这些变化。

      from django_celery_beat.models import PeriodicTask, CrontabSchedule
      
      # -- Inside the function you want to add task dynamically 
      
      schedule = CrontabSchedule.objects.create(minute='*/1')
      task = PeriodicTask.objects.create(name='adder',
                                         task='apps.task.add', crontab=schedule)
      task.save()
      

      【讨论】:

        【解决方案4】:

        建议您切换到Celery Redbeat,而不是试图找到一个好的解决方法。

        【讨论】:

        • 这看起来不错,但有一个小警告:您必须使用 Redis。
        • RedBeat 不仅要解决动态时间表的问题,还要解决更大的问题——高可用性和可靠性。
        最近更新 更多