【发布时间】:2021-08-15 21:37:38
【问题描述】:
我正在尝试在我的环境中引入动态工作流,其中涉及不同模型推理的多个步骤,其中一个模型的输出被馈送到另一个模型。目前,我们很少有 Celery 工作人员分布在主机上来管理推理链。随着复杂性的增加,我们正在尝试动态构建工作流。为此,我使用 Celeryexecutor 进行了动态 DAG 设置。现在,有没有办法可以保留当前的 Celery 设置并将气流驱动的任务分配给相同的工作人员?我确实了解这些工作人员中的设置应该可以访问 DAG 文件夹和与气流服务器相同的环境。我想知道如何在这些服务器中启动 celery worker,以便气流可以路由过去由 python 应用程序的手动工作流完成的相同任务。如果我使用命令“airflow celery worker”启动工作程序,我将无法访问我的应用程序任务。如果我以目前的方式启动 celery,即“celery -A proj”,气流与它无关。寻找使其发挥作用的想法。
【问题讨论】:
-
我们正是这样做的。我们的 Airflow 工作人员只是将任务发送到我们的 Celery 集群,然后等待结果。简而言之,我们的 Airflow DAG 的任务使用 Celery 的
send_task()将任务发送到我们的 Celery 集群,并返回 .get() 的结果。 -
感谢@DejanLekic 的回复。我已经尝试过了,但我的任务没有到达队列。您能否分享完成此操作的操作员代码?我的设置类似于 python_callable=send_task, op_kwargs={'name':taskname, 'args':json_data}。执行 dag 时没有看到任何错误,但在 UI 或 list 命令的输出中也没有看到 dag 条目
-
好吧,您需要正确配置所有内容,以便您的 Airflow 任务知道要使用哪个代理...
标签: python celery airflow celery-task