【发布时间】:2016-11-21 04:18:51
【问题描述】:
如何使用 Celerybeat 和 Flask 设置一个每小时查询一次数据库的周期性任务?
环境是这样的:
/
|-app
|-__init__.py
|-jobs
|-task.py
|-celery-beat.sh
|-celery-worker.sh
|-manage.py
我目前有一个名为run_query() 的查询函数位于task.py
我希望调度程序在应用程序启动后启动,所以我的/app/__init__.py 文件夹中有以下几行:
celery = Celery()
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(1, app.jobs.task.run_query())
(为简单起见,我已将其设置为 如果它运行,它将每分钟运行一次。还没有这样的运气。)
当我启动 celery-worker.sh 时,它会在 [tasks] 标题下识别我的功能。但是 scheduled 函数永远不会运行。我可以通过在命令提示符下发出以下命令来手动强制该函数运行:
>> from app.jobs import task
>> task.run_query.delay()
编辑:添加 celerybeat.sh
作为后续:如果通过烧瓶上下文访问数据库,在我的异步函数调用期间创建一个新的烧瓶上下文来访问数据库是否明智?使用现有的烧瓶上下文?或者完全忘记上下文,只是启动到数据库的连接?我担心的是,如果我只是启动一个新连接,它可能会干扰现有上下文的连接?
【问题讨论】:
-
您还需要运行调度程序服务。 docs.celeryproject.org/en/latest/userguide/…。 celery beat 是一个调度器;它定期启动任务,然后由集群中可用的工作节点执行。