【问题标题】:Celery tasks don't get revoked芹菜任务不会被撤销
【发布时间】:2023-03-07 19:44:01
【问题描述】:

我正在通过 django 的 celery(版本 2.3.2)作为任务运行多个模拟。模拟由另一个任务设置:

在views.py中:

result = setup_simulations.delay(parameters)
request.session['sim'] = result.task_id # Store main task id

在tasks.py中:

@task(priority=1)
def setup_simulations(parameters):
    task_ids = []
    for i in range(number_of_simulations):
        result = run_simulation.delay(other_parameters)
        task_ids.append(result.task_id)
    return task_ids

初始任务(setup_simulations)完成后,我尝试如下撤销模拟任务:

main_task_id = request.session['sim']
main_result = AsyncResult(main_task_id)
# Revoke sub tasks
from celery.task.control import revoke
for sub_task_id in main_result.get():
    sub_result = AsyncResult(sub_task_id); sub_result.revoke() # Does not work
    # revoke(sub_task_id) # Does not work neither

当我查看“python manage.py celeryd -l info”的输出时,任务被执行,就好像什么都没发生一样。任何可能出错的想法?​​

【问题讨论】:

  • 问题是否取决于我使用 Kombu 作为经纪人这一事实? (这些“虚拟传输”可能具有有限的广播和事件功能。例如,远程控制命令仅适用于 AMQP 和 Redis。)

标签: django celery


【解决方案1】:

正如您在评论中提到的,revoke 是一个远程控制命令,因此目前仅支持 amqp 和 redis 传输。

您可以通过在数据库中存储已撤销标志来自己完成此操作,例如:

from celery import states
from celery import task
from celery.exceptions import Ignore

from myapp.models import RevokedTasks


@task
def foo():
    if RevokedTasks.objects.filter(task_id=foo.request.id).count():
        if not foo.ignore_result:
            foo.update_state(state=states.REVOKED)
        raise Ignore()

如果您的任务正在处理某个模型,您甚至可以在其中存储一个标志。

【讨论】:

  • 感谢您确认我对 Kombu 的怀疑。我不确定是不是这样,因为 Kombu 确实是一个 AMQP 消息传递框架。也非常感谢您的建议,我想我会采用那个解决方案!
  • 有没有办法用这个变通方法来撤销以前安排的任务?
  • 我更新了示例以使用 Ignore,这是 Celery 3.0.11 中的新功能。这使得工作人员在任务返回后忽略任务,从而单独保留任务的状态。
  • @Fitoria 之前安排的内容是什么意思?工人收到的任何任务都可以通过这种方式被忽略。您可以使用相同的技术,例如通过忽略任务并发送带有新 ETA 的新任务来更改任务的 ETA/倒计时。 (当然,如果任务已经在执行,那也无济于事)。
  • @asksol 在 celery 任务表上添加字段的最佳方法是什么? stackoverflow.com/questions/25046200/…
猜你喜欢
  • 2015-01-21
  • 1970-01-01
  • 2018-03-07
  • 2014-09-22
  • 2022-01-21
  • 1970-01-01
  • 2019-02-26
  • 2018-05-11
  • 2018-05-17
相关资源
最近更新 更多