【问题标题】:Airflow XCOM pull and push for a BigQueryInsertJobOperator and BigQueryOperatorAirflow XCOM 拉取和推送 BigQueryInsertJobOperator 和 BigQueryOperator
【发布时间】:2021-11-27 11:46:30
【问题描述】:

我对气流非常陌生,我正在尝试根据以下要求创建一个 DAG。

  • 任务 1 - 运行 Bigquery 查询以获取我需要推送到 dag 中的第二个任务的值
  • 任务 2 - 使用上述查询中的值并运行另一个查询并将数据导出到谷歌云存储桶中。

我已阅读与此相关的其他答案,并且我了解我们不能在气流的 bigqueryoperator 中使用 xcom_pull 或 xcom_push。所以我正在做的是使用 python 运算符,我可以通过使用“provide_context=True”来使用 jinja 模板变量。

以下是我的代码片段。只是我想要执行“task_instance.xcom_push”的任务 1,以便在日志 xcom 下查看气流中的值。

def get_bq_operator(dag, task_id, configuration, table_params=None, trigger_rule='all_success'):
    bq_operator = BigQueryInsertJobOperator(
        task_id=task_id,
        configuration=configuration,
        gcp_conn_id=gcp_connection_id,
        dag=dag,
        params=table_params,
        trigger_rule=trigger_rule,
        **task_instance.xcom_push(key='yr_wk', value=yr_wk),**
    )
    return bq_operator


def get_bq_wm_yr_wk():
    get_bq_operator(dag,app_name,bigquery_util.get_bq_job_configuration(
                                             bq_query,
                                             query_params=None))

get_wm_yr_wk = PythonOperator(task_id='get_wm_yr_wk',
                                        python_callable=get_bq_wm_yr_wk,
                                        provide_context=True,
                                        on_failure_callback=failure_callback,
                                        on_retry_callback=failure_callback,
                                        dag=dag)

“bq_query”是我传递包含我的查询的 sql 文件的那个,查询返回我需要在第二个任务中使用的 yr_wk 的值。

get_bq_operator 中突出显示的 task_instance.xcom_push(key='yr_wk', value=yr_wk), 失败,我得到的错误如下

raise KeyError(f'Variable {key} does not exist')

KeyError: '变量 ei_migration_hour 不存在'

如果我评论上面的行,DAG 运行良好。但是,如何验证 yr_wk 的值?我想推送它,以便我可以查看日志中的值。

【问题讨论】:

    标签: python google-bigquery airflow


    【解决方案1】:

    我不完全理解您的代码 :),但是如果您想对 BigQuery 查询的结果做一些事情,那么更好的方法是在您的 python 可调用对象中使用 BigQueryHook。

    Airflow 中的操作符通常是真正提供“完整”任务的 Hooks 的薄包装器(例如,您可以使用它来运行更新操作)但是如果您想对它的结果做一些事情并且您已经通过Python 运算符,直接使用 Hooks 会好得多,因为您不会做出运算符在 execute 方法中的所有假设。

    在您的情况下,它应该类似于(我在这里使用新的 TaskFlow 语法,它更适合执行此类操作。有关 Task Flow API 的教程,请参阅https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html。特别是在 Airflow 2 中,它成为编写任务的事实上的默认方式。

    @task(.....) 
    def my_task():
       hook = BigQueryHook(....)  # initialize it with the right parameters
       result = hook.run(sql='YOUR_QUERY', ...)  # add other necessary params
       processed_result = process_result(result) # do something with the result
       return processed_result
    

    这样您就不必运行 xcom_push(task_flow API 会自动为您执行此操作,其他任务只需执行以下操作即可使用:

    @task
    next_task(input):
       pass
    
    

    然后:

    result = my_task()
    next_task(result)
    

    然后所有的xcom push/pull都会通过TaskFlow自动为你处理。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-11-26
      • 2019-01-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-14
      • 2021-09-06
      相关资源
      最近更新 更多