【问题标题】:How can I iterate through a Celery Chain in Django?如何在 Django 中遍历芹菜链?
【发布时间】:2026-01-20 02:45:02
【问题描述】:

我将 Celery 5.1.0 与 Django 3.2.3 一起使用,并设置了 Celery 链。我希望循环运行这个链,但我需要确保只有在链中的前一组命令完成后才重新开始循环。

这是链条:

def base_data_chain(user_email, store):


    chain = (
            product_count.s(store=store) |
            get_data.s(store=store) |
            normalise.s() |
            merge.s() |
            send_task_email.s(user_email=user_email, store=store))
                             
    chain()

我需要的是这样的:

for store in stores:
    base_data_chain(user_email=user_email, store=store)

但我需要每个store 的循环在前一个store 被处理之前不要开始。

任何有关实现此目标的最佳方式的帮助将不胜感激。谢谢。

【问题讨论】:

    标签: python django celery


    【解决方案1】:

    我设法通过遍历所有任务并将它们添加到单个链中来解决这个问题(通过一些离线帮助) - 即

    from celery import chain
    
    def stores_data_chain(user_email, stores):
        store_chain = chain()
        for store in stores:
            store_chain |= product_count.s(store)
            store_chain |= get_data.s(store=store)
            store_chain |= normalise.s()
            store_chain |= merge.s()
            store_chain |= send_task_email.s(user_email=user_email, store=store)
        return store_chain()
    

    需要注意的唯一问题是每个任务都接收前一个任务的输出。

    在上面的示例中,他的意思是product_count 函数在第一个循环(正确)上接收到 store_id,但在第二个循环上它接收到下一个 store_id(正确)和 send_task_email 的返回(不需要)。

    为了解决这个问题,我不得不重构 product_count 函数以接受 args 而不是 kwargs,然后忽略第一个参数(如果有多个参数)。

    希望这个答案对面临类似问题的其他人有所帮助。

    【讨论】: