【问题标题】:Running celery task when celery beat startscelery beat 开始时运行 celery 任务
【发布时间】:2015-10-24 05:11:41
【问题描述】:

当我启动 celery beat 时如何安排任务运行,然后在 1 小时后再次运行。

目前我在 settings.py 中有时间表:

CELERYBEAT_SCHEDULE = {
    'update_database': {
        'task': 'myapp.tasks.update_database',
        'schedule': timedelta(seconds=60),
    },
}

我在 * 上看到了 1 年的帖子,问了同样的问题: How to run celery schedule instantly?

但这对我不起作用,因为当我运行 django 服务器时,我的 celery worker 会收到 3-4 个相同任务的请求

我正在启动我的工人并像这样击败:

celery -A dashboard_web worker -B --loglevel=INFO --concurrency=10

【问题讨论】:

  • 您找到解决方案了吗?

标签: django celery


【解决方案1】:

Crontab 计划

您可以尝试改用crontab 计划,该计划将每小时运行一次,并在计划程序初始化后 1 分钟启动。 警告:您可能希望在几分钟后执行此操作,以防启动需要更长的时间,否则您可能需要等待整整一个小时。

from celery.schedules import crontab
from datetime import datetime

CELERYBEAT_SCHEDULE = {
    'update_database': {
        'task': 'myapp.tasks.update_database',
        'schedule': crontab(minute=(datetime.now().minute + 1) % 60),
    },
}

参考:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules

MyAppConfig 的就绪方法

为了确保您的任务立即运行,您可以使用与之前相同的方法创建周期性任务,而无需在minute 中添加1。然后,您在MyAppConfigready 方法中调用您的任务,只要您的应用程序准备就绪,就会调用该方法。

#myapp/apps.py

class MyAppConfig(AppConfig):
    name = "myapp"

    def ready(self):
        from .tasks import update_database
        update_database.delay()

请注意,如果您要使用django_celery_beat,也可以直接在ready 方法中创建周期性任务。

编辑: 没有看到您提到的链接中已经涵盖了第二种方法。我会把它留在这里,以防它对到达这里的其他人有用。

【讨论】:

  • 你需要使用minute=(datetime.now().minute+1) % 60 否则 celerybeat 在最后一分钟执行时会引发异常
【解决方案2】:

尝试设置配置参数CELERY_ALWAYS_EAGER = True

类似的东西

app.conf.CELERY_ALWAYS_EAGER = True

【讨论】:

  • 这不是一个好的建议。这意味着每个任务都不会被发送给 celery worker,而只会在它们被调用时以及由调用它们的进程执行。
  • 公平地说,如果您需要在同一台运行 Celery Beat 的服务器上运行 Celery 任务,这是可以的(可能是有效的用例),但请记住,任何额外的 Celery 任务都会从这些任务中触发都将在 EAGER 模式下运行——它们都将是同步执行。虽然这有利于将一堆任务“链接”在一起,但如果您想触发任务以并行运行,它们永远不会。