【问题标题】:Celery chain tasks芹菜链任务
【发布时间】:2014-12-04 19:02:39
【问题描述】:

上下文:

tasks.py

def chunkify(lst,n):
    return [ lst[i::n] for i in xrange(n) ]

@task
def swarm_restart(procs):
   chunks = chunkify(procs, 4)
    res = chain(
        group_restart.s(( [ (proc.name, proc.host.name) for proc in chunks[0] ] )),
        group_restart.s(( [ (proc.name, proc.host.name) for proc in chunks[1] ] )),
        group_restart.s(( [ (proc.name, proc.host.name) for proc in chunks[2] ] )),
        group_restart.s(( [ (proc.name, proc.host.name) for proc in chunks[3] ] )),
    )()

@ task
def group_restart(procs):
    # this task runs only once, seems to be called just 1 time
    res = group( proc_restart.s(proc[0], proc[1]) for proc in procs ).apply_async()

@ task
def proc_restart(proc_name, hostname):
    # This method works, tested several times
    proc = Host.objects.get(name=hostname).get_proc(proc_name)
    proc.restart()

views.py

def call():
    procs = get_procs()
    tasks.swarm_restart.delay(procs)

我得到的错误: TypeError: group_restart() takes exactly 1 argument (2 given)

我做错了什么,有灯吗?

顺便说一句。 celery==3.0.25, django-celery==3.0.23

【问题讨论】:

    标签: django python-2.7 celery django-celery


    【解决方案1】:

    如果您查看 swarm_restart 任务,您正在链接 group_restart 任务。在这里,链中的第一个任务可以正常执行,但第二个任务会抛出错误。

    TypeError: group_restart() 只需要 1 个参数(给定 2 个)

    因为,第一个任务的结果作为参数传递给它。链中的下一个任务也会发生同样的情况。

    例如,

    from celery import task, chain
    
    @app.task
    def t1():
        return 't1'
    
    @app.task
    def t2():
       return 't2'
    
    wrong_chain = chain( t1.s(), t2.s() )
    

    如果您执行wrong_chain,即使您没有将任何参数传递给t2,它也会遇到类似的错误

    因此,您必须根据自己的工作来改变工作流程。

    【讨论】:

    • 您可以将您的 task.s() 替换为 task.si(),这使得签名不可变,并且一切都适用于当前工作流程本身。