【发布时间】:2021-11-27 11:46:30
【问题描述】:
我对气流非常陌生,我正在尝试根据以下要求创建一个 DAG。
- 任务 1 - 运行 Bigquery 查询以获取我需要推送到 dag 中的第二个任务的值
- 任务 2 - 使用上述查询中的值并运行另一个查询并将数据导出到谷歌云存储桶中。
我已阅读与此相关的其他答案,并且我了解我们不能在气流的 bigqueryoperator 中使用 xcom_pull 或 xcom_push。所以我正在做的是使用 python 运算符,我可以通过使用“provide_context=True”来使用 jinja 模板变量。
以下是我的代码片段。只是我想要执行“task_instance.xcom_push”的任务 1,以便在日志 xcom 下查看气流中的值。
def get_bq_operator(dag, task_id, configuration, table_params=None, trigger_rule='all_success'):
bq_operator = BigQueryInsertJobOperator(
task_id=task_id,
configuration=configuration,
gcp_conn_id=gcp_connection_id,
dag=dag,
params=table_params,
trigger_rule=trigger_rule,
**task_instance.xcom_push(key='yr_wk', value=yr_wk),**
)
return bq_operator
def get_bq_wm_yr_wk():
get_bq_operator(dag,app_name,bigquery_util.get_bq_job_configuration(
bq_query,
query_params=None))
get_wm_yr_wk = PythonOperator(task_id='get_wm_yr_wk',
python_callable=get_bq_wm_yr_wk,
provide_context=True,
on_failure_callback=failure_callback,
on_retry_callback=failure_callback,
dag=dag)
“bq_query”是我传递包含我的查询的 sql 文件的那个,查询返回我需要在第二个任务中使用的 yr_wk 的值。
get_bq_operator 中突出显示的 task_instance.xcom_push(key='yr_wk', value=yr_wk), 失败,我得到的错误如下
raise KeyError(f'Variable {key} does not exist')
KeyError: '变量 ei_migration_hour 不存在'
如果我评论上面的行,DAG 运行良好。但是,如何验证 yr_wk 的值?我想推送它,以便我可以查看日志中的值。
【问题讨论】:
标签: python google-bigquery airflow