【问题标题】:celery beat schedule: run task instantly when start celery beat?芹菜节拍时间表:启动芹菜节拍时立即运行任务?
【发布时间】:2015-05-11 18:15:45
【问题描述】:

如果我创建一个 celery beat 时间表,使用timedelta(days=1),第一个任务将在 24 小时后执行,引用 celery beat 文档:

使用 timedelta 作为计划意味着任务将以 30 秒的间隔发送(第一个任务将在 celery beat 开始后 30 秒发送,然后在最后一次运行后每 30 秒发送一次) .

但事实是,在很多情况下,调度程序在启动时运行任务实际上很重要,但是我没有找到允许我在 celery 启动后立即运行任务的选项,我没有阅读小心,还是芹菜缺少这个功能?

【问题讨论】:

    标签: python celery celerybeat


    【解决方案1】:

    我决定我可以声明每个任务的一个实例,并在 celery 启动时执行它们。我根本不喜欢这个,因为它使 celery 的启动速度非常慢(如果你的速度很慢 PeriodicTask),但它可以满足我的要求。

    只需将其添加到tasks.py 的末尾:

    ########### spawn all tasks at launch! ############
    localmess = locals().values()
    for obj in localmess:
        if isclass(obj):
            if obj is not PeriodicTask and issubclass(obj, PeriodicTask):
                instance = obj()
                logger.info('running {0}'.format(obj))
                try:
                    instance.run()
                except:
                    logger.warn('task fail: {0}'.format(obj))
                    pass
    
    ######## all tasks must be decleared above! ########
    

    【讨论】:

    • 如果您只是将任务安排为“现在”运行(或者如果需要,可以在一分钟后运行),这样它就不会阻止节拍启动。那么直接迭代locals(),删除不必要的isclass检查和pass
    【解决方案2】:

    您可以在工作人员准备好后立即使用 worker_ready.connect 装饰器运行任务:

    from celery.signals import worker_ready
    
    @worker_ready.connect
    def at_start(sender, **kwargs):
        """Run tasks at startup"""
        with sender.app.connection() as conn:
            sender.app.send_task("app.module.task", connection=conn)
    

    学分转到这个答案:https://*.com/a/14589445/3922534

    【讨论】:

      【解决方案3】:

      最好的办法是创建一个在完成任务后自行安排任务的实现。此外,创建一个入口锁,这样该任务就不能每时执行多次。 触发一次执行。

      在这种情况下,

      • 您不需要 celerybeat 进程
      • 任务保证执行

      【讨论】:

      • 这需要在每个任务中进行大量实施,我倾向于避免这种情况,因为任务可能出错并且无法安排下一个任务,我必须在每个任务中处理异常
      最近更新 更多