【问题标题】:BigQuery parameterized queries in AirflowAirflow 中的 BigQuery 参数化查询
【发布时间】:2020-12-02 12:22:25
【问题描述】:

我正在尝试使用 Airflow 在 BigQuery 中创建参数化查询。将在 Airflow 中设置的参数,并将更改将在 BigQuery 中运行的查询中的值。

例如,这是我的查询:

select id from table where id = {{params.number_days}}

气流代码包括:

( task_id='表', use_legacy_sql=假, write_disposition='WRITE_TRUNCATE', allow_large_results=真, bql=merchant_rank_query, destination_dataset_table='prod.table_result', 参数 = {'number_days': 1}, dag=dag)

这样不行,正确的方法是什么?

【问题讨论】:

    标签: parameters google-bigquery airflow


    【解决方案1】:

    对于 Airflow,具体如下所示:

    params = {
        'number_days': 'value_abc'
    }
    
    dag = DAG(
        'example_dag',
        schedule_interval='@daily',
        default_args=default_args,
        params=params,
    )
    

    确保在您的 sql 查询中设置 jinja 模板。

    select id from table where id = {{params.number_days}}
    
    

    一篇不太针对操作员的中等文章,但一般解释了一些技巧,还有参数用法。

    (4) “参数”参数

    https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f

    第二个选项 - BigQuery 查询参数

    如果您的 BigQuery 运算符支持 query_params,您可以使用如下内容:

    #on top of your code, can be before declaring the dag
    dag_exec_date_time = '{{ ts }}'
    ...
    ...
    ...
    #then your bigquery operator, if supports query_params
    
      query_params=[
          {
              "name": "dag_exec_date_time",
              "parameterType": { "type": "STRING" },
              "parameterValue": { "value": dag_exec_date_time}
          }
      ],
    
    
    

    sql查询

    select cast(@dag_exec_date_time as timestamp) as airflow_timestamp
    

    【讨论】:

      【解决方案2】:

      要运行parameterized query,代码如下所示:

      sql = "select id from table where id = @days"
      job_config = bigquery.QueryJobConfig()
      job_config.query_parameters = [bigquery.ScalarQueryParameter('days','INT64',params.number_days)]
      query_job = bigquery.Client().query(command=sql, job_config=job_config)
      results = query_job.result()
      if query_job.errors:
          # log all errors then raise first error
          for e in query_job.errors:
              logging.error(e)
          raise Exception(query_job.errors[0])
      

      Bigquery 不是普通的关系数据库。没有“计划缓存”或其他优化可以使参数化查询更好。因此,除非存在某种可能是 SQL 注入的不安全输入,否则我建议不要使用参数并将值放入 SQL 中,因为它非常简单:

      sql = f"select id from table where id = {params.number_days}"
      query_job = bigquery.Client().query(command=sql)
      ...
      

      【讨论】:

      • 我认为您的回答脱离了 Apache-Airflow 的上下文。我相信他想将 Airflow 提供的 Jinja 模板用于参数化查询。 params 是 Airflow 传递给每个操作员的字典的名称。
      • @Luis 你说得对,这段代码在气流之外有效。我也在气流中使用它。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多