【问题标题】:Test if a celery task is still being processed测试是否仍在处理 celery 任务
【发布时间】:2011-05-19 03:30:51
【问题描述】:

如何测试celery 中是否仍在处理任务(task_id)?我有以下情况:

  1. 在 Django 视图中启动任务
  2. 将 BaseAsyncResult 存储在会话中
  3. 关闭 celery 守护程序(硬),以便不再处理该任务
  4. 检查任务是否“死亡”

有什么想法吗?是否可以查找 celery 正在处理的所有任务并检查我的任务是否仍然存在?

【问题讨论】:

  • 嘿,我也在寻找类似的东西,你有没有解决这个问题?我想知道是否将 task_id 存储在 django 的缓存框架中。我知道我可以使用类似dpaste.com/370419 的东西来获取任务的状态。但是我在使用数据库、缓存来存储 task_id 之间感到困惑。

标签: python django celery celery-task


【解决方案1】:

在你的模型中定义一个字段(PickledObjectField)来存储芹菜任务:

class YourModel(models.Model):
    .
    .
    celery_task = PickledObjectField()
    .
    .

    def task():
        self.celery_task = SubmitTask.apply_async(args = self.task_detail())
        self.save()

如果您的任务不是特定于任何模型的,您应该专门为 celery 任务创建一个。

否则我建议使用 django-celery。它有一个很好的监控功能:
http://ask.github.com/celery/userguide/monitoring.html#django-admin-monitor,以漂亮的图形方式将任务详细信息保存在 django 模型中。

【讨论】:

  • 是的,我使用 django-celery 并在内部 TaskMeta 模型中进行查找以获取状态。感谢您的回答。
【解决方案2】:

我认为有比在模型中存储任务对象更好的方法。例如,如果您想检查一组任务(并行)是否已完成:

# in the script you launch the task
from celery import group

job = group(
    task1.s(param1, param2),
    task2.s(param3, param4)
)
result = job.apply_async()
result.save()

# save the result ID in your model
your_obj_model = YourModel.objects.get(id='1234')
your_obj_model.task_id = result.id
your_obj_model.save()

那么在你看来

from celery.result import GroupResult
# ...
task_result = GroupResult.restore(your_obj_model.task_id)
task_finished = task_result.ready()
# will be True or False

【讨论】:

    猜你喜欢
    • 2018-06-03
    • 1970-01-01
    • 2012-06-06
    • 2017-12-19
    • 2012-04-07
    • 2018-06-10
    • 1970-01-01
    • 2016-04-09
    • 2020-11-24
    相关资源
    最近更新 更多