【发布时间】:2021-01-05 04:33:58
【问题描述】:
我想在 DAG 中完成一堆任务后运行一些更新查询。因此,我将创建一个 python 函数,用于从之前的任务输出中获取必要的详细信息,然后使用 run_query 我想更新一个表。
这是我的python函数。
def metatable_update(dag,tablename,schedule,**kwargs):
bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default',location='europe-west3',use_legacy_sql=False)
pg_conn = config[schedule][tablename][1]
ti = kwargs['ti']
pgmax_ts = str(ti.xcom_pull(task_ids='get_maxts_{}_{}'.format(tablename,pg_conn))[1])
bq_hook.run_query(sql="update bqadmin.tablesync_meta set max_value='{}' where tablename='{}'".format(pgmax_ts,tablename))
我的问题只是简单地使用此代码,还是需要添加 execute 或 return 之类的内容才能成功运行此任务?
【问题讨论】:
标签: google-bigquery airflow google-cloud-composer