【问题标题】:How can I reference an external SQL file using Airflow's BigQuery operator?如何使用 Airflow 的 BigQuery 运算符引用外部 SQL 文件?
【发布时间】:2019-12-02 19:54:10
【问题描述】:

我目前正在使用 Airflow 和 BigQuery 运算符来触发各种 SQL 脚本。当 SQL 直接写入 Airflow DAG 文件时,这可以正常工作。例如:

bigquery_transform = BigQueryOperator(
        task_id='bq-transform',
        bql='SELECT * FROM `example.table`',
        destination_dataset_table='example.destination'
    )

但是,我想将 SQL 存储在一个单独的文件中,该文件保存到存储桶中。例如:

bql='gs://example_bucket/sample_script.sql'

调用此外部文件时,我收到“找不到模板”错误。

我已经看到一些示例将 SQL 文件加载到 Airflow DAG 文件夹中,但是,我真的很想访问保存到单独存储桶中的文件。这可能吗?

【问题讨论】:

    标签: google-bigquery google-cloud-storage airflow google-cloud-composer


    【解决方案1】:

    您可以引用 Google Cloud Storage Bucket 中的任何 SQL 文件。下面是一个示例,我在气流 dag 存储桶的 sql 目录中调用文件 Query_File.sql。

    CONNECTION_ID = 'project_name'
    
    with DAG('dag', schedule_interval='0 9 * * *', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:
    
    battery_data_quality = BigQueryOperator(
      task_id='task-id',
      sql='/SQL/Query_File.sql',
      destination_dataset_table='project-name.DataSetName.TableName${{ds_nodash}}',
      write_disposition='WRITE_TRUNCATE',
      bigquery_conn_id=CONNECTION_ID,
      use_legacy_sql=False,
      dag=dag
    )
    

    【讨论】:

    • 这在将 SQL 文件存储在 Airflow 存储桶中时有效——但是,我想将 SQL 文件存储在与 Airflow 无关的完全不同的存储桶中。那可能吗?我应该注意我正在通过 Google Cloud Composer 使用 Airflow。另外,我注意到您在运算符中使用sql=,而我使用的是bql=。我在 Airflow 中收到另一个错误,需要 bql。我想知道这是否相关?
    • 我不认为您可以...Cloud Composer 使用 FUSE 驱动程序安装 GCS 存储桶。该文档指出“Cloud Composer 将您的工作流 (DAG) 的源代码及其依赖项存储在 Cloud Storage 的特定文件夹中......每个环境都有一个关联的 Cloud Storage 存储桶。Cloud Composer 仅在 Cloud Storage 存储桶中安排 DAG。 "也许您可以通过使用 BashOperator 将相关文件复制到气流环境存储桶来解决它?
    • 关于您的“bql vs sql”评论,“bql”现在在较新的气流版本中已被贬低,这是无关的。
    • 我一直在使用您的解决方案,但正在尝试通过 Jinja 模板变量填充 SQL 文件。例如sql=''{{dag_run.conf['sql_file']}}。该变量包含一个文件名字符串,例如“example.sql”。但是,当气流处理变量时,它会自动删除文件扩展名。如果我对带有扩展名的文件名进行硬编码,它就可以很好地工作。在运算符中使用 jinja 模板值时为什么会删除文件扩展名的任何想法?
    【解决方案2】:

    您还可以考虑使用gcs_to_gcs operator 将所需存储桶中的内容复制到作曲家可以访问的存储桶中。

    【讨论】:

      【解决方案3】:

      在 GoogleCloudStorageDownloadOperator 中,Airflow 版本 1.10.3 和 1.10.15 的下载工作方式不同。

       def execute(self, context):
      
              self.object = context['dag_run'].conf['job_name'] + '.sql'
              logging.info('filemname in GoogleCloudStorageDownloadOperator: %s', self.object)
              self.filename = context['dag_run'].conf['job_name'] + '.sql'
      
              self.log.info('Executing download: %s, %s, %s', self.bucket,
                            self.object, self.filename)
              hook = GoogleCloudStorageHook(
                  google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                  delegate_to=self.delegate_to
              )
              file_bytes = hook.download(bucket=self.bucket,
                                         object=self.object)
              if self.store_to_xcom_key:
                  if sys.getsizeof(file_bytes) < 49344:
                      context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes.decode('utf-8'))
                  else:
                      raise RuntimeError(
                          'The size of the downloaded file is too large to push to XCom!'
      
                      )

      【讨论】:

        猜你喜欢
        • 2021-02-10
        • 1970-01-01
        • 2021-03-17
        • 2020-11-08
        • 2022-11-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多