【问题标题】:Way to make a task issue success only when all the children tasks have finished successfully?只有在所有子任务都成功完成后才能使任务问题成功?
【发布时间】:2020-12-21 22:00:30
【问题描述】:

所有,

我有一个关于芹菜的问题。假设我有以下 Celery 任务:

@celery_app.task
def add(x, y):
    return x + y


@celery_app.task
def task_no(n):
    return f'Finished task {n}.'


@celery_app.task
def add_bunch():
    return chord([add.si(1, 1), add.si(2, 2)])(task_no.si('1'))


@celery_app.task
def do_it_all():
    chain(
        add_bunch.si(),
        task_no.si('2')
    ).apply_async()

如果我运行 do_it_all() ,我会得到以下输出:

[INFO/MainProcess] Received task: lumi_translation.celery_tasks.add_bunch[d40dc179-602d-4414-9fbd-ee8d62fe7604]  
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.add_bunch[d40dc179-602d-4414-9fbd-ee8d62fe7604] succeeded in 0.01651039347052574s: <AsyncResult: d5564664-1e6f-445f-a172-442fef547422>
[INFO/MainProcess] Received task: lumi_translation.celery_tasks.add[fbc7288a-1f76-447a-ac2b-906ddaa6c00c]  
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.add[fbc7288a-1f76-447a-ac2b-906ddaa6c00c] succeeded in 0.0005592871457338333s: 2
[INFO/MainProcess] Received task: lumi_translation.celery_tasks.add[472d6142-355d-466b-8ee4-0d8cc7e1d96e]  
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.add[472d6142-355d-466b-8ee4-0d8cc7e1d96e] succeeded in 0.0012424923479557037s: 4
[INFO/MainProcess] Received task: lumi_translation.celery_tasks.task_no[faa013e7-42c5-4321-b132-e749169810ee]  
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.task_no[faa013e7-42c5-4321-b132-e749169810ee] succeeded in 0.0003700973466038704s: 'Finished task 2.'
[INFO/MainProcess] Received task: lumi_translation.celery_tasks.task_no[d5564664-1e6f-445f-a172-442fef547422]  
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.task_no[d5564664-1e6f-445f-a172-442fef547422] succeeded in 0.0003337441012263298s: 'Finished task 1.'

add_bunch 即使子任务还没有完成,任务也会成功;因此,任务 2 在任务 1 之前完成。只有当所有子任务都成功完成时,有没有办法让 add_bunch 任务问题成功?在上面的例子中,有没有办法确保任务 1 在任务 2 之前完成?

【问题讨论】:

    标签: celery celery-task


    【解决方案1】:

    add_bunch() 什么都不做,它只是创建一个 Chord 对象并返回它。这总是会成功的,即。返回一个有效的 Chord 对象,当然,除非它不能分配更多的内存......

    【讨论】:

    • 您好 DejanLekic,感谢您的回复。我认为 add_bunch() 不仅会创建一个 Chord 对象,还会执行它。所以,我想知道是否有办法让 add_bunch 任务问题只有在所有子任务(add() 和 task_no())都成功完成时才成功?
    • 你是绝对正确的 - 它确实执行,但 return 语句只返回任何底层结果对象......它总是会成功。如果我有时间,我会更新/改进答案...
    • 谢谢。我希望找到一种方法使 add_bunch() 任务延迟发出“成功”,直到所有子任务都成功为止。
    【解决方案2】:

    我的一位同事向我展示了一个解决方法。他说他花了非常不合理的时间来解决这个问题。我把它放在这里,这样可以省去别人的麻烦。

    add_bunch() 任务的可行重写是:

    @celery_app.task(bind=True)
    def add_bunch(self):
        self.replace(
            chord(header=[add.si(1, 1), add.si(2, 2)], body=task_no.si('1'))
        )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-10-09
      • 1970-01-01
      • 1970-01-01
      • 2020-04-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-21
      相关资源
      最近更新 更多