【发布时间】:2018-07-04 00:52:43
【问题描述】:
我正在尝试根据用户按钮单击临时暂停 celery 任务。
我所做的是:
当用户点击按钮时;我发布了一个 AJAX 请求,将我的 celery 任务状态更新为“PAUSE”
然后;我的策略是;当我对芹菜发起一项任务时;它运行一个 for 循环。 每个 for 循环;我读了我的数据库“状态”,看看它是否设置为暂停:如果它设置为暂停;我想让它休眠 60 秒或让它休眠直到用户点击恢复按钮;同样的想法。
这是我的代码:
r = redis.StrictRedis(host='localhost', port=6379, db=0)
@celery.task(bind=True)
def runTask(self, arr)
for items in arr:
current_task_id = self.request.id
item = r.get('celery-task-meta-'+current_task_id)
load_as_json = json.loads(item)
if "PAUSE" in load_as_json['status']:
sleep(50)
@app.route('/start')
def start_task()
runTask.apply_async(args=[arr])
return 'task started running
这是我的暂停 API 端点的样子:
@app.route('/stop/<task_id>')
def updateTaskState():
task_id = request.cookie.get('task_id')
loadAsJson = json.loads(r.get('celery-task-meta-'+str(task_id)))
loadAsJson['status'] = 'PAUSE'
loadAsJson.update(loadAsJson)
dump_as_json = json.dumps(loadAsJson)
updated_state = r.set('celery-task-meta-'+last_key, dump_as_json)
return 'updated state';
根据我的概念理解;是我没有看到更新状态的原因是因为;该任务已经执行并且无法从数据库中检索更新的值。 仅供参考:任务更新状态立即设置为暂停;我通过创建一个单独的脚本来检查这一点,该脚本检查while循环中的状态;每次我单击释放 AJAX 请求以更新状态的按钮时;我的数据库得到更新,它在单独的脚本上显示“PAUSE”;但是在 @celery.task 装饰器中,我似乎无法获得更新的状态。
以下是我用来测试的单独脚本;它似乎是预期的更新状态;我只是无法在任务装饰器中获得更新的状态......奇怪。
r = redis.StrictRedis(host='localhost', port=6379, db=0)
last_key = r.keys()
while True:
response = r.get('celery-task-meta-b1534a87-e18b-4f0a-89e2-08348d833056')
loadAsJson = json.loads(response)
print loadAsJson['status']
【问题讨论】: