【问题标题】:Python Celery task workflow excutionPython Celery 任务工作流执行
【发布时间】:2026-01-04 04:20:03
【问题描述】:

我是 Python Celery 的新手,正在尝试使用 Celery 执行工作流。工作流程:Task workflow

在所附图片中,我试图展示下面以文字解释的工作流程。

  1. 客户端调用 Task-1。
  2. Task-1 成功时,Task-2 和 Task-3 会(同时)执行,Task-4 会在 Task-1 失败时执行。
  3. Task-2 成功时,Task-5 和 Task-6 会(同时)执行,Task-7 会在 Task-2 失败时执行。
  4. Task-5 成功时,大量(100 秒)Task-8 会(同时)执行,如果 Task-5 失败则执行 Task-9。

当客户端调用 Task-1 时,它会传递一个参数,该参数需要传递给成功和失败场景中的下一个任务。

我知道 Canvas 可用于设计带有 Celery 的工作流程。但是,我无法弄清楚如何在成功和失败的不同层次上进行如此庞大的工作流程。请建议我应该如何使用 Celery 和 Canvas 来规划这样的工作流程。 教程或示例的任何链接也会有很大帮助。

【问题讨论】:

    标签: python canvas error-handling celery


    【解决方案1】:

    你尝试过这样的事情吗? (伪代码)

    workflow = (t1.s().set(link_error=t4.s()) | 
    
                group((t2.s().set(link_error=t7.s()) |
    
                       group(t6.s(),
                            (t5.s().set(link_error=t9.s()) |
                             group(t8.s(i) for i in xrange(100)))),
    
                      t3.s())
                )
    ).apply_async()
    

    link_error 是错误的回调。

    http://docs.celeryproject.org/en/latest/userguide/canvas.html

    【讨论】:

    • 问题不在于在任务中启动(“调用”)任务,而在于等待任务中的任务(通常这也意味着启动它) . this page 上的警告很明确。不幸的是,文档here 谈到了“启动同步子任务”。这可能会导致认为“启动”部分是问题,但实际上问题是“同步”,这意味着等待任务。李的答案没有等待。
    • "if task_two(arg_one)":这应该等待任务结束。关于工作流程的问题非常明确。见图片。如果前一个任务成功,则按顺序执行这些任务。我称之为等待,我不确定你所说的“没有等待”是什么意思
    • 调用像你这样显示的任务 (task_two(...)) 完全绕过工作人员。就是here描述的__call__方法:“表示任务将在当前进程中执行,而不是由worker执行(不会发送消息)。”不涉及等待。如果你想称之为“同步”,那很好,但它不是我前面提到的 Celery 文档中讨论的“同步”。 那个同步就是让一个任务由一个工人运行并立即等待它。
    • 我明白了。我会更正我的答案。我认为 task_two(arg) 是一些等效于 task_two().get() 的伪代码。
    【解决方案2】:

    你可以有一个包装任务来处理调用其他任务的逻辑。

    您的代码最终可能会是这样。

    @celery.task(name='tasks.wrapper_task')
    def wrapper_task(one_arg):
        if task_one(one_arg):
            task_three.apply_async(arg_one)
            if task_two(arg_one):
                task_six.apply_async(arg_one)
                if task_five(arg_one): 
                    task_eight.apply_async(1)
                    task_eight.apply_async(2)
                    # etc...
                else:
                    task_nine(arg_one)
            else:
                task_seven(arg_one)
        else:
            task_four(arg_one)
    

    我还没有为你定义所有的任务。如您所见,除非您需要在其他情况下异步调用它们,否则并非所有任务都需要成为 celery 任务。

    我认为“失败”表示返回错误,但您可以轻松地修改代码以使用 try / except

    【讨论】:

    • 感谢 Lee 提出包装任务的想法。