【问题标题】:Fetch results from BigQueryOperator in airflow从气流中的 BigQueryOperator 获取结果
【发布时间】:2019-05-03 02:22:41
【问题描述】:

我正在尝试使用气流从BigQueryOperator 获取结果,但我找不到方法。我尝试在bq_cursor 成员(1.10 中可用)中调用next() 方法,但它返回None。这就是我尝试的方式

import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator

查看bigquery_hook.pybigquery_operator.py 似乎是获取结果的唯一可用方法。

【问题讨论】:

    标签: python google-bigquery airflow


    【解决方案1】:

    您可以使用BigQueryOperator 将结果保存在临时目标表中,然后使用BigQueryGetDataOperator 获取如下结果,然后使用BigQueryTableDeleteOperator 删除该表:

    get_data = BigQueryGetDataOperator(
        task_id='get_data_from_bq',
        dataset_id='test_dataset',
        table_id='Transaction_partitions',
        max_results='100',
        selected_fields='DATE',
        bigquery_conn_id='airflow-service-account'
    )
    

    文档:

    【讨论】:

    • 我看到了这个选项,但我想一步完成,而不需要删除临时表
    • 然后你可以使用各个算子的钩子并为此创建一个算子。
    • 它本质上是复制和粘贴 bigquery 运算符,但在我看来这不是我想要的方式,如果需要,我可能最终会创建一个 PR 来解决这个问题。
    【解决方案2】:

    每当我需要从 BigQuery 查询中获取数据并将其用于某些用途时,我都会使用 BigQuery 挂钩创建自己的运算符。我通常将其称为 BigQueryToXOperator,我们有一堆用于将 BigQuery 数据发送到其他内部系统的运算符.

    例如,我有一个 BigQueryToPubSub 运算符,您可能会发现它作为一个示例很有用,用于说明如何查询 BigQuery,然后逐行处理结果,将它们发送到 Google PubSub。请考虑以下通用示例代码,了解如何自行执行此操作:

    class BigQueryToXOperator(BaseOperator):
        template_fields = ['sql']
        ui_color = '#000000'
    
        @apply_defaults
        def __init__(
                self,
                sql,
                keys,
                bigquery_conn_id='bigquery_default',
                delegate_to=None,
                *args,
                **kwargs):
            super(BigQueryToXOperator, self).__init__(*args, **kwargs)
            self.sql = sql
            self.keys = keys # A list of keys for the columns in the result set of sql
            self.bigquery_conn_id = bigquery_conn_id
            self.delegate_to = delegate_to
    
    
        def execute(self, context):
            """
            Run query and handle results row by row.
            """
            cursor = self._query_bigquery()
            for row in cursor.fetchall():
                # Zip keys and row together because the cursor returns a list of list (not list of dicts)
                row_dict = dumps(dict(zip(self.keys,row))).encode('utf-8')
    
                # Do what you want with the row...
                handle_row(row_dict)
    
    
        def _query_bigquery(self):
            """
            Queries BigQuery and returns a cursor to the results.
            """
            bq = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                              use_legacy_sql=False)
            conn = bq.get_conn()
            cursor = conn.cursor()
            cursor.execute(self.sql)
            return cursor
    

    【讨论】:

    • 这正是我一直在寻找的迈克,非常感谢。那么你如何在你的 dag 中调用这个任务?你介意举个例子吗
    • 您还知道如何将 x_com 值(从 bq 检索到的单个单元格)传递给 dataflowpython 作业吗?
    【解决方案3】:

    感谢@kaxil 和@Mike 的回答。我发现了问题。 BigQueryCursor 中有一种错误(在我看来)。作为run_with_configuration 的一部分,running_job_id 被返回,但从未分配给job_idjob_id 用于在next 方法中提取结果。一种解决方法(如果您不想重新实现所有内容,则不是很优雅,但很好),是根据钩子中的running_job_id 分配job_id,如下所示

    big_query_count.execute(context=kwargs)
    #workaround
    big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
    logging.info(big_query_count.bq_cursor.next())
    

    【讨论】:

      【解决方案4】:

      分享一个关于如何使用大查询钩子获取数据的小例子:

      下面是我的 demo_bigquery_hook 文件:

      from airflow import DAG
      from airflow.contrib.hooks.bigquery_hook import BigQueryHook
      from airflow.utils.dates import days_ago
      from airflow.operators.python_operator import PythonOperator
      from datetime import *
      import logging
      
      logger = logging.getLogger("airflow.task")
      
      # default arguments
      default_args = {
          'owner': 'Airflow',
          'depends_on_past': False,
          'start_date': days_ago(0),
          'email': ['airflow@example.com'],
          'email_on_failure': False,
          'email_on_retry': False,
          'retries': 1,
          'retry_delay': timedelta(minutes=2)
      }
      
      # initializing dag
      dag = DAG(
          'test_bigquery_hook',
          default_args=default_args,
          catchup=False,
          schedule_interval=None,
          max_active_runs=1
      )
      
      def get_data_from_bq(**kwargs):
          hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
          conn = hook.get_conn()
          cursor = conn.cursor()
          cursor.execute('SELECT owner_display_name, title, view_count FROM `bigquery-public-data.stackoverflow.posts_questions` WHERE creation_date > "2020-09-09" ORDER BY view_count DESC LIMIT 2')
          result = cursor.fetchall()
          print('result', result)
          return result
      
      fetch_data = PythonOperator(
          task_id='fetch_data_public_dataset',
          provide_context=True,
          python_callable=get_data_from_bq,
          dag=dag
      )
      
      fetch_data
      

      要在本地测试它,请将上述内容保存在“demo_bigquery_hook.py”文件中,并将其复制到您的 dags 文件夹中。打开命令提示符并执行以下命令:

      1. export AIRFLOW_CONN_BIGQUERY_DEFAULT="google-cloud-platform://?extra__google_cloud_platform__project=<gcp_project_Id>"。将 gcp_project_id 替换为任何 Gcp 项目 ID。此命令将设置默认的 gcp 帐户。

      2. export GOOGLE_APPLICATION_CREDENTIALS=<path_to_your_sa_key>。其中 是您的 Gcp 项目的服务帐户密钥的路径。

      3. 最后运行以下命令:airflow test test_bigquery_hook fetch_data_public_dataset 2020-09-02T05:38:00+00:00。运行后,您将看到以下结果。

      注意几点:

      1. 本示例中使用的查询从 Gcp 提供的公共数据集中获取结果。

      2. Bigquery 游标对象还提供了很多其他功能。打开这个link 浏览它们。

      3. 本示例使用的 Airflow 版本是 1.10.12。

      【讨论】:

        【解决方案5】:

        Google 提供的操作员使用 BigQueryHook 来获得到 BigQuery 的经过身份验证的连接。该类是解析 Airflow Connection 并创建 Google Cloud 凭据的类。你导入它:

        from airflow.contrib.hooks.bigquery_hook import BigQueryHook
        

        latest docs 说它有一个方法“get_client()”,应该返回经过身份验证的底层客户端。这在我的 Airflow 版本上不起作用,所以我使用 this answer 从钩子的内部字段直接创建 bigquery.client.Client 连接。逻辑如下:

        def read_files_loaded_from_bq(bigquery_conn_id, sql):
            hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, delegate_to=None, use_legacy_sql=False)
            # this should work with latest version:
            # client = hook.get_client()
            # instead directly create a client using the internals of the hook:
            client = bigquery.Client(project=hook._get_field("project"), credentials=hook._get_credentials())
            query_job = client.query(sql)
            files = []
            for row in query_job:
                file_name = row["file_name"]
                files.append(file_name)
            result = ','.join(files)
            print('found files:', result)
            return result
        
        read_files_loaded = PythonOperator(
            task_id='read_files_loaded',
            provide_context=False,
            python_callable=read_files_loaded_from_bq,
            op_kwargs = {
                "bigquery_conn_id": BIGQUERY_CONN_ID,
                "sql": "select file_name from myproj.mydataset.files_loaded"
            },
            dag=dag
        )
        

        这将运行查询并将数据加载到字符串中。然后,您可以使用以下命令通过 xcom 读取结果:

        "{{ task_instance.xcom_pull(task_ids='read_files_loaded') }}"
        

        如果您只是加载少量元数据来驱动 DAG 的逻辑,这种方法似乎很简单。如果有任何重要的数据,最好使用操作员在表和存储桶之间移动数据,而不是每次都将其拉入实际的气流任务流程。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多