【问题标题】:Accessing configuration parameters passed to Airflow through CLI访问通过 CLI 传递给 Airflow 的配置参数
【发布时间】:2022-01-05 22:19:58
【问题描述】:

我尝试在触发 dag 运行时将以下配置参数传递给 Airflow CLI。以下是我正在使用的 trigger_dag 命令。

airflow trigger_dag  -c '{"account_list":"[1,2,3,4,5]", "start_date":"2016-04-25"}'  insights_assembly_9900 

我的问题是如何访问在 dag 运行中的运算符内部传递的 con 参数。

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    这可能是devj提供的答案的延续。

    1. airflow.cfg 应将以下属性设置为 true: dag_run_conf_overrides_params=True

    2. 在定义 PythonOperator 时,传递以下参数 provide_context=True。例如:

    get_row_count_operator = PythonOperator(task_id='get_row_count',python_callable=do_work,dag=dag,provide_context=True)
    1. 定义python callable(注意**kwargs的使用):
    def do_work(**kwargs): table_name = kwargs['dag_run'].conf.get('table_name') # 其余代码
    1. 从命令行调用 dag:
    气流 trigger_dag read_hive --conf '{"table_name":"my_table_name"}'

    我发现this 讨论很有帮助。

    【讨论】:

    • 我收到错误 dag_run not fount on Airflow 服务器
    • 如何循环作为参数传递的 account_list?
    • 如果使用 docker 操作符而不是 python 操作符会怎样?是否仍然可以将 dar_run.cong 参数传递给 docker 操作员的命令?
    • 也可以使用:{{ params.table_name}}
    【解决方案2】:

    有两种方法可以访问airflow trigger_dag 命令中传递的参数。

    1. 在PythonOperator中定义的callable方法中,可以通过kwargs['dag_run'].conf.get('account_list')访问参数

    2. 鉴于你使用这个东西的字段是模板字段,可以使用{{ dag_run.conf['account_list'] }}

    外部可触发 DAG 的 schedule_interval 设置为 None 以实现上述工作方式

    【讨论】:

    • 有没有办法从with DAG() as dag: 块中访问dag_run?我想根据是否存在 conf 键将 params 值解析为任务,如果不存在,则取 default_arg 值(而不是在 jinja 模板本身中放入太多逻辑)。
    • 也许是conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
    • @yee379 不;在 Dag 文件中,您正在定义 DAG,但 Run 仅在计划或触发后才存在。就像定义了一个任务,但一个任务实例只在运行期间存在。不要在每次运行时动态更改 dag 结构。而是根据条件制作无操作或跳过自身的任务。
    • @dlamblin,等等,为什么我们不希望能够从 DAG 上下文中访问每次运行的配置?通过将上下文推送给操作符,这意味着如果有人想为同一个 DAG 使用插件模型(例如,每个客户有一个配置文件),这意味着您必须隐藏操作符中的常量,而不是在 DAG 中公开它们定义。从根本上说,它们都可以工作,但如果你想的话,它会消除明确表达的能力。
    • @yee379 你能解决这个问题吗?即使我想访问从 DAG 中的 cmd/UI 传递的 conf
    【解决方案3】:

    如果您尝试访问 Airflow 系统范围的配置(而不是 DAG 配置),以下可能会有所帮助:

    首先,导入这个

    from airflow.configuration import conf
    

    其次,在某处获取价值

    conf.get("core", "my_key")
    

    可能,设置一个值

    conf.set("core", "my_key", "my_val")
    

    【讨论】:

    • 这是一个完全不同的配置,与 DAG 运行配置无关。
    • 我认为问题中的区别并不那么明显。这仍然对我有帮助!谢谢
    【解决方案4】:

    对于我的用例,我必须使用 API 将参数传递给气流工作流(或任务)。我的工作流程如下:当新文件落入 S3 存储桶时触发 Lambda,Lambda 反过来触发气流 DAG 并传递存储桶名称和文件的密钥。

    这是我的解决方案:

    s3 = boto3.client('s3')
    mwaa = boto3.client('mwaa')
    
    def lambda_handler(event, context):
        # print("Received event: " + json.dumps(event, indent=2))
    
        # Get the object from the event and show its content type
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
        
        mwaa_cli_token = mwaa.create_cli_token(
            Name=mwaa_env_name
        )
        
        mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
        mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
        
        conf = {'bucket': bucket, 'key': key}
        raw_data = """{0} {1} --conf '{2}'""".format(mwaa_cli_command, dag_name, json.dumps(conf))
        
        # pass the key and bucket name to airflow to initiate the workflow
        requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
    

    【讨论】:

      猜你喜欢
      • 2018-05-03
      • 2016-08-07
      • 1970-01-01
      • 2019-05-08
      • 1970-01-01
      • 2017-08-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多