【发布时间】:2020-12-09 02:52:01
【问题描述】:
我有一个带有DataflowTemplateOperator 的 DAG,可以处理不同的 json 文件。当我触发 dag 时,我通过 {{dag_run.conf['param1']}} 传递了一些参数并且工作正常。
我遇到的问题是尝试根据 param1 重命名 task_id。
即task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",
它只抱怨字母数字字符 或
task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']),
它无法识别 dag_run 以及 alpha 问题。
这背后的整个想法是,当我在数据流作业控制台上看到作业失败时,我知道谁是基于 param1 的罪犯。数据流作业名称基于 task_id,如下所示:
df-operator-read-object-json-file-8b9eecec
我需要的是这个:
df-operator-read-object-param1-json-file-8b9eecec
如果可能的话,有什么想法吗?
【问题讨论】:
-
你的意思是,对于每次运行,你的 DAG 都会根据你的 json 文件有不同的 task_id 是吗?
-
param1 只能取 5 个 diff 值任务 id 应该是字符串 + param1
-
这是一个预定的 dag 还是只有手动输入,每次运行都需要手动输入?
-
一旦文件进入 DAG 存储桶,它就会由一个云函数触发,该函数传递运行 DF 作业所需的所有参数
-
这意味着如果您调用该函数 1000 次,它将创建 1000 个任务,每个任务都运行一次并且永远不会再次运行?这对 Airflow 来说不是一个好习惯。 DataflowTemplatedJobStartOperator(更新后的运算符)具有 job_name 参数,您可以使用它来随意配置。您不需要将 task_id 绑定到 job_name
标签: airflow google-cloud-composer