【发布时间】:2014-12-17 16:47:59
【问题描述】:
如果这仅仅是我的误解造成的,我深表歉意。我一直在四处搜索和阅读文档,但无法找到适合我的解决方案。
我有一个树结构,其中每个节点可以有任意数量的子节点。对于每个节点,都会实例化一个新的 celery 任务来创建它,然后它会查看所有子节点并实例化新任务来创建它们。这样做的原因是为了更好地利用 celery 的多线程特性。递归创建整个树的单个任务似乎只利用了单个线程。
虽然我已经能够以这样的方式设置我的代码,但我遇到的问题是我在原始任务中有一些依赖项,这些依赖项在整个结构创建完成之前无法执行。代码如下所示:
@app.task
def initial_task(tree_data):
jobs = []
for node in tree_data:
jobs.append(recursive_task.s(node))
job = group(jobs)
result = job.apply_async()
# Block execution until group is finished
while not result.ready():
time.sleep(0.5)
... do dependent stuff ...
@app.task
def recursive_task(node, parent=None):
# Create node object
node_obj = Node(node.name, parent=parent)
jobs = []
for child in node.children:
jobs.append(recursive_task.s(child, node_obj))
job = group(jobs)
result = job.apply_async()
return node_obj
我遇到的问题是所有孩子的子任务都没有阻止第一组任务的完成,我不知道如何强制这样做。非常感谢您对此事的任何帮助。
因为我在创建children的时候需要node_obj的ID,所以不能简单的递归树,把任务链起来。
更新: 我已经稍微更改了代码以尝试改变结果。以下代码使所有子节点(包括孙子、曾孙等)成为顶级节点的直接子节点:
@app.task
def initial_task(tree_data):
def _recursive_link_task(task_set, children):
for child in children:
task_set.link(create_node.s(child))
if child.children:
_recursive_link_task(task_set, child.children)
for node in tree_data:
s = create_node.s(None, node)
if node.children:
_recursive_link_task(s, node.children)
s.apply_async()
@app.task
def create_node(parent, node):
node_obj = Node(node.name, parent=parent)
return (node_obj,)
我曾期望我可能会通过上面的 sn-p 代码获得更多的财富 - 但由于它只是传递给所有后续任务的初始节点对象,我仍然没有进一步尝试得到这个生成树结构。
【问题讨论】:
标签: python django asynchronous celery