正如the docs 中所述,您可以实现自己的自定义调度程序。您应该重写 is_due 方法,该方法决定是否该运行任务。
下面是一个概念验证(我没有检查它是否有错误)。请注意,__reduce__ 方法也被覆盖,因此新参数也被序列化。
import celery.schedules.schedule
class myschedule(celery.schedules.schedule):
def __init__(self, *args, **kwargs):
super(myschedule, self).__init__(*args, **kwargs)
self.start_date = kwargs.get('start_date', None)
def is_due(self, last_run_at):
if self.start_date is not None and self.now() < self.start_date:
return (False, 20) # try again in 20 seconds
return super(myschedule, self).is_due(last_run_at)
def __reduce__(self):
return self.__class__, (self.run_every, self.relative, self.nowfun, self.start_date)
然后你在配置中使用它:
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': myschedule(timedelta(seconds=30), start_date=start_date),
'args': (16, 16)
},
}