【问题标题】:python celery: How to append a task to an old chainpython celery:如何将任务附加到旧链
【发布时间】:2013-10-24 09:12:43
【问题描述】:

我在我的数据库中保存了对链的引用。

from tasks import t1, t2, t3
from celery import chain
res = chain(t1.s(123)|t2.s()|t3.s())()
res.get()

如何将其他任务附加到此特定链?

res.append(t2.s())

我的目标是确保链按照我在代码中指定的顺序执行。 如果我的链中的任务失败,则不会执行以下任务。

知道我在指定队列中使用超大型任务。

【问题讨论】:

    标签: django rabbitmq celery chain


    【解决方案1】:

    所有信息都包含在消息中。

    消息可能正在传输中,可能在世界的另一端,也可能被中间处理器使用。因此,无法在消息发送后对其进行修改。

    http://docs.celeryproject.org/en/latest/userguide/tasks.html#state

    我的目标是确保链按照我在代码中指定的顺序执行。和 如果我的链中的任务失败,则不会执行以下任务。

    您可以确定,订单是作为消息的一部分发送的,并且不会继续 如果任何任务失败。

    现在,如果您真的希望能够在运行时添加任务,那么您可以存储 数据库中的信息并让任务本身检查并调用新任务。 但是,这样做会遇到一些挑战:

    1) 链中的第一个任务如果成功将调用下一个任务, 然后下一个任务将调用之后的下一个任务,依此类推。

    2) 如果你在这个进程中添加一个任务,如果第一个任务已经执行了会发生什么? 还是第二个,还是第三个?

    因此,您可能会猜到,这需要一些繁重的同步才能工作。

    我想一个更简单的解决方案是创建一个等待一个任务完成的任务 然后应用回调:

    from celery import subtask
    from celery.result import from_serializable
    
    @app.task(bind=True)
    def after_task(self, result, callback, errback=None):
        result = from_serializable(result)
        if not result.ready():
            raise self.retry(countdown=1)
        if task.successful():
            subtask(callback).delay(result.get())
        else:
            if errback:
                subtask(errback)()
    
    
    def add_to_chain(result, callback, errback=None):
        callback = callback.clone()     # do not modify caller
        new_result = callback.freeze()  # sets id for callback, returns AsyncResult
        new_result.parent = result
        after_task.delay(result.serializable(), callback, errback)
        return new_result
    

    那么你可以这样使用它:

    from tasks import t1, t2, t3
    
    res = (t1.s(123) | t2.s() | t3.s())()
    res = add_to_chain(t2.s())
    

    注意事项:

    bind=True 是即将发布的 3.1 版本中的新功能,适用于旧版本 您必须删除 self 参数并使用 current_task.retry(获取此 from celery import current_task)。

    Signature.freeze 也是 3.1 中的新功能,用于 在旧版本中也可以使用:

    from celery import uuid
    
    def freeze(sig, _id=None):
        opts = sig.options
        try:
            tid = opts['task_id']
        except KeyError:
            tid = opts['task_id'] = _id or uuid()
        return sig.AsyncResult(tid)
    

    【讨论】:

    • 谢谢@asksol,我会阅读您的回答并尝试正确理解
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-06-17
    • 1970-01-01
    • 1970-01-01
    • 2023-04-04
    • 1970-01-01
    • 2020-02-19
    • 2014-11-03
    相关资源
    最近更新 更多