【问题标题】:How to dynamically add / remove periodic tasks to Celery (celerybeat)如何向 Celery (celerybeat) 动态添加/删除周期性任务
【发布时间】:2012-04-29 00:19:40
【问题描述】:

如果我有如下定义的函数:

def add(x,y):
  return x+y

有没有办法将此函数动态添加为 celery PeriodicTask 并在运行时启动它?我希望能够做类似(伪代码)的事情:

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)

我还想使用(伪代码)之类的东西动态停止或删除该任务:

celery.beat.remove_task(some_unique_task_id)

celery.beat.stop(some_unique_task_id)

仅供参考,我没有使用 djcelery,它可以让您通过 django 管理员管理周期性任务。

【问题讨论】:

    标签: python celery celerybeat


    【解决方案1】:

    @asksol 的答案是在 Django 应用程序中需要什么。

    对于非 Django 应用程序,您可以使用 celery-sqlalchemy-scheduler,它的建模类似于 Django 的 django-celery-beat,因为它也使用数据库而不是文件 celerybeat-schedule

    这是一个运行时添加新任务的示例。

    tasks.py

    from celery import Celery
    
    celery = Celery('tasks')
    
    beat_dburi = 'sqlite:///schedule.db'
    
    celery.conf.update(
        {'beat_dburi': beat_dburi}
    )
    
    
    @celery.task
    def my_task(arg1, arg2, be_careful):
        print(f"{arg1} {arg2} be_careful {be_careful}")
    

    日志(生产者)

    $ celery --app=tasks beat --scheduler=celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler --loglevel=INFO
    celery beat v5.1.2 (sun-harmonics) is starting.
    [2021-08-20 15:20:20,927: INFO/MainProcess] beat: Starting...
    

    日志(消费者)

    $ celery --app=tasks worker --queues=celery --loglevel=INFO
    -------------- celery@ubuntu20 v5.1.2 (sun-harmonics)
    [2021-08-20 15:20:02,287: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
    

    数据库计划

    $ sqlite3 schedule.db 
    sqlite> .databases
    main: /home/nponcian/Documents/Program/1/db/schedule.db
    sqlite> .tables
    celery_crontab_schedule       celery_periodic_task_changed
    celery_interval_schedule      celery_solar_schedule       
    celery_periodic_task        
    sqlite> select * from celery_periodic_task;
    1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
    

    现在,当这些工作人员已经在运行时,让我们通过添加一个新的计划任务来更新计划。请注意,这是在运行时,无需重新启动工作人员。

    $ python3
    >>> # Setup the session.
    >>> from celery_sqlalchemy_scheduler.models import PeriodicTask, IntervalSchedule
    >>> from celery_sqlalchemy_scheduler.session import SessionManager
    >>> from tasks import beat_dburi
    >>> session_manager = SessionManager()
    >>> engine, Session = session_manager.create_session(beat_dburi)
    >>> session = Session()
    >>> 
    >>> # Setup the schedule (executes every 10 seconds).
    >>> schedule = session.query(IntervalSchedule).filter_by(every=10, period=IntervalSchedule.SECONDS).first()
    >>> if not schedule:
    ...     schedule = IntervalSchedule(every=10, period=IntervalSchedule.SECONDS)
    ...     session.add(schedule)
    ...     session.commit()
    ... 
    >>> 
    >>> # Create the periodic task
    >>> import json
    >>> periodic_task = PeriodicTask(
    ...     interval=schedule,                  # we created this above.
    ...     name='My task',                     # simply describes this periodic task.
    ...     task='tasks.my_task',               # name of task.
    ...     args=json.dumps(['arg1', 'arg2']),
    ...     kwargs=json.dumps({
    ...        'be_careful': True,
    ...     }),
    ... )
    >>> session.add(periodic_task)
    >>> session.commit()
    

    数据库计划(更新)

    • 我们现在可以看到,新添加的时间表已反映到数据库中,由 celery beat 调度程序持续读取。因此,如果 args 或 kwargs 的值有任何更新,我们可以轻松地对数据库执行 SQL 更新,并且它应该与正在运行的工作人员实时反映(无需重新启动)。
    sqlite> select * from celery_periodic_task;
    1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
    2|My task|tasks.my_task|1|||["arg1", "arg2"]|{"be_careful": true}||||||0||1||0|2021-08-20 07:26:49|
    

    日志(生产者)

    • 现在,新任务每 10 秒排队一次
    [2021-08-20 15:26:51,768: INFO/MainProcess] DatabaseScheduler: Schedule changed.
    [2021-08-20 15:26:51,768: INFO/MainProcess] Writing entries...
    [2021-08-20 15:27:01,789: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
    [2021-08-20 15:27:11,776: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
    [2021-08-20 15:27:21,791: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
    

    日志(消费者)

    • 新添加的任务每10秒按时正确执行一次
    [2021-08-20 15:27:01,797: INFO/MainProcess] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] received
    [2021-08-20 15:27:01,798: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
    [2021-08-20 15:27:01,799: WARNING/ForkPoolWorker-4] 
    
    [2021-08-20 15:27:01,799: INFO/ForkPoolWorker-4] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] succeeded in 0.000763321000704309s: None
    [2021-08-20 15:27:11,783: INFO/MainProcess] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] received
    [2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
    [2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] 
    
    [2021-08-20 15:27:11,787: INFO/ForkPoolWorker-4] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] succeeded in 0.0006725780003762338s: None
    [2021-08-20 15:27:21,797: INFO/MainProcess] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] received
    [2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
    [2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] 
    
    [2021-08-20 15:27:21,800: INFO/ForkPoolWorker-4] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] succeeded in 0.0006371149993356084s: None
    

    【讨论】:

      【解决方案2】:

      Celery可以通过数据库和自身调用实现动态周期性任务。

      但是APSchedule 更好。

      因为动态周期性任务总是意味着长倒计时或 eta。太多这些周期性任务会占用大量内存,导致重新启动和执行非延迟任务非常耗时。

      tasks.py

      import sqlite3
      from celery import Celery
      from celery.utils.log import get_task_logger
      
      logger = get_task_logger(__name__)
      
      app = Celery(
          'tasks',
          broker='redis://localhost:6379/0',
          backend='redis://localhost:6379/1',
          imports=['tasks'],
      )
      
      conn = sqlite3.connect('database.db', check_same_thread=False)
      c = conn.cursor()
      sql = '''
      CREATE TABLE IF NOT EXISTS `tasks` 
      (
         `id` INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,
         `name` TEXT,
         `countdown` INTEGER
      );
      '''
      c.execute(sql)
      
      
      def create(name='job', countdown=5):
          sql = 'INSERT INTO `tasks` (`name`, `countdown`) VALUES (?, ?)'
          c.execute(sql, (name, countdown))
          conn.commit()
          return c.lastrowid
      
      
      def read(id=None, verbose=False):
          sql = 'SELECT * FROM `tasks` '
          if id:
              sql = 'SELECT * FROM `tasks` WHERE `id`={}'.format(id)
          all_rows = c.execute(sql).fetchall()
          if verbose:
              print(all_rows)
          return all_rows
      
      
      def update(id, countdown):
          sql = 'UPDATE `tasks` SET `countdown`=? WHERE `id`=?'
          c.execute(sql, (countdown, id))
          conn.commit()
      
      
      def delete(id, verbose=False):
          sql = 'DELETE FROM `tasks` WHERE `id`=?'
          affected_rows = c.execute(sql, (id,)).rowcount
          if verbose:
              print('deleted {} rows'.format(affected_rows))
          conn.commit()
      
      
      @app.task
      def job(id):
          id = read(id)
          if id:
              id, name, countdown = id[0]
          else:
              logger.info('stop')
              return
      
          logger.warning('id={}'.format(id))
          logger.warning('name={}'.format(name))
          logger.warning('countdown={}'.format(countdown))
      
          job.apply_async(args=(id,), countdown=countdown)
      

      main.py

      from tasks import *
      
      id = create(name='job', countdown=5)
      job(id)
      # job.apply_async((id,), countdown=5)  # wait 5s
      
      print(read())
      
      input('enter to update')
      update(id, countdown=1)
      
      input('enter to delete')
      delete(id, verbose=True)
      

      【讨论】:

        【解决方案3】:

        我一直在为 Celery + Redis 寻找可以灵活添加/删除的相同解决方案。看看这个,redbeat,来自 Heroku 的同一个人,甚至他们也使用了 Redis + Sentinel。

        希望有帮助:)

        【讨论】:

          【解决方案4】:

          这最终由 celery v4.1.0 中包含的a fix 实现。现在,您只需要更改数据库后端中的日程表条目,celery-beat 就会根据新的日程表进行操作。

          文档vaguely describe 这是如何工作的。 celery-beat 的默认调度程序PersistentScheduler 使用shelve file 作为其调度数据库。对 PersistentScheduler 实例中的 beat_schedule 字典的任何更改都会与此数据库同步(默认情况下,每 3 分钟一次),反之亦然。文档使用app.add_periodic_taskhow to add new entries 描述为beat_schedule。要修改现有条目,只需添加具有相同 name 的新条目。像从字典中一样删除条目:del app.conf.beat_schedule['name']

          假设您想使用外部应用监控和修改 celery 节拍时间表。那么你有几个选择:

          1. 您可以open 搁置数据库文件并像字典一样读取其内容。写回此文件以进行修改。
          2. 您可以运行 Celery 应用程序的另一个实例,并使用该实例来修改搁置文件,如上所述。
          3. 您可以use the custom scheduler class from django-celery-beat 将计划存储在 django 管理的数据库中,并访问那里的条目。
          4. 您可以使用celerybeat-mongo 中的调度程序将调度存储在MongoDB 后端,并访问那里的条目。

          【讨论】:

          • 很好的解决方案!
          • 迟到的评论,但我不明白如何以真正的动态方式做到这一点;即在我的应用程序收到 API 调用后,然后让它配置周期性任务。从代码示例来看,它似乎总是在函数定义期间进行评估(使用装饰器)。
          • 例如,当我尝试这个时:_gdbm.error: [Errno 11] Resource temporarily unavailable。所以似乎在 celery 运行时我似乎无法通过shelve.open(file) 打开文件。
          • @Tristan Brown 很好的解决方案,你有任何非 django 特定的例子吗?
          • 我为非 django 应用程序添加了答案。见*.com/a/68858483/11043825
          【解决方案5】:

          有一个名为 django-celery-beat 的库,它提供了人们需要的模型。要使其动态加载新的周期性任务,必须创建自己的调度程序。

          from django_celery_beat.schedulers import DatabaseScheduler
          
          
          class AutoUpdateScheduler(DatabaseScheduler):
          
              def tick(self, *args, **kwargs):
                  if self.schedule_changed():
                      print('resetting heap')
                      self.sync()
                      self._heap = None
                      new_schedule = self.all_as_schedule()
          
                      if new_schedule:
                          to_add = new_schedule.keys() - self.schedule.keys()
                          to_remove = self.schedule.keys() - new_schedule.keys()
                          for key in to_add:
                              self.schedule[key] = new_schedule[key]
                          for key in to_remove:
                              del self.schedule[key]
          
                  super(AutoUpdateScheduler, self).tick(*args, **kwargs)
          
              @property
              def schedule(self):
                  if not self._initial_read and not self._schedule:
                      self._initial_read = True
                      self._schedule = self.all_as_schedule()
          
                  return self._schedule
          

          【讨论】:

          • 谢谢。没有立即工作,但使用to_add = [key for key in new_schedule.keys() if key not in self.schedule.keys()] 和类似的to_remove 就可以了。为什么这不是标准选项?到目前为止,我不得不让 Celery 任务调用其他 Celery 任务并进行倒计时。这对我来说听起来不太好。
          【解决方案6】:

          你可以看看这个flask-djcelery,它配置了flask和djcelery,还提供了可浏览的rest api

          【讨论】:

            【解决方案7】:

            此问题已在 google groups 上回答。

            我不是作者,所有功劳归功于 Jean Mark

            这是一个合适的解决方案。确认工作,在我的场景中, 我将定期任务分类并从中创建了一个模型,因为我可以 根据需要将其他字段添加到模型中,这样我也可以添加 “终止”方法。您必须设置周期性任务的启用 属性为 False 并在删除之前将其保存。整体 子类化不是必须的, schedule_every 方法是 真的做的工作。当你准备好终止你的任务时(如果你 没有子类化它)你可以使用 PeriodicTask.objects.filter(name=...) 搜索您的任务,禁用 它,然后删除它。

            希望这会有所帮助!

            from djcelery.models import PeriodicTask, IntervalSchedule
            from datetime import datetime
            
            class TaskScheduler(models.Model):
            
                periodic_task = models.ForeignKey(PeriodicTask)
            
                @staticmethod
                def schedule_every(task_name, period, every, args=None, kwargs=None):
                """ schedules a task by name every "every" "period". So an example call would be:
                     TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
                     that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
                """
                    permissible_periods = ['days', 'hours', 'minutes', 'seconds']
                    if period not in permissible_periods:
                        raise Exception('Invalid period specified')
                    # create the periodic task and the interval
                    ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task
                    interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
                    if interval_schedules: # just check if interval schedules exist like that already and reuse em
                        interval_schedule = interval_schedules[0]
                    else: # create a brand new interval schedule
                        interval_schedule = IntervalSchedule()
                        interval_schedule.every = every # should check to make sure this is a positive int
                        interval_schedule.period = period 
                        interval_schedule.save()
                    ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
                    if args:
                        ptask.args = args
                    if kwargs:
                        ptask.kwargs = kwargs
                    ptask.save()
                    return TaskScheduler.objects.create(periodic_task=ptask)
            
                def stop(self):
                    """pauses the task"""
                    ptask = self.periodic_task
                    ptask.enabled = False
                    ptask.save()
            
                def start(self):
                    """starts the task"""
                    ptask = self.periodic_task
                    ptask.enabled = True
                    ptask.save()
            
                def terminate(self):
                    self.stop()
                    ptask = self.periodic_task
                    self.delete()
                    ptask.delete()
            

            【讨论】:

            • @kai IntervalSchedulePeriodicTask 等是 djcelery 类,并且 OP 说他没有使用 djcelery。尽管如此,绝对有用。
            【解决方案8】:

            不,很抱歉,常规的 celerybeat 无法做到这一点。

            但它很容易扩展为你想要的,例如django 芹菜 调度程序只是一个子类,将调度读取和写入数据库 (顶部有一些优化)。

            您甚至可以将 django-celery 调度程序用于非 Django 项目。

            类似这样的:

            • 安装 django + django-celery:

              $ pip install -U django django-celery

            • 将以下设置添加到您的 celeryconfig:

              DATABASES = {
                  'default': {
                      'NAME': 'celerybeat.db',
                      'ENGINE': 'django.db.backends.sqlite3',
                  },
              }
              INSTALLED_APPS = ('djcelery', )
              
            • 创建数据库表:

              $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
              
            • 使用数据库调度器启动 celerybeat:

              $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
                  -S djcelery.schedulers.DatabaseScheduler
              

            还有可用于非 Django 项目的 djcelerymon 命令 要在同一进程中启动 celerycam 和 Django Admin 网络服务器,您可以 使用它还可以在漂亮的 Web 界面中编辑您的定期任务:

               $ djcelerymon
            

            (请注意,由于某些原因,无法使用 Ctrl+C 停止 djcelerymon,您 必须使用 Ctrl+Z + kill %1)

            【讨论】:

            • 能否请您提及添加和删除任务的代码?对不起,我没有得到。
            • 从 2012 年到 2016 年有什么变化吗?