【问题标题】:For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI?对于 Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?
【发布时间】:2019-05-08 20:47:11
【问题描述】:

我使用 Airflow 来管理 ETL 任务的执行和计划。一个 DAG 已创建并且工作正常。但是通过cli手动触发dag时是否可以传递参数。

例如: 我的 DAG 每天 01:30 运行,处理昨天的数据(时间范围从昨天 01:30 到今天 01:30)。数据源可能存在一些问题。我需要重新处理这些数据(手动指定时间范围)。

那么我可以创建这样一个气流 DAG,当它被安排时,默认时间范围是从昨天 01:30 到今天 01:30。那么如果数据源有问题,我需要手动触发 DAG 并手动将时间范围作为参数传递。

据我所知 airflow test-tp 可以将参数传递给任务。但这仅用于测试特定任务。而airflow trigger_dag 没有-tp 选项。那么有没有什么办法可以通过tigger_dag向DAG传递参数,然后Operator可以读取这些参数呢?

谢谢!

【问题讨论】:

    标签: airflow


    【解决方案1】:
    key: ['param1=somevalue1', 'param2=somevalue2']
    

    第一种方式:

    "{{ dag_run.conf["key"] }}"
    

    这会将传递的值呈现为字符串"['param1=somevalue1', 'param2=somevalue2']"

    第二种方式:

    def get_parameters(self, **kwargs):
        dag_run = kwargs.get('dag_run')
        parameters = dag_run.conf['key']
        return parameters
    

    在这种情况下,正在传递一个字符串列表并将其呈现为列表['param1=somevalue1', 'param2=somevalue2']

    【讨论】:

      【解决方案2】:

      您可以使用 --conf '{"key":"value"}' 从 CLI 传递参数,然后在 DAG 文件中将其用作模板字段中的 "{{ dag_run.conf["key"] }}"

      CLI

      airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '{"message":"value"}'
      

      DAG 文件

      args = {
          'start_date': datetime.utcnow(),
          'owner': 'airflow',
      }
      
      dag = DAG(
          dag_id='example_dag_conf',
          default_args=args,
          schedule_interval=None,
      )
      
      def run_this_func(ds, **kwargs):
          print("Remotely received value of {} for key=message".
                format(kwargs['dag_run'].conf['message']))
      
      
      run_this = PythonOperator(
          task_id='run_this',
          provide_context=True,
          python_callable=run_this_func,
          dag=dag,
      )
      
      # You can also access the DagRun object in templates
      bash_task = BashOperator(
          task_id="bash_task",
          bash_command='echo "Here is the message: '
                       '{{ dag_run.conf["message"] if dag_run else "" }}" ',
          dag=dag,
      )
      

      【讨论】:

      • 有没有办法在非模板字段中传递参数?
      • @AshuGG 当你设置 provide_context=True 时,运行参数会在 kwargs['dag_run'].conf ,如代码示例所示
      • 我也需要访问该参数值,因为我需要循环到该值并创建任务。如何在非模板字段之外访问该参数值?
      • 你不能这样做,你需要使用气流变量:)
      【解决方案3】:

      根据气流文档,这应该可以工作:https://airflow.apache.org/cli.html#trigger_dag

      airflow trigger_dag -c '{"key1":1, "key2":2}' dag_id

      确保-c 的值是一个有效的json 字符串,所以这里需要用双引号包裹键。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-12-08
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-01-16
        相关资源
        最近更新 更多