不幸的是,params 仅向 jinja 公开自定义值,但不会在其中呈现 jinja 模板。
例如,让我们看看这个 PythonOperator。
op = PythonOperator(
task_id="my_operator",
python_callable=lambda **context: print(context['params']),
params={
"date": "{{ execution_date }}"
},
dag=dag
)
日期键的值是文字字符串"{{ execution_date }}",而不是呈现的值。
[2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}
BaseOperator 中的 params 钩子允许你传递一个字典
模板的参数和/或对象。请花时间
了解参数 my_param 如何传递到
模板。
您可以在Airflow Documentation 中阅读更多关于 Jinja 模板与参数的信息。
可以以其他方式使用execution_date。SparkKubernetesOperator 通过这些设置利用 jinja 模板。
template_fields = ['application_file', 'namespace']
template_ext = ('yaml', 'yml', 'json')
SparkKubernetesOperator 有两个模板化字段,application_file 和 namespace,这意味着您可以使用 jinja 模板作为值。如果您引用具有这些扩展名的文件,它将在其中呈现文件和 jinja 模板。
让我们修改您提供的运算符。
submit_compaction_to_spark = SparkKubernetesOperator(
task_id="submit_compaction_to_spark",
application_file="/k8s/compaction_s3.yml",
namespace=kubernetes_namespace,
kubernetes_conn_id="kubernetes",
params={
"k8s_namespace": kubernetes_namespace,
"warehouse_path": s3_path,
}
)
我要猜猜/k8s/compaction_s3.yml 长什么样子,然后添加一些 jinja 模板。
---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
namespace: "{{ params.k8s_namespace }}"
labels:
warehouse_path: "{{ params.k8s_namespace }}"
date: "{{ ds }}"
spec:
type: Scala
mode: cluster
image: "gcr.io/spark-operator/spark:v2.4.4"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
sparkVersion: "2.4.4"
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 2.4.4
serviceAccount: spark
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 2.4.4
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
您可以检查 DAG 中任务实例的渲染模板视图。
另请参考 Airflow 文档中的 example DAG 和 sample application_file。