【发布时间】:2021-12-06 03:45:42
【问题描述】:
我们有一个在 Pod 中运行的应用程序,我想用气流触发。该应用程序与许多实体一起运行并花费大量时间。我们设置的性质是其中一些可能会失败,我们希望能够仅使用一个或几个实体重新运行:
my_program # Run full application
my_program -e entity1 -e entity2 # Run application limited to entity1 and entety2.
我的计划是允许用户使用 Airflow UI 中的“带配置触发器”的实体列表再次触发 DAG,并使用 {{ dag_run.conf }} 选项限制 DAG。
我现在面临的问题是KubernetesPodOperator 需要一个字符串列表,我不明白如何使用 jinja 构造一个我以前不知道它的长度的列表。
这是我尝试过的,但是当然 jinja 不会被模板化。我了解如何将模板化字符串插入列表中,但现在当我事先不知道列表的长度时,我该怎么做。
with DAG(
"my_dag",
description="Run my dag",
schedule_interval="@daily",
start_date=datetime.datetime(2021, 10, 14),
default_args=default_args,
) as dag:
entities = """{%- for entity in dag_run.conf['entities'] -%} -p {{ entity }} {% endfor %}"""
arguments = list(filter(None, ['my_program', *entities.split(' ')]))
t1 = KubernetesPodOperator(
task_id="my_task_id",
image="url_to_docker_image:latest",
name="my_task_name",
arguments=arguments,
is_delete_operator_pod=True,
env_vars={"AIRFLOW_RUN_ID": "{{ run_id }}"},
)
编辑:这是我第二次尝试使用 jinja 和 render_template_as_native_obj=True,
with DAG(
"my_dag",
description="Run my dag",
schedule_interval="@daily",
start_date=datetime.datetime(2021, 10, 14),
default_args=default_args,
render_template_as_native_obj=True,
) as dag:
arguments = """['my_program', {% if entities is defined %}
{%- for entity in entities-%} '-p', '{{ entity }}', {% endfor %}
{%- endif %}]
"""
t1 = KubernetesPodOperator(
task_id="my_task_id",
image="url_to_docker_image:latest",
name="my_task_name",
arguments=arguments, # type: ignore
is_delete_operator_pod=True,
env_vars={"AIRFLOW_RUN_ID": "{{ run_id }}"},
)
但这似乎没有正确转换为列表:
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.Args: []string: decode slice: expect [ or n, but found \", error found in #10 byte of ...|{\"args\": \"['my_program|...
【问题讨论】:
-
只是为了确认一下:当您调用运算符时,参数变量包含类似
["{%-", "for", "entity", "in", "dag_run.conf['entities']", "-%}", "-p" ....etc ] -
我收到此错误:
jinja2.exceptions.TemplateSyntaxError: tag name expected,我相信这是由于 jinja 试图在上面的示例中运行模板。
标签: python templates arguments jinja2 airflow