【问题标题】:Celery task schedule (Celery, Django and RabbitMQ)Celery 任务计划(Celery、Django 和 RabbitMQ)
【发布时间】:2011-07-18 16:56:05
【问题描述】:

我想要一个每 5 分钟执行一次的任务,但它会等待最后一次执行完成,然后开始计算这 5 分钟。 (这样我也可以确定只有一个任务在运行)我发现最简单的方法是运行 django application manage.py shell 并运行这个:

while True:
    result = task.delay()
    result.wait()
    sleep(5)

但是对于我想以这种方式执行的每个任务,我必须运行它自己的 shell,有没有简单的方法可以做到这一点?可能是一些国王定制 ot django celery 调度程序?

【问题讨论】:

    标签: rabbitmq celery django-celery


    【解决方案1】:

    哇,没有人能理解这个人的问题,真是令人惊讶。他们问的不是定期运行任务,而是如何确保 Celery 不会同时运行同一任务的两个实例。我认为没有办法直接使用 Celery 执行此操作,但是您可以做的是让其中一个任务在开始时立即获得锁,如果失败,请在几秒钟内重试(使用重试) .该任务将在它返回之前释放锁;如果它崩溃或超时,你可以让锁在几分钟后自动过期。

    对于锁,您可能只使用您的数据库或 Redis 之类的东西。

    【讨论】:

    • +1。唯一解决唯一实例问题的人!如果您使用 django 数据库,如何实现锁的详细信息可以在这里找到:*.com/questions/4095940/…
    【解决方案2】:

    您可能对这种不需要更改 celery conf 的简单方法感兴趣。

    @celery.decorators.periodic_task(run_every=datetime.timedelta(minutes=5))
    def my_task():
        # Insert fun-stuff here
    

    【讨论】:

    • 我得到一个错误'Celery'对象没有属性'decorators'。对此有任何想法吗?我在我的任务上方写了@celery.decorators.periodic_task(run_every=datetime.timedelta(minutes=5))。
    • 最新版的celery没有这个装饰器。您只需使用此处的说明:docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
    【解决方案3】:

    您只需要在 celery conf 中指定要定期运行的女巫任务以及间隔时间。

    示例:每 30 秒运行一次 tasks.add 任务

    from datetime import timedelta
    
    CELERYBEAT_SCHEDULE = {
        "runs-every-30-seconds": {
            "task": "tasks.add",
            "schedule": timedelta(seconds=30),
            "args": (16, 16)
         },
    }
    

    请记住,您必须使用 -B 选项在节拍模式下运行 celery

    manage celeryd -B
    

    您也可以使用 crontab 样式代替时间间隔,请查看:

    http://ask.github.com/celery/userguide/periodic-tasks.html

    如果您使用 django-celery,请记住您也可以使用 tha django db 作为定期任务的调度程序,这样您可以轻松地通过 django-celery 管理面板添加新的定期任务。 为此,您需要以这种方式在 settings.py 中设置 celerybeat 调度程序

    CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
    

    【讨论】:

    • 这样做的问题是它不会等待任务完成,而是会在时间到时(每 30 秒)发送另一个任务。还是我错了?
    • 感谢您的建议,但我想我还想要一些其他的东西——我想创建一个作业,将其发送执行,并且只有在前一个作业的执行完成后才创建另一个作业。在我知道上一个已经完成之前,我不想创造工作。我希望任务具有同步(非异步)行为
    • 全局目标是运行一个我不知道需要多少时间以及何时完成的任务,请等待一段时间并重新启动它。此外,我必须确保它不会被不同的工作线程同时执行 2 次或更多次,而且我不必编写自己的程序代码来执行此操作。
    • 如果你想确保一个任务只在最后一个完成后启动,使用 memcached(或 django 缓存)在所述任务中创建任务类型或资源的锁。它易于操作且可扩展。
    • @MauroRocco 这不是真的,至少从 3.0.12 开始,celery beat 肯定会创建重叠任务。
    【解决方案4】:

    扩展@MauroRocco 的帖子,来自http://docs.celeryproject.org/en/v2.2.4/userguide/periodic-tasks.html

    使用 timedelta 作为计划意味着任务将在 celerybeat 启动后 30 秒执行,然后在最后一次运行后每 30 秒执行一次。类似 crontab 的时间表也存在,请参阅 Crontab 时间表部分。

    所以这确实会达到你想要的目标。

    【讨论】:

    • 对不起,关于这个问题,但是如果任务需要 20 秒才能完成,它会在 0:30(1-st)运行,在 0:50 完成,然后在 1:20 开始(这是我真正想要的)
    • 如果您希望任务每 30 秒独立于持续时间运行一次,则必须使用 crontab 计划,但请记住,此任务已添加到 celery 队列中,并且如果还有其他任务正在执行/in queue 你不确定你的任务是否在给定时间开始。
    【解决方案5】:

    由于不推荐使用 celery.decorators,您可以像这样使用periodic_task 装饰器:

    from celery.task.base import periodic_task
    from django.utils.timezone import timedelta
    
    @periodic_task(run_every=timedelta(seconds=5))
    def my_background_process():
        # insert code
    

    【讨论】:

      【解决方案6】:

      将该任务添加到单独的队列中,然后为该队列使用单独的工作线程,并将并发选项设置为 1。

      【讨论】: