【问题标题】:dynamic task id names in AirflowAirflow 中的动态任务 ID 名称
【发布时间】:2020-12-09 02:52:01
【问题描述】:

我有一个带有DataflowTemplateOperator 的 DAG,可以处理不同的 json 文件。当我触发 dag 时,我通过 {{dag_run.conf['param1']}} 传递了一些参数并且工作正常。

我遇到的问题是尝试根据 param1 重命名 task_id

task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",

它只抱怨字母数字字符 或

task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']), 它无法识别 dag_run 以及 alpha 问题。

这背后的整个想法是,当我在数据流作业控制台上看到作业失败时,我知道谁是基于 param1 的罪犯。数据流作业名称基于 task_id,如下所示:

df-operator-read-object-json-file-8b9eecec

我需要的是这个:

df-operator-read-object-param1-json-file-8b9eecec

如果可能的话,有什么想法吗?

【问题讨论】:

  • 你的意思是,对于每次运行,你的 DAG 都会根据你的 json 文件有不同的 task_id 是吗?
  • param1 只能取 5 个 diff 值任务 id 应该是字符串 + param1
  • 这是一个预定的 dag 还是只有手动输入,每次运行都需要手动输入?
  • 一旦文件进入 DAG 存储桶,它就会由一个云函数触发,该函数传递运行 DF 作业所需的所有参数
  • 这意味着如果您调用该函数 1000 次,它将创建 1000 个任务,每个任务都运行一次并且永远不会再次运行?这对 Airflow 来说不是一个好习惯。 DataflowTemplatedJobStartOperator(更新后的运算符)具有 job_name 参数,您可以使用它来随意配置。您不需要将 task_id 绑定到 job_name

标签: airflow google-cloud-composer


【解决方案1】:

无需为每个文件生成新的运算符。 DataflowTemplatedJobStartOperator 具有 job_name 参数,该参数也是模板化的,因此可以与 Jinja 一起使用。

我没有测试它,但这应该可以工作:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
op = DataflowTemplatedJobStartOperator(
        task_id="df_operator_read_object_json_file",
        job_name= "df_operator_read_object_json_file_{{dag_run.conf['param1']}}"
        template='gs://dataflow-templates/your_template',
        location='europe-west3',
    )

【讨论】:

    猜你喜欢
    • 2019-03-30
    • 2018-07-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-10-21
    • 1970-01-01
    • 2019-05-13
    • 1970-01-01
    相关资源
    最近更新 更多