【问题标题】:Airflow Jinja Templating in params参数中的气流 Jinja 模板
【发布时间】:2020-05-02 07:00:31
【问题描述】:

我有一个 Airflow 运算符,它允许我查询接受 Jinja 模板文件作为查询输入的 Athena。通常,我将表/数据库名称等变量传递给模板以创建表并添加分区语句。这适用于已定义的字符串。

我的任务定义如下所示:

        db = 'sample_db'
        table = 'sample_table'
        out = 's3://sample'
        p1='2020'
        p2='1'

        add_partition_task= AWSAthenaOperator(
            task_id='add_partition,
            query='add_partition.sql',
            params={'database': db,
                    'table_name': table,
                    'p1': p1
                    'p2': p2},
            database=db,
            output_location=out
        )

被模板化的 SQL 文件如下所示:

ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (partition1= '{{ params.p1 }}', partition2 = '{{ params.p2 }}')

这个模板工作正常。

对此的扩展是允许 'partition1' 和 'partition2' 由 jinja 模板变量定义,该变量包含从早期任务中提取的 XCOM,该任务将日期转换为财政年度和期间。使用日期作为分区是可能的,但我对是否可以强制参数接受 Jinja 模板感兴趣。

我想使用的代码如下所示:

        db = 'sample_db'
        table = 'sample_table'
        out = 's3://sample'
        p1 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p1") }}'
        p2 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p2") }}'

        add_partition_task= AWSAthenaOperator(
            task_id='add_partition,
            query='add_partition.sql',
            params={'database': db,
                    'table_name': table,
                    'p1': p1
                    'p2': p2},
            database=db,
            output_location=out
        )

所以现在 params.p1 和 params.p2 包含一个 Jinja 模板。显然,params 不支持 jinja 模板,因为呈现的 SQL 包含字符串文字“{{ task_instance....”而不是呈现的 XCOM 值。

在 operator 实现中向 template_fields 添加参数并不足以强制它渲染模板。我的操作员仅扩展 BaseOperator 并使用扩展 AwsHook 的 AthenaHook。 有没有人有在结构或替代方法中传递模板变量的经验?

【问题讨论】:

  • 你找到解决办法了吗?

标签: python jinja2 amazon-athena airflow


【解决方案1】:

由于AWSAthenaOperatorquery 作为模板字段并接受文件扩展名.sql,因此您可以在文件本身中包含jinja 模板。

我稍微修改了您的 AWSAthenaOperator 以适应示例。

add_partition_task= AWSAthenaOperator(
    task_id='add_partition,
    query='add_partition.sql',
    params={
        'database': db,
        'table_name': table,
    }
)

这是add_partition.sql 的样子。

INSERT OVERWRITE TABLE {{ params.database }}.{{ params.table_name }} (day, month, year) 
SELECT * FROM db.table
WHERE p1 = "{{ task_instance.xcom_pull(task_ids='convert_to_partition', key='p1') }}" 
  AND p2 = "{{ task_instance.xcom_pull(task_ids='convert_to_partition', key='p2') }}"
;

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-07-01
    • 2021-02-06
    • 1970-01-01
    • 2020-01-31
    • 1970-01-01
    • 2022-11-13
    • 2023-01-18
    • 1970-01-01
    相关资源
    最近更新 更多