【问题标题】:Airflow DAG tasks parallelism on different worker nodesAirflow DAG 在不同工作节点上执行并行任务
【发布时间】:2021-08-07 02:50:45
【问题描述】:

我有一个由 3 个工作节点组成的 Airflow 集群,其中 CeleryExecutor 和 RabbitMQ 用于通信。 我的 DAG 通常由下载文件、解压缩文件、上传到 hadoop 等任务组成。因此它们相互依赖并且必须在单个机器/节点上运行。

当气流调度单个 DAG 的这些任务时,在不同的节点上,我最终会出错,因为这些任务被调度在不同的机器上,但我需要将 DAG 中的所有任务调度在一个单机。

我尝试在气流.cfg 和初始化 dag 时设置 dag_concurrency = 1 和 max_active_runs_per_dag = 1,但没有成功。

我的气流.cfg 的其余部分:

parallelism = 32
dag_concurrency = 1
worker_concurrency = 16
max_active_runs_per_dag = 16

据我了解,将 dag_concurrency 设置为 1 应该可以解决问题,但我在这里缺少什么?

【问题讨论】:

    标签: python airflow-scheduler airflow


    【解决方案1】:

    CeleryExecutor 支持多个queues,您可以为每个操作员定义一个特定的队列(是BaseOperator 的一个属性),然后为每个工作人员订阅该特定队列。请注意,worker 可以监听一个或多个队列。

    来自docs

    Worker 可以监听一个或多个任务队列。当工人 启动(使用命令airflow celery worker),一组 可以指定逗号分隔的队列名称(例如,airflow celery 工人 -q 火花)。然后,该工作人员将仅拾取连接到的任务 指定的队列

    这是一个 DAG 示例:

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.dates import days_ago
    
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': days_ago(1),
    }
    dag = DAG('dist_example',
              schedule_interval='@once',
              catchup=False,
              default_args=default_args
              )
    get_hostname = 'echo $(hostname)'
    
    t1 = BashOperator(
        task_id='task_for_q1',
        bash_command=get_hostname,
        queue='queue_1',
        dag=dag
    )
    t2 = BashOperator(
        task_id='task_for_q2',
        bash_command=get_hostname,
        queue='queue_2',
        dag=dag
    )
    t1 >> t2
    
    

    worker_1: airflow celery worker -q default,queue_1

    worker_2: airflow celery worker -q default,queue_2

    通过侦听您的特定队列和default(由default_queue 配置键定义),您不会影响任何其他任务的标准多工人行为。

    希望对你有用!

    【讨论】:

    • 感谢您的回复。我会试试这个,也让其他人知道。 dag_concurrency 怎么样?我是否误解了它应该做什么或做什么?
    • 是的,我想是的。 Quote: ..."dag_concurrency 确定 Airflow Scheduler 能够在每个 DAG 一次调度多少个任务实例。可以将其视为“每个 DAG 一次可以调度的最大任务。”"...如果您有多个工人,这些任务将在他们之间分配。
    • 再次感谢,您的解决方案有效,我现在正在使用它。但是我仍然不明白为什么简单地设置 dag_concurrency = 1 没有做这项工作。
    • 很高兴听到它成功了!关于dag_concurrency,Airflow FAQ 将其称为......“定义了一个 DAG 允许有多少个正在运行的任务实例,超过这个点,事情就会排队”......您还可以监控有多少任务正在执行使用 Celery Flower 执行,当仅运行特定的 DAG 时,您应该看到正在执行的任务的最大数量将等于并发设置,其他任务保持排队。
    猜你喜欢
    • 1970-01-01
    • 2017-11-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-12
    • 1970-01-01
    相关资源
    最近更新 更多