【问题标题】:Setup celery periodic task设置 celery 周期性任务
【发布时间】:2016-11-21 04:18:51
【问题描述】:

如何使用 Celerybeat 和 Flask 设置一个每小时查询一次数据库的周期性任务?

环境是这样的:

/
|-app
  |-__init__.py
  |-jobs
    |-task.py
|-celery-beat.sh
|-celery-worker.sh
|-manage.py

我目前有一个名为run_query() 的查询函数位于task.py

我希望调度程序在应用程序启动后启动,所以我的/app/__init__.py 文件夹中有以下几行:

celery = Celery()

@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
  sender.add_periodic_task(1, app.jobs.task.run_query())

(为简单起见,我已将其设置为 如果它运行,它将每分钟运行一次。还没有这样的运气。)

当我启动 celery-worker.sh 时,它会在 [tasks] 标题下识别我的功能。但是 scheduled 函数永远不会运行。我可以通过在命令提示符下发出以下命令来手动强制该函数运行:

>> from app.jobs import task
>> task.run_query.delay()

编辑:添加 celerybeat.sh

作为后续:如果通过烧瓶上下文访问数据库,在我的异步函数调用期间创建一个新的烧瓶上下文来访问数据库是否明智?使用现有的烧瓶上下文?或者完全忘记上下文,只是启动到数据库的连接?我担心的是,如果我只是启动一个新连接,它可能会干扰现有上下文的连接?

【问题讨论】:

标签: python flask celery


【解决方案1】:

要运行周期性任务,您需要某种 schduler(例如 celery beat)。

celery beat是一个调度器;它定期启动任务,然后由可用的工作节点执行 集群。

您必须确保只有一个调度程序正在运行 一次,否则你最终会得到重复的任务。用一个 集中式方法意味着时间表不必 同步,并且服务可以在不使用锁的情况下运行。

参考:periodic-tasks

您可以使用命令调用调度程序,

$ celery -A proj beat #different process from your worker

您还可以通过启用 workers -B 来将 beat 嵌入到 worker 中 选项,如果你永远不会运行超过一个工人,这很方便 节点,但它不常用,因此不推荐 用于生产使用启动调度程序:

$ celery -A proj worker -B

参考:celery starting scheduler

【讨论】: