【问题标题】:How to dynamically set task routes over celery tasks?如何动态设置芹菜任务的任务路线?
【发布时间】:2021-05-30 04:10:34
【问题描述】:

我有一个 API 端点,它从用户那里获取一些数据并使用 1 个或多个 celery worker 处理它。

例如,有 3 个不同的任务:

@celery.task
def a(data):
    res = do_a_work(data)
    if res.status:
         b.apply_async(res)
    else:
         return {'message': 'a task failed'}


@celery.task
def b(data):
    res = do_b_work(data)
    if res.status:
        c.apply_async(res)
    else:
         return {'message': 'b task failed'}


@celery.task
def c(data):
    res = do_c_work(data)
    return {'result': res}

API 端点根据数据将任务提交给ab

可能的流程:

  • a -> b -> c -> 结果
  • b -> c -> 结果
  • a -> 结果(如果 a 状态为 False)
  • a -> b -> 结果(如果 b 状态为 False)
  • b -> 结果(如果 b 状态为 False)

我计划使用 celery 链,我可以在其中定义应在该数据上执行哪些任务。但问题是如果任务a 或任务b 状态为False,我没有找到停止处理的方法。

另一种方法是从任务内部提交额外的任务,如任务示例,但问题是用户需要获取一个 task_id 以便稍后他可以得到结果。

我可以使用第三种方法来解决这个问题吗?

【问题讨论】:

    标签: python python-3.x redis celery celery-task


    【解决方案1】:

    chord 是执行此操作的正确方法,您只需重写 do_a_workdo_b_work 以引发异常而不返回 True/False 值。重写这些函数以引发异常后,您可以使用和弦编写流程并设置错误。

    打电话给a,你可能会做类似beloe的事情

    chord(chord(a.s(data), b.s()).on_error(handle_a_error.s()), c.s()).on_error(handle_b_error.s())
    

    【讨论】:

      猜你喜欢
      • 2012-11-16
      • 1970-01-01
      • 1970-01-01
      • 2011-04-21
      • 1970-01-01
      • 1970-01-01
      • 2020-11-22
      • 2021-10-12
      • 2014-12-04
      相关资源
      最近更新 更多