【问题标题】:celery - chaining groups and subtasks. -> out of order executioncelery - 链接组和子任务。 -> 乱序执行
【发布时间】:2013-02-13 22:47:25
【问题描述】:

当我遇到以下情况时

group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())

workflow = chain(group1, group2, task3.si())

直观的解释是 task3 应该只在组 2 中的所有任务都完成后执行。

实际上,任务 3 在 group1 已启动但尚未完成时执行。

我做错了什么?

【问题讨论】:

标签: python celery django-celery


【解决方案1】:

事实证明,在 celery 中,您不能将两组链接在一起。
我怀疑这是因为与任务链接的组自动成为一个和弦
--> Celery 文档:http://docs.celeryproject.org/en/latest/userguide/canvas.html

将组与另一个任务链接在一起将自动升级 它是一个和弦:

组返回父任务。当将两组链接在一起时,我怀疑当第一组完成时,和弦开始回调“任务”。我怀疑这个“任务”实际上是第二组的“父任务”。我进一步怀疑这个父任务在完成启动组内的所有子任务后立即完成,因此执行第二组之后的下一个项目。

为了证明这一点,这里有一些示例代码。您需要已经有一个正在运行的 celery 实例。

# celery_experiment.py

from celery import task, group, chain, chord
from celery.signals import task_sent, task_postrun, task_prerun

import time
import logging

import random
random.seed()

logging.basicConfig(level=logging.DEBUG)

### HANDLERS ###    
@task_prerun.connect()
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):    
    try:
        logging.info('[%s] starting' % kwargs['id'])
    except KeyError:
        pass

@task_postrun.connect()
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    try:    
        logging.info('[%s] finished' % kwargs['id'])
    except KeyError:
        pass


def random_sleep(id):
    slp = random.randint(1, 3)
    logging.info('[%s] sleep for %ssecs' % (id, slp))
    time.sleep(slp)

@task()
def thing(id):
    logging.info('[%s] begin' % id)
    random_sleep(id)
    logging.info('[%s] end' % id)


def exec_exp():
    st = thing.si(id='st')
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),]
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),]
    st2 = thing.si(id='st2')
    st3 = thing.si(id='st3')
    st4 = thing.si(id='st4')

    grp1 = group(st_arr)
    grp2 = group(st_arr2)

    # chn can chain two groups together because they are seperated by a single subtask
    chn = (st | grp1 | st2 | grp2 | st3 | st4)

    # in chn2 you can't chain two groups together. what will happen is st3 will start before grp2 finishes
    #chn2 = (st | st2 | grp1 | grp2 | st3 |  st4)

    r = chn()
    #r2 = chn2()

【讨论】:

  • 谢谢。对我来说不幸的是,我的工作流程不允许我在组之间使用“相关”任务。所以我最终创建了一个假任务def fake_celery_task(): pass 在组之间运行......
  • 在这种情况下,当一个组接受 2 个任务并将结果返回给和弦时,我假设和弦以任意顺序获取结果
【解决方案2】:

我对 celery 也有同样的问题,试图建立一个第一步是“产生一百万个任务”的工作流程。尝试了分组、子任务,最终我的 step2 在 step1 结束之前就开始了。

长话短说,我可能已经找到了使用和弦和哑音完成器的解决方案:

@celery.task
def chordfinisher( *args, **kwargs ):
  return "OK"

什么都不做,但它使我能够做到这一点:

tasks = []
for id in ids:
    tasks.append( mytask.si( id ) )
step1 = chord( group( tasks ), chordfinisher.si() )

step2 = ...

workflow = chain( step1, step2 )

本来我想在一个子任务中有step1,但出于同样的原因怀疑,调用组的动作结束,任务被认为完成,我的工作流程继续......

如果有人有更好的东西,我很感兴趣!

【讨论】:

  • 嗨,这就是我最终要做的。需要记住的一件事是,您需要哑铃器返回组执行的结果。否则,如果组中的任何内容失败,您的链将不会在第 1 步停止。 (这可能是也可能不是你想要的)
猜你喜欢
  • 2014-11-03
  • 2016-12-09
  • 2019-02-26
  • 1970-01-01
  • 1970-01-01
  • 2014-01-18
  • 2017-03-27
  • 2018-07-30
  • 1970-01-01
相关资源
最近更新 更多