【发布时间】:2017-07-17 18:53:00
【问题描述】:
我们使用 celery v4.0.2、rabbitMQ v3.5.7 和flower v0.9.1 运行 Django v1.10,并且在 celery、rabbitMQ 和 Flower 上运行相当新。
有一个函数 x() 设置为在失败的情况下在 7 天后重试。我们有 1000 个 x 的实例在生产中重新排队。我们已经解决了这个问题,并希望尽快重试这些实例。
有没有办法在预定时间之前强制重试?
【问题讨论】:
我们使用 celery v4.0.2、rabbitMQ v3.5.7 和flower v0.9.1 运行 Django v1.10,并且在 celery、rabbitMQ 和 Flower 上运行相当新。
有一个函数 x() 设置为在失败的情况下在 7 天后重试。我们有 1000 个 x 的实例在生产中重新排队。我们已经解决了这个问题,并希望尽快重试这些实例。
有没有办法在预定时间之前强制重试?
【问题讨论】:
如果您可以获得任务列表,只需对每个任务调用task.retry(exc=exc)。 See docs.
试试celery.task.control.inspect().reserved(),看看你能不能用这种方式过滤任务。 Example here.
根据this answer.,您可以从它的 id 中获得一个任务对象
result = MyTask.AsyncResult(task_id)
result.get()
【讨论】:
MyTask.AsyncResult()
所以在尝试了很多事情之后,我不得不通过获取计划任务列表及其参数并使用参数在 for 循环中调用函数来解决它。
显然没有办法通过设计手动重试任务。您必须使用相同的参数创建另一个任务。这就是我最终所做的:
i = inspect()
scheduled = i.scheduled()
for key in scheduled:
for element in scheduled[key]:
reqDict = element['request']
if reqDict['type']=='module.function':
module.function.delay(converted_arguments)
revoke(reqDict['id'], terminate=True)
【讨论】: