【问题标题】:Using Airflow template files and template_searchpath in Google Cloud Composer在 Google Cloud Composer 中使用 Airflow 模板文件和 template_searchpath
【发布时间】:2018-11-18 12:58:56
【问题描述】:

我在 Google Cloud Composer 上的 Airflow DAG 中广泛使用 BigQueryOperator

对于较长的查询,最好将每个查询放在其自己的 .sql 文件中,而不是用它弄乱 DAG。 Airflow 似乎支持所有 SQL Query 运算符,包括 BigQueryOperator,正如您在 the documentation 中看到的那样。

我的问题:在 .sql 模板文件中编写了我的 sql 语句后,如何将其添加到 Google Cloud Composer 并在 DAG 中引用它?

【问题讨论】:

    标签: airflow google-cloud-composer


    【解决方案1】:

    在谷歌搜索并找到this related question 之后。我找到了一种方法来完成这项工作(尽管这不是理想的解决方案,我们将看到)。这是一个包含三个部分的工作示例:

    1. 带有一点jinja模板的sql模板文件,
    2. DAG 和
    3. gcloud 命令需要将模板上传到正确的位置。

    (1)sql模板文件 这只是一个文件名以.sql 扩展名结尾的文本文件。假设这个文件名为 my-templated-query.sql 并包含:

    SELECT COUNT(1)
    FROM mytable
    WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')
    

    (2) 在 DAG 文件中引用模板 要引用此模板,请创建如下操作符:

    count_task = BigQueryOperator(
      task_id='count_rows',
      sql='/my-templated-query.sql')
    

    (3) 将模板文件添加到 Google Cloud Composer 事实证明,默认情况下,airflow 会在 dags 文件夹中查找模板文件。要将我们的模板文件上传到 dags 文件夹,我们运行

    gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql
    

    您必须相应地替换环境名称、位置和源路径。

    将所有这些模板上传到 dag 文件夹似乎并不正确。更好的 Airflow 做法是将模板放在自己的文件夹中,并将 template_searchpath 参数指定为 point to it when you create your DAG。但是,我不确定如何使用 Google Cloud Composer 执行此操作。

    更新:我意识到可以将子文件夹放在 DAG 文件夹中,这对于组织大量 SQL 模板很有用。假设我在DAG_FOLDER/dataset1/table1.sql 中放入了一个SQL 模板文件,在BigQueryOperator 中,Ithen 可以使用sql=/dataset1/table1.sql 引用这个。如果您有一个包含大量文件的子文件夹和许多其他子文件夹,您还可以使用上面显示的dag import 递归上传整个子文件夹 - 只需将其指向子文件夹即可。

    【讨论】:

      【解决方案2】:

      我找到了解决这个问题的理想方法。在您的 dag 声明中,您可以设置 template_searchpath,这是 Airflow 查找 jinja 模板文件的默认路径。

      为了在您的 Cloud Composer 实例中进行这项工作,您必须将其设置为如下

      dag = DAG(
          ...
          template_searchpath=["/home/airflow/gcs/plugins"],
      )
      

      请注意,我在此示例中使用了 plugins 文件夹。您可以改用您的数据文件夹,也可以使用您希望在存储桶中包含的任何文件夹。

      【讨论】:

      • 如果我输入的路径不适合您,您需要找到您的 Cloud Composer 实例的路径。这并不难找到。在任何 DAG 中,您都可以简单地记录 sys.path 变量并查看打印的路径。
      【解决方案3】:

      我们最近使用类似的策略解决了这个问题。步骤是:

      1. 将所有 SQL 文件放入 Google Cloud Source Repository
      2. 在每次 DAG 运行开始时,将文件克隆到自动与您的 Airflow 环境共享的 Cloud Storage Bucket 中的“data”目录中。
      3. 在执行时使用BigQueryOperator 中的模板读取查询。

      这是一个最小的解决方案:

      from airflow.operators import bash_operator
      from airflow.contrib.operators import bigquery_operator
      
      with models.DAG(
              'bigquery_dag',
              schedule_interval = None ,
              template_searchpath = ['/home/airflow/gcs/data/repo/queries/'],
              default_args = default_dag_args
              ) as dag:
      
          t1_clean_repo = bash_operator.BashOperator(
              task_id = 'clean_repo',
              bash_command = 'rm -rf /home/airflow/gcs/data/repo'
          )
      
          clone_command = """
              gcloud source repos clone repo --project=project_id
              cp -R repo /home/airflow/gcs/data
          """
      
          t2_clone_repo = bash_operator.BashOperator(
              task_id='clone_repo',
              bash_command=clone_command
              )
      
          t3_query = bigquery_operator.BigQueryOperator(
              task_id='query',
              sql= 'query.sql',
              use_legacy_sql = False,
              bigquery_conn_id='conn_id'
          )
      
      

      我们在这里利用了一些重要的概念:

      1. Cloud Storage Bucket 中的数据目录会通过Fuse 自动与您的 Airflow 实例共享。大多数操作员都可以访问此处输入​​的任何内容。
      2. 只要您的 Google Cloud Source 存储库与 Cloud Composer 位于同一个项目中,您的 Airflow 实例就不需要对 git clone 文件的额外权限。
      3. 我们在 DAG 参数中设置 template_searchpath,扩展搜索范围以将 data 目录包含在 Cloud Storage Bucket 中。

      【讨论】:

      • 您撰写了有关 Google Cloud Storage 的文章,但您的解决方案 (DAG) 中的该服务在哪里。您将 SQL 查询存储在 Google Repo 服务中,将它们克隆到 Composer 集群中的 VM,并在 DAG 中使用它们的路径。 GCS 在哪里?
      • @DenisOgr /home/airflow/gcs 中的所有内容实际上都存在于 Cloud Storage 中,并有效地“镜像”到您的 Airflow 环境中。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-05
      • 1970-01-01
      • 2022-08-15
      • 1970-01-01
      • 2018-10-13
      • 2019-03-19
      相关资源
      最近更新 更多