【问题标题】:Celery groups and chains芹菜组和链
【发布时间】:2018-12-07 04:17:06
【问题描述】:

我需要对 Celery 中的一些任务进行排序,其中一些应该作为单个任务,一些应该并行工作,当组中的任务完成时,它应该通过下一个:

chain(
    task1.s(),
    task2.s(),
    group(task3.s(), task4.s()), 
    group(task5.s(), task6.s(), task7.s()), 
    task7.s()
).delay()

但我认为我做错了。有没有人知道怎么做?

另外,我不关心将每个任务的结果发送给其他人。

【问题讨论】:

    标签: python celery


    【解决方案1】:

    这听起来像是一个和弦,即并行执行任务并在并行任务完成时回调另一个任务:http://docs.celeryproject.org/en/latest/userguide/canvas.html#chords

    因此,您可能需要将其更改为: 链(task1.s(),task2.s(),弦(task3.s(),task4.s())(弦(task5.s()​​,task6.s(),task7.s())( task7.s())))

    此外,链/组等总是返回结果并将它们传递给子任务,因此您必须相应地对任务参数进行建模。

    由于这是一个相当复杂的工作流程,您最好从上一个任务中调用下一个任务(例如在 task1 结束时调用 task2.s().delay()) - 但我想没有办法为和弦建模。

    【讨论】:

    • 是的,Björn,我也考虑过和弦,所以我认为返回和弦中的任务结果,我可以使用一个虚拟任务。那么,和弦中的内容是否也适用于许多工作人员(多线程),例如在组中?编辑:嗯,看起来是并行的(现在你可以使用和弦并行计算每个加法步骤,然后得到结果数字的总和)
    【解决方案2】:

    这个终于成功了:

    chain(
        task1.s(),
        task2.s(),
        chord([task3.s(), task4.s()], body=task_result.s(), immutable=True), 
        chord([task5.s(), task6.s(), task7.s()], body=task_result.s(), immutable=True), 
        task7.s()
    ).delay()
    

    【讨论】:

    • 太棒了!我明天测试一下。我今天为此苦苦挣扎了一天。哈哈。
    • 什么是 task_result ?
    • 函数 task_result() 的示例。 ``` @app.task(name='FINISH_GROUP') def task_result(): # 可能需要参数 task_result(results): print('TEST: finish_group({}) -> {}'.format(nb, results) ) ```
    猜你喜欢
    • 2012-12-05
    • 1970-01-01
    • 2014-12-04
    • 2012-05-01
    • 2015-01-21
    • 2020-08-07
    • 1970-01-01
    • 2013-07-01
    • 2012-09-28
    相关资源
    最近更新 更多