【问题标题】:How to pause or resume celery task?如何暂停或恢复 celery 任务?
【发布时间】:2016-08-26 12:48:17
【问题描述】:

我的项目中有一个要求,客户可以暂停或恢复待处理的流程,而不是流程一。我正在使用 web socket 来显示 celery 任务结果,但在暂停/恢复时我不明白如何设计代码。我想到的唯一一种方法是撤销暂停请求中的任务,同时将撤销进程的数据保存在缓存中,稍后在resume api中使用该缓存来启动芹菜再次任务。通过使用这种方法,我的 web socket 设计流程受到干扰,因为我通过 websocket 轮询了任务处理状态,并且当没有进程时,我发送了一个 finish true 标志来关闭连接。要知道哪个任务正在处理或待处理,我为任务映射添加了一个单独的表,并在执行最后一个任务时刷新此表。请帮我把这个设计完美无缺,如果我遗漏了什么也请指出我。

【问题讨论】:

    标签: python redis django-rest-framework celery django-celery


    【解决方案1】:

    我想展示一种通过工作流模式实现可暂停(和可恢复)进行中 celery 任务的通用方法。

    概念

    使用celery workflows - 您可以将整个操作设计为分为chain 的任务。它不一定必须是纯粹的链,但它应该遵循一个任务后另一个任务(或任务group)完成的一般概念。

    一旦您有了这样的工作流程,您就可以最终定义点以在整个工作流程中暂停。在每个这些点,您可以检查前端用户是否请求操作暂停并相应地继续。概念是这样的:-

    一个复杂且耗时的操作 O 被拆分为 5 个 celery 任务——T1、T2、T3、T4 和 T5——这些任务中的每一个(第一个除外)都取决于前一个任务的返回值。

    假设我们定义了在每个任务之后暂停的点,所以工作流看起来像-

    • T1 执行
    • T1 完成,检查用户是否请求暂停
      • 如果用户没有请求暂停 - 继续
      • 如果用户请求暂停,序列化剩余的工作流链并将其存储在某个地方以便以后继续

    ... 等等。由于每个任务之后都有一个暂停点,因此在每个任务之后都会执行该检查(当然最后一个除外)。

    但这只是理论,我很难在网上任何地方找到它的实现,所以这就是我想出的-

    实施

    from typing import Any, Optional
    
    from celery import shared_task
    from celery.canvas import Signature, chain, signature
    
    @shared_task(bind=True)
    def pause_or_continue(
        self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
    ):
        # Task to use for deciding whether to pause the operation chain
        if signature(clause)(retval):
            # Pause requested, call given callback with retval and remaining chain
            # chain should be reversed as the order of execution follows from end to start
            signature(callback)(retval, self.request.chain[::-1])
            self.request.chain = None
        else:
            # Continue to the next task in chain
            return retval
    
    
    def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
        '''
        Make a operation workflow chain pause-able/resume-able by inserting
        the pause_or_continue task for every nth task in given chain
    
        ch: chain
            The workflow chain
    
        clause: Signature
            Signature of a task that takes one argument - return value of
            last executed task in workflow (if any - othewise `None` is passsed)
            - and returns a boolean, indicating whether or not the operation should continue
    
            Should return True if operation should continue normally, or be paused
    
        callback: Signature
            Signature of a task that takes 2 arguments - return value of
            last executed task in workflow (if any - othewise `None` is passsed) and
            remaining chain of the operation workflow as a json dict object
            No return value is expected
    
            This task will be called when `clause` returns `True` (i.e task is pausing)
            The return value and the remaining chain can be handled accordingly by this task
    
        nth: Int
            Check `clause` after every nth task in the chain
            Default value is 1, i.e check `clause` after every task
            Hence, by default, user given `clause` is called and checked
            after every task
    
        NOTE: The passed in chain is mutated in place
        Returns the mutated chain
        '''
        newch = []
        for n, sig in enumerate(ch.tasks):
            if n != 0 and n % nth == nth - 1:
                newch.append(pause_or_continue.s(clause=clause, callback=callback))
            newch.append(sig)
        ch.tasks = tuple(newch)
        return ch
    

    解释 - pause_or_continue

    这里pause_or_continue 是前面提到的暂停点。这是一项将以特定时间间隔调用的任务(时间间隔为任务间隔,而不是时间间隔)。该任务然后调用用户提供的函数(实际上是一个任务) - clause - 以检查该任务是否应该继续。

    如果clause函数(实际上是一个任务)返回True,用户提供的callback函数被调用,最新的返回值(如果有的话-None否则)被传递给这个回调,以及作为剩余的任务链callback 做它需要做的事情,pause_or_continueself.request.chain 设置为 None,这告诉 celery “任务链现在是空的 - 一切都完成了”。

    如果clause 函数(实际上是一个任务)返回False,则返回上一个任务的返回值(如果有的话 - None 否则)将返回给下一个要接收的任务 - 并且链继续.因此工作流程继续。

    为什么是clausecallback 任务签名而不是常规函数?

    clausecallback 都被直接调用 - 没有delayapply_async。它在当前进程中,在当前上下文中执行。所以它的行为与普通函数完全一样,那为什么要使用signatures

    答案是序列化。您不能方便地将常规函数对象传递给 celery 任务。但是您可以传递任务签名。这正是我在这里所做的。 clausecallback 都应该是 celery 任务的常规 signature 对象。

    什么是self.request.chain

    self.request.chain 存储 dicts 列表(将 jsons 表示为 celery 任务序列化程序,默认情况下为 json) - 每个都代表一个任务签名。此列表中的每个任务都以相反的顺序执行。这就是为什么在传递给用户提供的callback 函数(实际上是一个任务)之前,列表会被颠倒——用户可能希望任务的顺序是从左到右的。

    快速说明:与本次讨论无关,但如果您使用 apply_async 中的 link 参数而不是 chain 原语本身来构造链。 self.request.callback 是要修改的属性(即设置为None 以删除回调和停止链)而不是self.request.chain

    解释 - tappable

    tappable 只是一个基本函数,它接受一个链(为简洁起见,这是这里介绍的唯一工作流原语)并在每个 nth 任务之后插入 pause_or_continue。您可以将它们插入您真正想要的任何位置,由您在操作中定义暂停点。这只是一个例子!

    对于每个chain 对象,任务的实际签名(按顺序,从左到右)存储在.tasks 属性中。这是一个任务签名的元组。所以我们所要做的就是获取这个元组,转换成一个列表,插入暂停点并转换回一个元组以分配给链。然后返回修改后的链对象。

    clausecallback 也附加到 pause_or_continue 签名。普通的芹菜。

    这涵盖了主要概念,但为了展示使用此模式的真实项目(以及展示暂停任务的恢复部分),这里有一个所有必要资源的小演示

    用法

    此示例使用假设具有数据库的基本 Web 服务器的概念。每当启动操作(即工作流链)时,它就会分配一个 id 并存储到数据库中。该表的架构看起来像-

    -- Create operations table
    -- Keeps track of operations and the users that started them
    CREATE TABLE operations (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      requester_id INTEGER NOT NULL,
      completion TEXT NOT NULL,
      workflow_store TEXT,
      result TEXT,
      FOREIGN KEY (requester_id) REFERENCES user (id)
    );
    

    现在唯一需要知道的字段是completion。它只是存储操作的状态-

    • 当操作开始并创建一个数据库条目时,它被设置为IN PROGRESS
    • 当用户请求暂停时,路由控制器(即视图)将其修改为REQUESTING PAUSE
    • 当操作实际暂停并调用callback(来自tappable,内部pause_or_continue)时,callback 应将其修改为PAUSED
    • 任务完成后修改为COMPLETED

    clause 的一个例子

    @celery.task()
    def should_pause(_, operation_id: int):
        # This is the `clause` to be used for `tappable`
        # i.e it lets celery know whether to pause or continue
        db = get_db()
    
        # Check the database to see if user has requested pause on the operation
        operation = db.execute(
            "SELECT * FROM operations WHERE id = ?", (operation_id,)
        ).fetchone()
        return operation["completion"] == "REQUESTING PAUSE"
    

    这是在暂停点调用的任务,以确定是否暂停。这是一个带有 2 个参数的函数……嗯。第一个是强制性的,tappable 要求 clause 有一个(并且恰好是一个)参数 - 因此它可以将前一个任务的返回值传递给它(即使该返回值是 @ 987654384@)。在这个例子中,不需要使用返回值——所以我们可以忽略它。

    第二个参数是操作id。看,clause 所做的一切 - 是检查数据库中的操作(工作流)条目并查看它是否具有状态 REQUESTING PAUSE。为此,它需要知道操作 ID。但是clause 应该是一个有参数的任务,什么给出?

    好吧,好东西签名可以是部分的。首次启动任务并创建 tappable 链时。操作 id 是已知的,因此我们可以通过should_pause.s(operation_id) 来获取带有 one 参数的任务的签名,即前一个任务的返回值。这符合clause!

    callback 的一个例子

    import os
    import json
    from typing import Any, List
    
    @celery.task()
    def save_state(retval: Any, chains: dict, operation_id: int):
        # This is the `callback` to be used for `tappable`
        # i.e this is called when an operation is pausing
        db = get_db()
    
        # Prepare directories to store the workflow
        operation_dir = os.path.join(app.config["OPERATIONS"], f"{operation_id}")
        workflow_file = os.path.join(operation_dir, "workflow.json")
        if not os.path.isdir(operation_dir):
            os.makedirs(operation_dir, exist_ok=True)
        
        # Store the remaining workflow chain, serialized into json
        with open(workflow_file, "w") as f:
            json.dump(chains, f)
    
        # Store the result from the last task and the workflow json path
        db.execute(
            """
            UPDATE operations
            SET completion = ?,
                workflow_store = ?,
                result = ?
            WHERE id = ?
            """,
            ("PAUSED", workflow_file, f"{retval}", operation_id),
        )
        db.commit()
    

    这是在任务暂停时要调用的任务。请记住,这应该采用最后执行的任务的返回值和剩余的签名列表(按顺序,从左到右)。还有一个额外的参数 - operation_id - 再次。对此的解释与clause的解释相同。

    此函数将剩余的链存储在一个 json 文件中(因为它是一个字典列表)。请记住,您可以使用不同的序列化程序 - 我使用的是 json,因为它是 celery 使用的默认任务序列化程序。

    存储剩余链后,将completion状态更新为PAUSED,并将json文件的路径记录到db中。

    现在,让我们看看这些在行动-

    启动工作流的示例

    def start_operation(user_id, *operation_args, **operation_kwargs):
        db = get_db()
        operation_id: int = db.execute(
            "INSERT INTO operations (requester_id, completion) VALUES (?, ?)",
            (user_id, "IN PROGRESS"),
        ).lastrowid
        # Convert a regular workflow chain to a tappable one
        tappable_workflow = tappable(
            (T1.s() | T2.s() | T3.s() | T4.s() | T5.s(operation_id)),
            should_pause.s(operation_id),
            save_state.s(operation_id),
        )
        # Start the chain (i.e send task to celery to run asynchronously)
        tappable_workflow(*operation_args, **operation_kwargs)
        db.commit()
        return operation_id
    

    接受用户 ID 并启动操作工作流的函数。这或多或少是一个围绕视图/路由控制器建模的不切实际的虚拟函数。但我认为它可以通过总体思路。

    假设T[1-4]是操作的所有单元任务,每一个都以前一个任务的返回作为参数。只是普通芹菜链的一个例子,你可以随意使用你的链子。

    T5 是将最终结果(T4 的结果)保存到数据库的任务。因此,除了来自T4 的返回值,它还需要operation_id。哪个被传递到签名中。

    暂停工作流的示例

    def pause(operation_id):
        db = get_db()
    
        operation = db.execute(
            "SELECT * FROM operations WHERE id = ?", (operation_id,)
        ).fetchone()
    
        if operation and operation["completion"] == "IN PROGRESS":
            # Pause only if the operation is in progress
            db.execute(
                """
                UPDATE operations
                SET completion = ?
                WHERE id = ?
                """,
                ("REQUESTING PAUSE", operation_id),
            )
            db.commit()
            return 'success'
    
        return 'invalid id'
    

    这使用了前面提到的修改数据库条目以将completion 更改为REQUESTING PAUSE 的概念。一旦提交,下次pause_or_continue 调用should_pause 时,它会知道用户已请求暂停操作,并会相应地这样做。

    恢复工作流程的示例

    def resume(operation_id):
        db = get_db()
    
        operation = db.execute(
            "SELECT * FROM operations WHERE id = ?", (operation_id,)
        ).fetchone()
    
        if operation and operation["completion"] == "PAUSED":
            # Resume only if the operation is paused
            with open(operation["workflow_store"]) as f:
                # Load the remaining workflow from the json
                workflow_json = json.load(f)
            # Load the chain from the json (i.e deserialize)
            workflow_chain = chain(signature(x) for x in serialized_ch)
            # Start the chain and feed in the last executed task result
            workflow_chain(operation["result"])
    
            db.execute(
                """
                UPDATE operations
                SET completion = ?
                WHERE id = ?
                """,
                ("IN PROGRESS", operation_id),
            )
            db.commit()
            return 'success'
    
        return 'invalid id'
    

    回想一下,当操作暂停时 - 剩余的工作流存储在 json 中。由于我们目前将工作流限制为 chain 对象。我们知道这个 json 是一个签名列表,应该变成一个chain。因此,我们相应地对其进行反序列化并将其发送给 celery worker。

    请注意,这个剩余的工作流仍然具有原来的 pause_or_continue 任务 - 所以这个工作流本身再次可以暂停/恢复。当它暂停时,workflow.json 将被更新。

    【讨论】:

      【解决方案2】:

      错误的方法。您永远不应该手动暂停或撤销进程以获取当前状态。暂停和撤销状态是为代理错误保留的。

      尝试重新设计您的代码。

      实现的主要目标在这句话中

      customer can pause or resume process which are pending not the process one

      将您的代码设计为https://en.wikipedia.org/wiki/Workflow_pattern

      将您的代码拆分为步骤或状态。一个 celery 流程可以完成所有工作流程,但如果您对许多外部提供者(一个请求 = 一个状态)执行许多请求,则没有必要。如果客户暂停状态,则停止您的 celery 进程。添加事件以检查状态何时变为活动状态并再次为此任务运行新的 celery 进程。

      【讨论】:

      • 通过停止 celery 进程意味着停止 worker ro 消费它?
      • 没有。我的意思是完全结束这个过程。当客户再次按下开始时,为此流程创建新任务。
      • 我在一个队列中运行所有客户任务,所以每当客户暂停或恢复时,我都会获取客户相关的任务 ID 并使用终止真标志撤销它,这会进一步终止进程。这是我的理解。
      • 您不应该因客户需求而暂停和撤销 celery 任务!这很危险!如果客户单击暂停按钮,则将进程状态保存在您的数据库中,并且应该完成 celery 任务。在这个 celery 任务代码中应该类似于 if state == 'pause': return 如果客户点击按钮“恢复”然后再次运行该进程,并从保存状态使用新的 celery 任务。
      猜你喜欢
      • 2021-12-17
      • 2017-02-15
      • 2014-10-25
      • 2021-05-26
      • 2012-12-31
      • 2013-01-30
      • 2020-12-25
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多