【问题标题】:Use existing celery workers for Airflow's Celeryexecutor workers为 Airflow 的 Celeryexecutor 工作人员使用现有的 celery 工作人员
【发布时间】:2021-08-15 21:37:38
【问题描述】:

我正在尝试在我的环境中引入动态工作流,其中涉及不同模型推理的多个步骤,其中一个模型的输出被馈送到另一个模型。目前,我们很少有 Celery 工作人员分布在主机上来管理推理链。随着复杂性的增加,我们正在尝试动态构建工作流。为此,我使用 Celeryexecutor 进行了动态 DAG 设置。现在,有没有办法可以保留当前的 ​​Celery 设置并将气流驱动的任务分配给相同的工作人员?我确实了解这些工作人员中的设置应该可以访问 DAG 文件夹和与气流服务器相同的环境。我想知道如何在这些服务器中启动 celery worker,以便气流可以路由过去由 python 应用程序的手动工作流完成的相同任务。如果我使用命令“airflow celery worker”启动工作程序,我将无法访问我的应用程序任务。如果我以目前的方式启动 celery,即“celery -A proj”,气流与它无关。寻找使其发挥作用的想法。

【问题讨论】:

  • 我们正是这样做的。我们的 Airflow 工作人员只是将任务发送到我们的 Celery 集群,然后等待结果。简而言之,我们的 Airflow DAG 的任务使用 Celery 的 send_task() 将任务发送到我们的 Celery 集群,并返回 .get() 的结果。
  • 感谢@DejanLekic 的回复。我已经尝试过了,但我的任务没有到达队列。您能否分享完成此操作的操作员代码?我的设置类似于 python_callable=send_task, op_kwargs={'name':taskname, 'args':json_data}。执行 dag 时没有看到任何错误,但在 UI 或 list 命令的输出中也没有看到 dag 条目
  • 好吧,您需要正确配置所有内容,以便您的 Airflow 任务知道要使用哪个代理...

标签: python celery airflow celery-task


【解决方案1】:

感谢@DejanLekic。我让它工作了(尽管 DAG 任务调度延迟太大,我放弃了这种方法)。如果有人想看看这是如何实现的,我做了几件事来让它发挥作用。

  1. 更改airflow.cfg 以更改执行器、队列和结果后端设置(明显)
  2. 如果我们必须使用在气流伞外生成的 Celery worker,请将 celery_app_name 设置更改为 celery.execute 而不是气流.executors.celery_execute 并将执行器更改为“LocalExecutor”。我没有对此进行测试,但甚至可以通过在项目的 celery App 中注册气流的 Task 来避免切换到 celery 执行器。
  3. 现在每个任务都将调用 send_task(),然后返回的 AsynResult 对象存储在 Xcom(隐式或显式)或 Redis(隐式推送到队列)中,然后子任务将收集 Asyncresult(它将是从 Xcom 或 Redis 获取值的隐式调用),然后调用 .get() 以获取上一步的结果。

注意:没有必要在 DAG 的两个任务之间拆分 send_task() 和 .get()。通过将它们分配给父母和孩子,我试图利用任务之间的滞后。但在我的例子中,celery 任务的执行完成速度比气流在调度相关任务时的固有延迟要快。

【讨论】:

    最近更新 更多