【发布时间】:2020-06-18 19:03:10
【问题描述】:
目前我有一个用 django 运行的芹菜批次,如下所示:
芹菜.py:
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
app.control.purge()
sender.add_periodic_task(30.0, check_loop.s())
recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
print("setup_periodic_tasks")
@app.task()
def check_loop():
.....
start = database start number
end = database end number
callling apis in a list from id=start to id=end
create objects
update database(start number = end, end number = end + 3)
....
@app.task()
def recursion_function(default_retry_delay=10):
.....
do some looping
....
#when finished, call itself again
recursion_function.apply_async(countdown=30)
我的目标是每当 celery 文件被编辑然后它会重新启动所有任务 - 删除尚未执行的排队任务(我这样做是因为recursion_function 将在完成检查每条记录的工作后再次运行我的数据库中的一个表,所以我不担心它会在中途停止)。
check_loop 函数将调用具有分页功能的 api 以返回对象列表,我将按表中的记录将其与表中的记录进行比较,如果匹配则创建另一个模型的新自定义记录
我的问题是当我清除所有消息时,当前正在运行的任务会中途停止还是继续运行?因为如果 check_loop 函数中途停止循环遍历 api 列表,那么它将再次运行循环,我将创建我不想要的新重复记录
示例:
在check_loop() 的运行任务期间,它在中途创建了对象(在从元素 id=2 到 id=5 的 api 列表上),服务器重新启动 -> 再次运行,现在 check_loop() 从头开始运行(在元素 id 的 api 列表上=2 到 id=5) 并再次从该列表创建对象(100% 我不想要)
它是这样运行的吗?我只需要确认
编辑:
https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks
我添加了app.control.purge(),因为当我重新启动时,recursion_function 在setup_periodic_tasks 中再次被调用,而之前来自recursion_function.apply_async(countdown=30) 的recursion_function 也执行,所以它会自我繁殖
【问题讨论】:
标签: django python-3.x celery celerybeat