【问题标题】:Retry celery tasks before their countdown在倒计时之前重试 celery 任务
【发布时间】:2017-07-17 18:53:00
【问题描述】:

我们使用 celery v4.0.2、rabbitMQ v3.5.7 和flower v0.9.1 运行 Django v1.10,并且在 celery、rabbitMQ 和 Flower 上运行相当新。

有一个函数 x() 设置为在失败的情况下在 7 天后重试。我们有 1000 个 x 的实例在生产中重新排队。我们已经解决了这个问题,并希望尽快重试这些实例。

有没有办法在预定时间之前强制重试?

【问题讨论】:

    标签: django rabbitmq celery


    【解决方案1】:

    如果您可以获得任务列表,只需对每个任务调用task.retry(exc=exc)See docs.

    试试celery.task.control.inspect().reserved(),看看你能不能用这种方式过滤任务。 Example here.

    根据this answer.,您可以从它的 id 中获得一个任务对象

    result = MyTask.AsyncResult(task_id)
    result.get()
    

    【讨论】:

    • 我试过了,但是 inspect.scheduled() 不返回任务对象,而是它的字典表示,因此我不能调用 task.retry 。我一直在尝试找到如何从任务 ID 获取任务对象,但一直找不到。如果您可以分享替代方案/分享如何从其 id 获取任务对象,那就太好了。
    • @PrakharGupta 试试MyTask.AsyncResult()
    【解决方案2】:

    所以在尝试了很多事情之后,我不得不通过获取计划任务列表及其参数并使用参数在 for 循环中调用函数来解决它。

    显然没有办法通过设计手动重试任务。您必须使用相同的参数创建另一个任务。这就是我最终所做的:

    i = inspect()
    scheduled = i.scheduled()
    for key in scheduled:
        for element in scheduled[key]:
            reqDict = element['request']
            if reqDict['type']=='module.function':
                module.function.delay(converted_arguments)
                revoke(reqDict['id'], terminate=True)
    

    【讨论】: