【问题标题】:Kubernetes executor do not parallelize sub DAGs execution in AirflowKubernetes 执行器不并行化 Airflow 中的子 DAG 执行
【发布时间】:2018-09-03 09:32:37
【问题描述】:

由于一些执行限制,我们从 Airflow 1.10.0 中的 Celery Executor 移出,现在我们使用的是KubernetesExecutor

现在,即使我们直接更改代码中的subdag_operator,我们也无法并行化某些 DAG 中的所有任务:https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38

我们的期望是通过这些修改和使用 Kubernetes 执行器,我们可以同时扇出所有任务的执行,但我们的行为与 SequentialExecutor 相同。

这是我们现在的行为:

我们希望使用KubernetesExecutor 同时执行所有这些。

【问题讨论】:

  • k8s 气流执行器在 DAG 中并行执行任务时为我工作。我建议您使用最新的气流版本重试,因为 k8s 执行器非常新
  • 你好@shawmzhu,他们在几个分支前修复了它,这个问题在以前的版本中仍然存在(2018 年 11 月)但是谢谢。
  • 您是否更改了 subdug 类以使用 KubernetesExecutor 作为默认值,而不是使用顺序执行器?

标签: kubernetes airflow airflow-scheduler


【解决方案1】:

Airflow 中的 Kubernetes Executor 会将所有第一级任务变成一个带有 Local Executor 的 worker pod。

这意味着您将让本地执行器执行您的SubDagOperator

为了在生成 worker pod 后运行 SubDagOperator 下的任务,您需要为 worker pod 指定配置 parallelism。因此,如果您对工作 pod 使用 YAML 格式,则需要将其编辑为类似的内容。

apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - args: []
      command: []
      env:
        ###################################
        # This is the part you need to add
        ###################################
        - name: AIRFLOW__CORE__PARALLELISM
          value: 10
        ###################################
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      envFrom: []
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: false
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
          subPath: repo/tests/dags
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  nodeSelector:
    {}
  affinity:
    {}
  tolerations:
    []
  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-local-settings

然后,SubDagOperator 将按照指定的parallelism 并行运行任务。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-05-13
    • 2021-08-07
    • 1970-01-01
    • 2023-04-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多