【问题标题】:Django Celery Workflow Chain Pause/ResumeDjango Celery 工作流链暂停/恢复
【发布时间】:2013-01-30 23:40:40
【问题描述】:

有没有办法暂停/恢复使用 celery 3.0 中的链创建的正在运行的工作流?

基本上,我们的系统中有两种不同类型的任务:交互式任务和非交互式任务。我们拥有所有参数的非交互式的,但交互式的需要用户输入。请注意,对于交互任务,我们只能在链中所有先前的任务都完成后才请求用户输入,因为它们的结果会影响交互任务(即我们不能在创建实际链之前请求用户输入)。

关于如何解决这个问题的任何建议?在这里真是不知所措..

目前的想法:

  • 创建两个任务子类(来自 celery 导入任务)。向 Interactive 任​​务子类添加一个额外的实例(类成员)变量,默认设置为 false,表示仍需要一些用户输入。不知何故可以访问 Task 的实例,并从 celery worker 外部将其设置为 true (虽然我已经对此进行了很多查找,但似乎无法直接从另一个模块访问 Task 对象)李>
  • 将链划分为由交互式作业分隔的多个链。一旦链到达末端并触发交互式任务的交互式客户端组件,celery worker 外部的某种机制就会检测到。用户输入所有这些数据后,获取数据并启动新链,其中交互式任务位于新链的头部。

【问题讨论】:

    标签: celery django-celery


    【解决方案1】:

    我们已经在我们的项目中实施了类似于您的第二个想法的东西,并且效果很好。这是实现的要点。

    将新字段 status 添加到您的模型并覆盖保存方法。

    models.py:

    class My_Model(models.Model):
        # some fields
        status = models.IntegerField(default=0)
    
        def save(self, *args, **kwargs):
            super(My_Model, self).save(*args, **kwargs)
            from .functions import custom_func
            custom_func(self.status)
    

    tasks.py

    @celery.task()
    def non_interactive_task():
        #do something.
    
    @celery.task()
    def interactive_task():
        #do something.
    

    functions.py

    def custom_func(status):
        #Change status after non interactive task is completed.
        #Based on status, start interactive task.
    

    status 变量传递给模板,这对于显示UI 元素以供用户输入信息很有用。当用户输入所需信息时,更改状态。这会调用custom_func,它会触发您的interactive_task

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-06-12
      • 1970-01-01
      • 1970-01-01
      • 2016-08-26
      • 1970-01-01
      • 1970-01-01
      • 2012-08-27
      • 2017-05-09
      相关资源
      最近更新 更多