【Title】: python rq - how to trigger a job when multiple other jobs are finished? Multi-job dependency workaround?
【Posted】: 2018-09-03 07:18:12
【Question】:

I have a nested job structure in my python redis queue. First the rncopy job is executed. Once it finishes, the 3 dependent registration jobs follow. When the computation of all 3 jobs is done, I want to trigger a job that sends a websocket notification to my frontend.

My current attempt:

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))

Unfortunately, the multi-job dependency feature never seems to have been merged into master. I can see two pull requests for it on git at the moment. Is there a workaround I could use?

Apologies for not providing a reproducible example.

【Discussion】:

    Tags: python python-3.x redis queue python-rq


    【Solution 1】:

    New versions (RQ >= 1.8)

    You can simply use the `depends_on` parameter, passing a list or a tuple:

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    
    notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))

    # you can also use a list instead of a tuple:
    # notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=[t1c_reg, t2_reg, fla_reg])
    

    Old versions (RQ < 1.8)

    I use this workaround: if there are n dependencies, I create n-1 wrappers of the real function; each wrapper depends on a different job.

    This solution is a bit convoluted, but it works.

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    
    notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid, t2_reg.id, fla_reg.id, timeout=6000, depends_on=t1c_reg)

    def first_wrapper(patient_finished, patientid, t2_reg_id, fla_reg_id):
        queue = Queue('YOUR-QUEUE-NAME')
        queue.enqueue(second_wrapper, patient_finished, patientid, fla_reg_id, timeout=6000, depends_on=t2_reg_id)

    def second_wrapper(patient_finished, patientid, fla_reg_id):
        queue = Queue('YOUR-QUEUE-NAME')
        queue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=fla_reg_id)
    

    Some notes:

    • I don't pass the queue object to the wrappers, because some serialization problems arise; so the queue has to be recovered by name...

    • For the same reason, I pass job.id (instead of the job object) to the wrappers.
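    The chaining trick can be illustrated without Redis at all. Below is a pure-Python simulation of the single-dependency constraint (each job may wait on at most one other job, as in RQ < 1.8); `ToyQueue` and the string "job ids" are invented for illustration and are not the RQ API, but the control flow mirrors the wrappers above: the notification still runs only after all three registrations have finished.

    ```python
    # Toy single-dependency scheduler: every job depends on at most ONE
    # other job, yet chained wrappers give us a three-way fan-in.
    finished = []

    class ToyQueue:
        def __init__(self):
            self.pending = []   # (func, args, depends_on) triples
            self.done = set()   # names of completed jobs

        def enqueue(self, func, *args, depends_on=None):
            self.pending.append((func, args, depends_on))
            return func.__name__          # stand-in for a real job id

        def run(self):
            # cycle until everything has executed, including jobs that
            # wrappers enqueue while the loop is running
            while self.pending:
                func, args, dep = self.pending.pop(0)
                if dep is None or dep in self.done:
                    func(*args)
                    self.done.add(func.__name__)
                else:
                    self.pending.append((func, args, dep))  # not ready, requeue

    queue = ToyQueue()

    def rncopy():
        finished.append("rncopy")

    def t1c_reg():
        finished.append("t1c")

    def t2_reg():
        finished.append("t2")

    def fla_reg():
        finished.append("fla")

    def notify():
        finished.append("notify")

    def second_wrapper(fla_id):
        # runs only after t2; chains the final notification onto fla
        queue.enqueue(notify, depends_on=fla_id)

    def first_wrapper(t2_id, fla_id):
        # runs only after t1c; chains the next wait onto t2
        queue.enqueue(second_wrapper, fla_id, depends_on=t2_id)

    copy_id = queue.enqueue(rncopy)
    t1c_id = queue.enqueue(t1c_reg, depends_on=copy_id)
    t2_id = queue.enqueue(t2_reg, depends_on=copy_id)
    fla_id = queue.enqueue(fla_reg, depends_on=copy_id)
    queue.enqueue(first_wrapper, t2_id, fla_id, depends_on=t1c_id)

    queue.run()
    # finished == ["rncopy", "t1c", "t2", "fla", "notify"]
    ```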

    【Discussion】:

    • Interesting approach.. I hope rq gets multi-job dependencies soon! Adapting your solution would leave me with a lot of messy code, since my dependencies are heavily nested.
    【Solution 2】:

    I created an "rq-manager" to solve similar problems with multiple and tree-like dependencies: https://github.com/crispyDyne/rq-manager

    A project structure with multiple dependencies looks like this:

    def simpleTask(x):
        return 2*x

    project = {'jobs': [
                {
                    'blocking': True,  # this job must finish before moving on
                    'func': simpleTask, 'args': 0
                },
                {
                    'blocking': True,  # this job, and its child jobs, must finish before moving on
                    'jobs': [  # these child jobs will run in parallel
                        {'func': simpleTask, 'args': 1},
                        {'func': simpleTask, 'args': 2},
                        {'func': simpleTask, 'args': 3}],
                },
                {  # this job will only run when the blocking jobs above finish
                    'func': simpleTask, 'args': 4
                }
            ]}
    

    Then pass it to the manager for execution:

    from rq_manager import manager, getProjectResults
    
    managerJob = q.enqueue(manager, project)
    projectResults = getProjectResults(managerJob)
    
    

    Returns:

    projectResults = [0, [2, 4, 6], 8]
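    The nested result shape can be reproduced with a simple recursive walk over the project dict. The `run_project` helper below is a made-up, sequential sketch (the real rq-manager distributes the jobs through RQ workers), but it shows how child 'jobs' lists map to nested result lists:

    ```python
    def simpleTask(x):
        return 2 * x

    def run_project(project):
        """Recursively execute a nested project dict and collect results.

        A nested 'jobs' group produces a nested result list, giving the
        [0, [2, 4, 6], 8] shape shown above. Everything runs sequentially
        in-process here; this is only an illustration of the structure."""
        results = []
        for job in project['jobs']:
            if 'jobs' in job:                  # nested group of child jobs
                results.append(run_project(job))
            else:
                results.append(job['func'](job['args']))
        return results

    project = {'jobs': [
        {'blocking': True, 'func': simpleTask, 'args': 0},
        {'blocking': True, 'jobs': [
            {'func': simpleTask, 'args': 1},
            {'func': simpleTask, 'args': 2},
            {'func': simpleTask, 'args': 3}]},
        {'func': simpleTask, 'args': 4},
    ]}

    print(run_project(project))  # → [0, [2, 4, 6], 8]
    ```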
    

    When a dependent job needs the result of its parent, I create a function that executes the first job and then adds the other jobs to the project. So for your example:

    def firstTask(patientid,imagepath):
    
        raw_nifti_result  = raw_nifti_copymachine(patientid,imagepath)
    
        moreTasks = {'jobs':[
            {'func':modality_registrator,'args':(patientid, "t1c", raw_nifti_result)},
            {'func':modality_registrator,'args':(patientid, "t2", raw_nifti_result)},
            {'func':modality_registrator,'args':(patientid, "fla", raw_nifti_result)},
        ]}
    
        # returning a dictionary with an "addJobs" key will add those tasks to the project
        return {'result':raw_nifti_result, 'addJobs':moreTasks}
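    One way to picture the "addJobs" mechanic is a runner that, when a job returns a dict carrying an 'addJobs' key, executes the extra jobs and nests their results after the parent's result. Everything below is a sequential, pure-Python sketch: `run_job` is a made-up helper, and the pipeline functions are dummy stand-ins rather than the real ones.

    ```python
    def run_job(job):
        """Run one job; if it returns {'result': ..., 'addJobs': ...},
        execute the added jobs too and nest their results."""
        args = job.get('args', ())
        out = job['func'](*args) if isinstance(args, tuple) else job['func'](args)
        if isinstance(out, dict) and 'addJobs' in out:
            extra = [run_job(j) for j in out['addJobs']['jobs']]
            return [out['result'], extra]
        return out

    # dummy stand-ins for the real pipeline functions
    def raw_nifti_copymachine(patientid, imagepath):
        return f"nifti:{patientid}"

    def modality_registrator(patientid, modality, nifti):
        return f"{modality}-registered"

    def firstTask(patientid, imagepath):
        raw_nifti_result = raw_nifti_copymachine(patientid, imagepath)
        moreTasks = {'jobs': [
            {'func': modality_registrator, 'args': (patientid, "t1c", raw_nifti_result)},
            {'func': modality_registrator, 'args': (patientid, "t2", raw_nifti_result)},
            {'func': modality_registrator, 'args': (patientid, "fla", raw_nifti_result)},
        ]}
        # returning a dictionary with an "addJobs" key adds those tasks
        return {'result': raw_nifti_result, 'addJobs': moreTasks}

    result = run_job({'func': firstTask, 'args': ("p01", "/data/p01")})
    print(result)  # ['nifti:p01', ['t1c-registered', 't2-registered', 'fla-registered']]
    ```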
    

    The project then looks like this:

    project = {'jobs': [
                {'blocking': True,  # this job, and its child jobs, must finish before moving on
                 'jobs': [
                    {
                        'func': firstTask, 'args': (patientid, imagepath),
                        'blocking': True,  # this job must finish before moving on
                    },
                    # "moreTasks" will be added here
                    ]
                },
                {  # this job will only run when the blocking jobs above finish
                    'func': print, 'args': (patient_finished, patientid)
                }
            ]}
    

    If the final job needs the results of previous jobs, set the "previousJobArgs" flag. "finalJob" will receive an array of the previous results, with nested arrays for child job results.

    def finalJob(previousResults):
        # previousResults = [ 
        #     raw_nifti_copymachine(patientid,imagepath),
        #     [
        #         modality_registrator(patientid, "t1c", raw_nifti_result),
        #         modality_registrator(patientid, "t2", raw_nifti_result),
        #         modality_registrator(patientid, "fla", raw_nifti_result),
        #     ]
        # ]
        return doSomethingWith(previousResults)
    
    

    The project should then look like this:

    project = {'jobs': [
                {
                 # 'blocking': True,  # blocking not needed
                 'jobs': [
                    {
                        'func': firstTask, 'args': (patientid, imagepath),
                        'blocking': True,  # this job must finish before moving on
                    },
                    # "moreTasks" will be added here
                    ]
                },
                {  # this job will wait, since it needs the previous jobs' results
                    'func': finalJob, 'previousJobArgs': True  # it gets all the previous jobs' results
                }
            ]}
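    A sequential, pure-Python sketch of how 'previousJobArgs' could work (illustrative only; `run_project`, `double`, and this `finalJob` are made up for the example, and this is not rq-manager's actual implementation): the runner keeps the nested list of results produced so far and hands a copy of it to any job that sets the flag.

    ```python
    def run_project(project):
        """Run jobs in order; a job with 'previousJobArgs' set receives
        the nested list of all results produced so far."""
        results = []
        for job in project['jobs']:
            if 'jobs' in job:                       # nested group of child jobs
                results.append(run_project(job))
            elif job.get('previousJobArgs'):
                results.append(job['func'](results[:]))  # copy of prior results
            else:
                args = job.get('args', ())
                results.append(job['func'](*args) if isinstance(args, tuple)
                               else job['func'](args))
        return results

    def double(x):
        return 2 * x

    def finalJob(previousResults):
        # flatten the nested results and sum them, just to show access
        flat = [v for group in previousResults
                for v in (group if isinstance(group, list) else [group])]
        return sum(flat)

    project = {'jobs': [
        {'func': double, 'args': 1},
        {'jobs': [{'func': double, 'args': 2},
                  {'func': double, 'args': 3}]},
        {'func': finalJob, 'previousJobArgs': True},
    ]}

    print(run_project(project))  # → [2, [4, 6], 12]
    ```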
    

    Hopefully https://github.com/rq/rq/issues/260 will be implemented, and then my solution will become obsolete!

    【Discussion】:
