【问题标题】:Calling tasks recursively on a tree structure在树结构上递归调用任务
【发布时间】:2014-12-17 16:47:59
【问题描述】:

如果这仅仅是我的误解造成的,我深表歉意。我一直在四处搜索和阅读文档,但无法找到适合我的解决方案。

我有一个树结构,其中每个节点可以有任意数量的子节点。对于每个节点,都会实例化一个新的 celery 任务来创建它,然后它会查看所有子节点并实例化新任务来创建它们。这样做的原因是为了更好地利用 celery 的多线程特性。递归创建整个树的单个任务似乎只利用了单个线程。

虽然我已经能够以这样的方式设置我的代码,但我遇到的问题是我在原始任务中有一些依赖项,这些依赖项在整个结构创建完成之前无法执行。代码如下所示:

@app.task
def initial_task(tree_data):
    jobs = []
    for node in tree_data:
        jobs.append(recursive_task.s(node))
    job = group(jobs)
    result = job.apply_async()

    # Block execution until group is finished
    while not result.ready():
        time.sleep(0.5)

    ... do dependent stuff ...

@app.task
def recursive_task(node, parent=None):
    # Create node object
    node_obj = Node(node.name, parent=parent)

    jobs = []
    for child in node.children:
        jobs.append(recursive_task.s(child, node_obj))
    job = group(jobs)
    result = job.apply_async()

    return node_obj

我遇到的问题是所有孩子的子任务都没有阻止第一组任务的完成,我不知道如何强制这样做。非常感谢您对此事的任何帮助。

因为我在创建children的时候需要node_obj的ID,所以不能简单的递归树,把任务链起来。

更新: 我已经稍微更改了代码以尝试改变结果。以下代码使所有子节点(包括孙子、曾孙等)成为顶级节点的直接子节点:

@app.task
def initial_task(tree_data):
    def _recursive_link_task(task_set, children):
        for child in children:
            task_set.link(create_node.s(child))

            if child.children:
                _recursive_link_task(task_set, child.children)


    for node in tree_data:
        s = create_node.s(None, node)
        if node.children:
            _recursive_link_task(s, node.children)
        s.apply_async()

@app.task
def create_node(parent, node):
    node_obj = Node(node.name, parent=parent)
    return (node_obj,)

我曾期望我可能会通过上面的 sn-p 代码获得更多的财富 - 但由于它只是传递给所有后续任务的初始节点对象,我仍然没有进一步尝试得到这个生成树结构。

【问题讨论】:

    标签: python django asynchronous celery


    【解决方案1】:

    使用chords 执行依赖于一堆任务结果的任务。

    由于我无法准确理解您需要如何调用递归任务,因此我实现了一个 reference 合并排序示例。

    注意这不适用于 celery 3.2.0+,因为在任务中调用 get 会导致异常。

    from celery import Celery, chord
    app = Celery('tasks', backend='amqp', broker='amqp://')
    app.conf.CELERY_RESULT_BACKEND = 'amqp'
    
    
    def mergesort(list_obj):
        '''normal mergesort
        '''
        if len(list_obj) <= 1:
            return list_obj
        middle = len(list_obj) / 2
        left, right = list_obj[:middle], list_obj[middle:]
        return list(merge(list(mergesort(left)), list(mergesort(right))))
    
    def merge(left, right):
        '''normal merge
        '''
        while 1:
            if left == []:
                for j in right:
                    yield j
                break
            elif right == []:
                for j in left:
                    yield j
                break
            elif left[0] < right[0]:
                yield left.pop(0)
            else:
                yield right.pop(0)
    
    def merge2(left_r, right_r):
        '''celery merge
        '''
        left =left_r.get()
        right = right_r.get()
        while 1:
            if left == []:
                for j in right:
                    yield j
                break
            elif right == []:
                for j in left:
                    yield j
                break
            elif left[0] < right[0]:
                yield left.pop(0)
            else:
                yield right.pop(0)
    @app.task
    def merge_c(in_list):
        '''celery merge
        '''
        #unpack
        print '*'*21 + str( in_list)
        left, right = in_list
        return list(merge2(left, right))
    
    @app.task
    def same_object(l_obj):
        '''helper function to convert list to `result`
        '''
        return l_obj
    
    @app.task
    def mergesort_c(list_obj):
        '''celery mergesort
        '''
        if len(list_obj) <= 1:
            # make sure that you return a `result` object for merge
            return same_object.delay(list_obj)
        middle = len(list_obj) / 2
        left, right = list_obj[:middle], list_obj[middle:]
        # finish mergesort (left) and mergesort(right) and merge them
        res = chord([mergesort_c.s(left), mergesort_c.s(right)])(merge_c.s())
        return res
    
    if __name__ == '__main__':
        l = [2,1, 3]
        #normal mergesort
        print mergesort(l) #[1, 2, 3, 3, 5]
        # with celery
        res = mergesort_c(l)
        print res.get()
    

    【讨论】:

    • 谢谢。如果我有机会,我会看看我是否可以将它应用到我正在编写的代码中。我曾尝试链接任务 - 也许和弦会提供更好的结果。
    • 祝你好运。同时,我将研究一个不会被新芹菜弃用的答案:)
    猜你喜欢
    • 2012-10-22
    • 2019-07-10
    • 1970-01-01
    • 2013-08-28
    • 2017-11-09
    • 1970-01-01
    • 2015-02-14
    • 2022-09-27
    • 1970-01-01
    相关资源
    最近更新 更多