【问题标题】:Construct list using jinja2 for KubernetesPodOperator in Airflow在 Airflow 中使用 jinja2 为 KubernetesPodOperator 构建列表
【发布时间】:2021-12-06 03:45:42
【问题描述】:

我们有一个在 Pod 中运行的应用程序,我想用气流触发。该应用程序与许多实体一起运行并花费大量时间。我们设置的性质是其中一些可能会失败,我们希望能够仅使用一个或几个实体重新运行:

my_program # Run full application

my_program -e entity1 -e entity2 # Run application limited to entity1 and entety2.

我的计划是允许用户使用 Airflow UI 中的“带配置触发器”的实体列表再次触发 DAG,并使用 {{ dag_run.conf }} 选项限制 DAG。

我现在面临的问题是KubernetesPodOperator 需要一个字符串列表,我不明白如何使用 jinja 构造一个我以前不知道它的长度的列表。

这是我尝试过的,但是当然 jinja 不会被模板化。我了解如何将模板化字符串插入列表中,但现在当我事先不知道列表的长度时,我该怎么做。

with DAG(
    "my_dag",
    description="Run my dag",
    schedule_interval="@daily",
    start_date=datetime.datetime(2021, 10, 14),
    default_args=default_args,
) as dag:

    entities = """{%- for entity in dag_run.conf['entities'] -%} -p {{ entity }} {% endfor %}"""
    arguments = list(filter(None, ['my_program', *entities.split(' ')]))

    t1 = KubernetesPodOperator(
        task_id="my_task_id",
        image="url_to_docker_image:latest",
        name="my_task_name",
        arguments=arguments,
        is_delete_operator_pod=True,
        env_vars={"AIRFLOW_RUN_ID": "{{ run_id }}"},
    )

编辑:这是我第二次尝试使用 jinja 和 render_template_as_native_obj=True,

with DAG(
    "my_dag",
    description="Run my dag",
    schedule_interval="@daily",
    start_date=datetime.datetime(2021, 10, 14),
    default_args=default_args,
    render_template_as_native_obj=True,
) as dag:


    arguments = """['my_program', {% if entities is defined %}
      {%- for entity in entities-%} '-p', '{{ entity }}', {% endfor %}
      {%- endif %}]
    """

    t1 = KubernetesPodOperator(
        task_id="my_task_id",
        image="url_to_docker_image:latest",
        name="my_task_name",
        arguments=arguments, # type: ignore
        is_delete_operator_pod=True,
        env_vars={"AIRFLOW_RUN_ID": "{{ run_id }}"},
    )

但这似乎没有正确转换为列表:

HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.Args: []string: decode slice: expect [ or n, but found \", error found in #10 byte of ...|{\"args\": \"['my_program|...

【问题讨论】:

  • 只是为了确认一下:当您调用运算符时,参数变量包含类似["{%-", "for", "entity", "in", "dag_run.conf['entities']", "-%}", "-p" ....etc ]
  • 我收到此错误:jinja2.exceptions.TemplateSyntaxError: tag name expected,我相信这是由于 jinja 试图在上面的示例中运行模板。

标签: python templates arguments jinja2 airflow


【解决方案1】:

第二种方法进行了微调。当然,该变量在示例中不可用(已被剥离),参数是从dag_run.conf['entities'] 获取的,而不仅仅是entities

第二个问题是 jinja 转换为 python 对象的有效输入,我必须删除字符串末尾的空格以及换行符:

arguments = """['my_program', {% if dag_run.conf['entities'] is defined %}
  {%- for entity in dag_run.conf['entities']-%} '-p', '{{ entity }}', {% endfor %}
  {%- endif %}]
""".replace('\n','').strip()

【讨论】:

    【解决方案2】:

    您的第二次尝试是正确的,但是您的参数变量中的模板在最后一个实体的末尾有一个额外的逗号 (',')。

    import jinja2
    from jinja2.nativetypes import NativeEnvironment
    
    env = NativeEnvironment()
    template = env.from_string(arguments)
    print (template.render(entities=range(5)) )
    

    输出:['my_program', '-p', '0', '-p', '1', '-p', '2', '-p', '3', '-p', '4', ]

    如果您将参数变量更改为此:

    arguments = """
       ['my_program' {% if entities is defined %}
       {%- for entity in entities-%}, '-p', '{{ entity }}' {% endfor %}
       {%- endif %}]
       """
    

    现在的输出是一个字符串,Jinja 可以将其转换为 python 数组: ['my_program' , '-p', '0' , '-p', '1' , '-p', '2' , '-p', '3' , '-p', '4' ]

    【讨论】:

      猜你喜欢
      • 2021-02-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-06-15
      相关资源
      最近更新 更多