不确定你所说的“动态”是什么意思,但是当 yaml 文件更新时,如果读取文件过程在 dag 文件正文中,则会刷新 dag 以从 yaml 文件中申请新的 args。所以实际上,您不需要 XCOM 来获取参数。
只需简单地创建一个 params 字典,然后传递给 default_args:
CONFIGFILE = os.path.join(
os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')
with open(CONFIGFILE, 'r') as ymlfile:
CFG = yaml.load(ymlfile)
default_args = {
'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
'project_id': CFG['section_A']['project_id'],
'zone': CFG['section_A']['zone'],
'mage_version': CFG['section_A']['image_version'],
'num_workers': CFG['section_A']['num_workers'],
'worker_machine_type': CFG['section_A']['worker_machine_type'],
# you can add all needs params here.
}
DAG = DAG(
dag_id=DAG_NAME,
schedule_interval=SCHEDULE_INTEVAL,
default_args=default_args, # pass the params to DAG environment
)
Task1 = DataprocClusterCreateOperator(
task_id='your_task_id',
dag=DAG
)
但如果您想要动态 dag 而不是参数,您可能需要其他策略,例如 this。
所以你可能需要弄清楚基本思想:
动态在哪个级别?任务级别? DAG 级别?
或者您可以创建自己的 Operator 来完成这项工作并获取参数。