【问题标题】:Airflow BigQuery Hook - Run update query via run_queryAirflow BigQuery Hook - 通过 run_query 运行更新查询
【发布时间】:2021-01-05 04:33:58
【问题描述】:

我想在 DAG 中完成一堆任务后运行一些更新查询。因此,我将创建一个 python 函数,用于从之前的任务输出中获取必要的详细信息,然后使用 run_query 我想更新一个表。

这是我的python函数。

def metatable_update(dag,tablename,schedule,**kwargs):
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default',location='europe-west3',use_legacy_sql=False)
    pg_conn = config[schedule][tablename][1]
    ti = kwargs['ti']
    pgmax_ts = str(ti.xcom_pull(task_ids='get_maxts_{}_{}'.format(tablename,pg_conn))[1])
    bq_hook.run_query(sql="update bqadmin.tablesync_meta set max_value='{}' where tablename='{}'".format(pgmax_ts,tablename))
   

我的问题只是简单地使用此代码,还是需要添加 executereturn 之类的内容才能成功运行此任务?

【问题讨论】:

    标签: google-bigquery airflow google-cloud-composer


    【解决方案1】:

    没关系,我已经使用 BQ hook 本身来解决这个问题

    并且不需要提及执行或返回。

    def metatable_update(dag,tablename,schedule,**kwargs):
        bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default',location='europe-west3',use_legacy_sql=False)
        pg_conn = config[schedule][tablename][1]
        ti = kwargs['ti']
        pgmax_ts = str(ti.xcom_pull(task_ids='get_maxts_{}_{}'.format(tablename,pg_conn))[1])
        bq_hook.run(sql="update bqadmin.tablesync_meta set max_value='{}' where tablename='{}' and datasource_dbconn='{}'".format(pgmax_ts,tablename,pg_conn))
        return 'Executed the update query'
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-18
      • 1970-01-01
      • 2018-05-02
      • 2012-01-25
      • 2021-11-17
      • 2012-11-24
      相关资源
      最近更新 更多