【问题标题】:Celery Generating group tasks from chain taskCelery 从链任务生成组任务
【发布时间】:2017-03-27 01:42:21
【问题描述】:

我正在尝试使用 celery(v4.0) 链接以下任务,

task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s()
result = task.get()

以上部分在generate_job_requests 作为和弦之前工作正常。 但是问题从execute_job 开始,它从generate_job_requests 获取作业列表,我需要为此创建并行任务,然后是所有作业的汇总结果。

我正在尝试验证 celery 是否可以使用这种任务图?是否有任何可能的替代工作流程来解决这种依赖问题? 我在文档中缺少的任何内容。

【问题讨论】:

    标签: python celery celery-task


    【解决方案1】:

    我在中间任务创建器中使用了类似地图的功能,它的作用类似于和弦,

    @shared_task(ignore_result=False)
    def dmap(it, callback, end_task):
        callback = subtask(callback)
        grp = group(callback.clone([arg, ]) for arg in it)
        c = (grp | end_task)
        return c()
    

    所以任务流程就这样减少了,

    task = (group([tasks1.s(), task2.s()) | generate_job_requests.s() | dmap.s(
            execute_job.s(), aggregate_result.s())).apply_async()
    

    为了获得最终的任务输出,我做了一些调整,

    # As we get dmap task id here
    dmap_task = celery_app.AsyncResult(task.id)
    dmap_result = dmap_task.get()
    # Get actual aggregate_result task id
    aggr_res_task_id = dmap_result[0][0]
    result = celery_app.AsyncResult(aggr_res_task_id)
    # Here we receive actual output of overall task
    result.get()
    

    我推荐了answer

    【讨论】:

      猜你喜欢
      • 2014-11-03
      • 2019-02-22
      • 2017-06-27
      • 1970-01-01
      • 2017-12-21
      • 1970-01-01
      • 1970-01-01
      • 2013-02-13
      • 1970-01-01
      相关资源
      最近更新 更多