【问题标题】:Can Airflow Macros be used with the CloudSqlInstanceExportOperator?气流宏可以与 CloudSqlInstanceExportOperator 一起使用吗?
【发布时间】:2020-12-16 04:49:31
【问题描述】:

我们使用 Airflow 通过 CloudSqlInstanceExportOperator 安排每日数据库导出。这似乎不适用于气流宏。我们正在尝试使用执行日期宏或 where 子句中的 {{ ds }} 导出 1 天的数据。使用宏很重要,因为我们希望 DAG 回填。

示例代码由两部分组成。首先我们定义export context

    export_body = {
        "exportContext": {
            "fileType": "csv",
             "uri": "gs://"+GCP_BUCKET+'/'data.csv',
             "databases":["database"],
             "csvExportOptions": {
                 "selectQuery": """
                                select * from table 
                                where datetime BETWEEN "{{ ds }} 00:00:00" 
                                AND "{{ ds }} 23:59:59
                                """
             }
        }
    }

接下来,将导出上下文传递给任务:

    cloudsql_export_task = CloudSqlInstanceExportOperator(
            project_id=PROJECT_ID,
            body = export_body,
            instance='instance',
            task_id='cloudsql_export_task',
            dag=dag)

任务运行并被标记为成功,但是,创建的 Google Cloud Storage 文件中没有数据。当我们硬编码日期时,查询按预期工作。因此,我们知道问题是由未填充宏值引起的。

任何建议将不胜感激。如何解决此任务或实现相同目标的替代方法(注意:查询很大并且使用太多内存以使 MySqlToGoogleCloudStorageOperator 无法工作)

【问题讨论】:

    标签: airflow


    【解决方案1】:

    确保运算符在template_fields 中包含body

    您也可以将 Jinja 模板与嵌套字段一起使用,只要这些 嵌套字段在它们所属的结构中被标记为模板化: 在template_fields 属性中注册的字段将提交给 模板替换

    更多关于模板的信息:https://airflow.readthedocs.io/en/stable/concepts.html#jinja-templating

    你可以像下面这样扩展操作符

    class CloudSqlInstanceExportTemplatedOperator(CloudSqlInstanceExportOperator):
        template_fields = CloudSqlInstanceExportOperator.template_fields + ('body',)
    

    【讨论】:

    • 谢谢。我是否添加该代码以将运算符扩展到 DAG 文件中?
    • 回答上述问题:是的。但是您还需要更新任务以使用下一个扩展运算符。
    【解决方案2】:

    shankshera 的答案是正确的,但是您使用的是已弃用的运算符。在更新版本中,不需要建议的修改。

    CloudSqlInstanceExportOperator 已重命名为 CloudSQLExportInstanceOperator 并移至提供程序。

    对于Airflow ,您需要安装backport providers

    pip install apache-airflow-backport-providers-google
    

    对于 Airflow >=2.0,您需要安装 providers:

    pip install apache-airflow-providers-google
    

    您可以将运算符导入为:

    from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLExportInstanceOperator
    

    由于运营商已经在templated fields 中列出了body,因此您可以开始了。

    【讨论】:

    • 这个解决方案看起来最好,但我们收到错误:ModuleNotFoundError: No module named 'airflow.providers.mysql'。我们在 Airflow 1.10.10 上并在虚拟环境中安装了 apache-airflow-backport-providers-google 包。
    • @Srule 注意 pypi 中与交叉依赖相关的部分:“交叉提供程序包依赖”。所以你可能还需要添加pip install apache-airflow-backport-providers-google[mysql]取决于你是否安装了所有依赖项或手动选择了一些。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-11-03
    • 2015-07-07
    • 1970-01-01
    • 1970-01-01
    • 2021-12-14
    • 2020-05-08
    • 1970-01-01
    相关资源
    最近更新 更多