【问题标题】:Add and delete Celery periodic tasks at runtime在运行时添加和删除 Celery 周期性任务
【发布时间】:2017-07-04 15:50:46
【问题描述】:

我整天都在与这个任务作斗争。

我有一个 Django 应用程序。我将 Celery 用于异步任务。偶尔,我想创建一个周期性任务。任务运行的次数未知,但稍后需要将其删除。所以任务可能是这样的:

@shared_task
def foobar_task(id):
    if this_should_run:
        do_task()
    else:
        PeriodicTask.objects.get(name='{} task'.format(id)).delete()

我的应用正在运行。我在 Docker 容器中运行 celery beat,使用 celery --app=myproject beat --loglevel=info --scheduler=django 运行。我有另一个容器运行标准 celery worker。

所以现在我想动态地创建我的周期性任务。我有一个触发类似这样的视图/API 端点:

schedule, _ = IntervalSchedule.objects.get_or_create(every=15, period=IntervalSchedule.SECONDS)
PeriodicTask.objects.create(interval=schedule,
                            name='{} task'.format(id),
                            task='myapp.tasks.foobar_task')

在 Django admin 中,我可以看到已创建周期性任务。但是,查看 celery 容器和 celery beat 容器的日志,没有任何反应。

为什么 celery beat 没有发现有新的周期性任务?我不想每次创建或删除新任务时都必须重新启动 celery beat。

注意:我使用的是 Django 1.11.2、PostgreSQL、Celery 4.0.2、Django Celery Beat 1.0.1。

【问题讨论】:

    标签: django celery celerybeat


    【解决方案1】:

    您可以创建如下所示的自定义调度程序,改编自 this answer

    from django_celery_beat.schedulers import DatabaseScheduler
    
    class AutoUpdateScheduler(DatabaseScheduler):
    
        def tick(self, *args, **kwargs):
            if self.schedule_changed():
                self.sync()
                self._heap = None
                new_schedule = self.all_as_schedule()
    
                if new_schedule:
                    to_add = [x for x in new_schedule.keys() if x not in self.schedule.keys()]
                    to_remove = [x for x in self.schedule.keys() if x not in new_schedule.keys()]
                    for key in to_add:
                        self.schedule[key] = new_schedule[key]
                    for key in to_remove:
                        del self.schedule[key]
    
            super(AutoUpdateScheduler, self).tick(*args, **kwargs)
    
        @property
        def schedule(self):
            if not self._initial_read and not self._schedule:
                self._initial_read = True
                self._schedule = self.all_as_schedule()
            return self._schedule
    

    当你运行 celery beat 时,把它指向这个类:

    celery --app=myproject beat --loglevel=info --scheduler=myproject.scheduler.AutoUpdateScheduler
    

    【讨论】:

      最近更新 更多