【发布时间】:2019-05-31 03:44:18
【问题描述】:
我有一个调用自身的 celery 任务(使用do_stuff.apply_async(queue="foo"))。以前我跑过app.control.add_consumer("foo", reply=True),所以我的工人可以从这个队列中消费。
一段时间后,我想停止该队列中的所有任务以及从do_stuff 启动的所有正在运行的任务。
所以我运行这段代码:
app.control.cancel_consumer("foo", reply=True)
i = app.control.inspect()
for queue in [i.active, i.scheduled, i.reserved]:
for worker_name, worker_tasks in queue().items():
for task in worker_tasks:
args = ast.literal_eval(task["args"])
if "do_stuff" in task["name"] and args[0] == crawler.name:
app.control.revoke(task["id"], terminate=True)
这种“有效”的方式。它确实停止了来自do_stuff 的所有正在运行的任务,并且它确实清除了计划任务(或者至少在运行此代码后我在 Flower 中看不到任何任务)。
问题是,如果我再次运行app.control.add_consumer("foo", reply=True),而不运行其他任何东西,新任务就会开始运行。这意味着 celery/redis 以某种方式设法将任务保留在某个地方。
为什么会这样?那些“隐藏”的任务保存在哪里?我怎样才能删除它们?
【问题讨论】:
标签: django python-3.x celery