【问题标题】:How to pass execution_date as parameter in SparkKubernetesOperator operator?如何在 SparkKubernetesOperator 运算符中将 execution_date 作为参数传递?
【发布时间】:2021-03-04 19:26:16
【问题描述】:

我正在尝试找到一种将 execution_Date 传递给 SparkKubernetesOperator 的方法。 任何方式都可以通过,因为我将使用 spark run 和 s3 分区的执行日期。

submit_compaction_to_spark = SparkKubernetesOperator(
        task_id="submit_compaction_to_spark",
        application_file="/k8s/compaction_s3.yml",
        namespace=kubernetes_namespace,
        kubernetes_conn_id="kubernetes",
        params={
            "warehouse_path": s3_path,
            "snapshot_expire_time": execution_date,
            "partition_filter": execution_date,
            "k8s_namespace": kubernetes_namespace,
            "docker_image_tag": docker_image_tag,
        }

【问题讨论】:

    标签: apache-spark kubernetes airflow


    【解决方案1】:

    不幸的是,params 仅向 jinja 公开自定义值,但不会在其中呈现 jinja 模板。

    例如,让我们看看这个 PythonOperator。

    op = PythonOperator(
        task_id="my_operator",
        python_callable=lambda **context: print(context['params']),
        params={
            "date": "{{ execution_date }}"
        },
        dag=dag
    )
    

    日期键的值是文字字符串"{{ execution_date }}",而不是呈现的值。

    [2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}
    

    BaseOperator 中的 params 钩子允许你传递一个字典 模板的参数和/或对象。请花时间 了解参数 my_param 如何传递到 模板。

    您可以在Airflow Documentation 中阅读更多关于 Jinja 模板与参数的信息。


    可以以其他方式使用execution_dateSparkKubernetesOperator 通过这些设置利用 jinja 模板。

    template_fields = ['application_file', 'namespace']  
    template_ext = ('yaml', 'yml', 'json')
    

    SparkKubernetesOperator 有两个模板化字段,application_filenamespace,这意味着您可以使用 jinja 模板作为值。如果您引用具有这些扩展名的文件,它将在其中呈现文件和 jinja 模板。

    让我们修改您提供的运算符。

    submit_compaction_to_spark = SparkKubernetesOperator(
            task_id="submit_compaction_to_spark",
            application_file="/k8s/compaction_s3.yml",
            namespace=kubernetes_namespace,
            kubernetes_conn_id="kubernetes",
            params={
                "k8s_namespace": kubernetes_namespace,
                "warehouse_path": s3_path,
            }
    )
    

    我要猜猜/k8s/compaction_s3.yml 长什么样子,然后添加一些 jinja 模板。

    ---
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
      namespace: "{{ params.k8s_namespace }}"
      labels:
        warehouse_path: "{{ params.k8s_namespace }}"
        date: "{{ ds }}"
    spec:
      type: Scala
      mode: cluster
      image: "gcr.io/spark-operator/spark:v2.4.4"
      imagePullPolicy: Always
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
      sparkVersion: "2.4.4"
      restartPolicy:
        type: Never
      volumes:
        - name: "test-volume"
          hostPath:
            path: "/tmp"
            type: Directory
      driver:
        cores: 1
        coreLimit: "1200m"
        memory: "512m"
        labels:
          version: 2.4.4
        serviceAccount: spark
        volumeMounts:
          - name: "test-volume"
            mountPath: "/tmp"
      executor:
        cores: 1
        instances: 1
        memory: "512m"
        labels:
          version: 2.4.4
        volumeMounts:
          - name: "test-volume"
            mountPath: "/tmp"
    

    您可以检查 DAG 中任务实例的渲染模板视图。

    另请参考 Airflow 文档中的 example DAGsample application_file

    【讨论】:

    • 感谢您提供我用作此的详细信息,但我试图在 dag 中获取 execution_date 并获取年、月、日、小时并将其发送到 spark 应用程序中,所以该怎么做那个?
    • 我曾尝试像这样使用,但这与 execution_date 不同: execution_dt = new_dag.get_latest_execution_date() filter_dt = pendulum.parse(str(start_date if execution_dt is None else execution_dt)) partition_filter = ( f"year={filter_dt.year:02},month={filter_dt.month:02},date={filter_dt.day:02},hour={filter_dt.hour - 1:02}" ) 但在这里如果我想回填,请获取最新日期,然后最新执行将给出错误日期
    【解决方案2】:

    https://stackoverflow.com/questions/66481727/how-to-pass-execution-date-as-parameter-in-sparkkubernetesoperator-operator\\

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:  
        name: "sparkapp-test-{{ ts_nodash\|lower }}-{{ task_instance.try_number }}"
        namespace: "default"
    
    spec:
      type: Python
      pythonVersion: "3"
      ...
      timeToLiveSeconds: 3600 # delete sparkapplication after an hour
    

    需要一个唯一的 sparkapplication id,所以我将 |lower 设为小写 T

    $ k get pod | grep sparkapp-test
    # 1    sparkapp-test-20211122t070418-1-driver                    1/1     Running     0          15s
    
    $ k get pod | grep sparkapp-test
    # 2-1 sparkapp-test-20211122t070418-1-a882cc7d46760183-exec-1   1/1     Running     0          4s
    # 2-2 sparkapp-test-20211122t070418-1-a882cc7d46760183-exec-2   1/1     Running     0          4s
    # 2-3 sparkapp-test-20211122t070418-1-a882cc7d46760183-exec-3   1/1     Running     0          4s
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-11-18
      • 2018-03-27
      相关资源
      最近更新 更多