【问题标题】:How to pass dynamic arguments Airflow operator?如何传递动态参数气流运算符?
【发布时间】:2019-03-24 01:55:51
【问题描述】:

我正在使用 Airflow 在 Google Cloud Composer 上运行 Spark 作业。我需要

  • 创建集群(YAML 参数由用户提供)
  • spark 作业列表(作业参数也由每个作业 YAML 提供)

使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。

但是,考虑DataprocClusterCreateOperator()

  • cluster_name
  • project_id
  • zone

还有一些其他参数被标记为模板化。

如果我想将其他参数作为模板传递(目前不是这样)怎么办? - 喜欢image_versionnum_workersworker_machine_type 等等?

有什么解决方法吗?

【问题讨论】:

    标签: google-cloud-platform google-cloud-composer airflow


    【解决方案1】:

    不确定你所说的“动态”是什么意思,但是当 yaml 文件更新时,如果读取文件过程在 dag 文件正文中,则会刷新 dag 以从 yaml 文件中申请新的 args。所以实际上,您不需要 XCOM 来获取参数。 只需简单地创建一个 params 字典,然后传递给 default_args:

    CONFIGFILE = os.path.join(
        os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')
    
    with open(CONFIGFILE, 'r') as ymlfile:
        CFG = yaml.load(ymlfile)
    
    default_args = {
        'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
        'project_id': CFG['section_A']['project_id'],
        'zone': CFG['section_A']['zone'],
        'mage_version': CFG['section_A']['image_version'],
        'num_workers': CFG['section_A']['num_workers'],
        'worker_machine_type': CFG['section_A']['worker_machine_type'],
        # you can add all needs params here.
    }
    
    DAG = DAG(
        dag_id=DAG_NAME,
        schedule_interval=SCHEDULE_INTEVAL,
        default_args=default_args, # pass the params to DAG environment
    )
    
    Task1 = DataprocClusterCreateOperator(
        task_id='your_task_id',
        dag=DAG
    )
    

    但如果您想要动态 dag 而不是参数,您可能需要其他策略,例如 this

    所以你可能需要弄清楚基本思想: 动态在哪个级别?任务级别? DAG 级别?

    或者您可以创建自己的 Operator 来完成这项工作并获取参数。

    【讨论】:

    猜你喜欢
    • 2019-07-10
    • 2019-12-16
    • 2021-12-10
    • 2023-03-15
    • 2021-07-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多