【问题标题】:Celery: Chaining subtasksCelery:链接子任务
【发布时间】:2014-11-03 15:44:54
【问题描述】:

我有一个主要的 celery(在 Django 中)任务,它调用了一个子任务prefetch

@shared_task
def main():
    ...
    tasks.prefetch.s().delay(x)

Prefetch本身调用了一些子任务:

@shared_task
def prefetch(x):
    ...
    do_prefetch.s().delay(x)

main 中,稍后在调用prefetch 之后,我执行了许多其他子任务来进行一些处理。这些子任务需要在prefetch 完成后执行。 Prefetch 本身需要一些时间,所以我不仅要在 prefetch 之后执行 process 任务,还要延迟 60 秒。 Process 任务不需要prefetch 的输出,它们只需要在它完成后运行。并且process 任务本身可以彼此并行运行。

@shared_task
def main():
    ...
    tasks.prefetch.s().delay(x)
    ...
    for i in range(10):
        tasks.process.s().delay(i)

我看到 Celery 的画布命令应该可以满足我的需要,但我不知道如何设置任务。下面是正确的吗?链是简单地通过创建它们来执行还是需要显式地执行?我的倒计时参数是否在正确的位置并意味着正确的事情(在预取后 60 秒的任何时间执行此任务)?

prefetch = tasks.prefetch.s(x)
g = group(tasks.process.s(i, countdown=60) for i in range(10))
c = (prefetch | g)
c()

【问题讨论】:

    标签: django celery django-celery


    【解决方案1】:

    您可以使用chord 代替最适合您情况的chain

    prefetch_task = tasks.prefetch.s(x)
    group_task = group(tasks.process.s((i), countdown=60) for i in range(10))
    work_flow = chord([prefetch_task])(group_task)   
    

    Chord:

    和弦由标题和正文组成。标头是一组必须在调用回调之前完成的任务。和弦本质上是一组任务的回调。

    示例:

    >>> from celery import chord
    >>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())
    

    这里它将首先处理add 两个任务,完成后它将执行sum_task

    【讨论】:

    • 和弦seems to require tasks that do not ignore results。在我们的例子中,没有结果。
    • 所以,你正在做的似乎很好。如果prefetch 没有向g 发送任何值,则使用不可变签名。使用tasks.process.si 而不是tasks.process.s
    • 你还推荐chord还是chain好?
    • 链在你的情况下是好的。您必须使用group(tasks.process.s((i), countdown=60) for i in range(10)),以便芹菜知道哪些是kwargs;否则,它将忽略您的countdown
    • 好的,谢谢。 tasks.process.s(i).set(countdown=60) 怎么样?the documentation bottom of this section
    猜你喜欢
    • 2013-02-13
    • 1970-01-01
    • 1970-01-01
    • 2017-03-27
    • 2017-06-27
    • 2014-07-30
    • 1970-01-01
    • 2017-02-10
    • 2015-12-11
    相关资源
    最近更新 更多