【发布时间】:2020-12-16 04:49:31
【问题描述】:
我们使用 Airflow 通过 CloudSqlInstanceExportOperator 安排每日数据库导出。这似乎不适用于气流宏。我们正在尝试使用执行日期宏或 where 子句中的 {{ ds }} 导出 1 天的数据。使用宏很重要,因为我们希望 DAG 回填。
示例代码由两部分组成。首先我们定义export context:
export_body = {
"exportContext": {
"fileType": "csv",
"uri": "gs://"+GCP_BUCKET+'/'data.csv',
"databases":["database"],
"csvExportOptions": {
"selectQuery": """
select * from table
where datetime BETWEEN "{{ ds }} 00:00:00"
AND "{{ ds }} 23:59:59
"""
}
}
}
接下来,将导出上下文传递给任务:
cloudsql_export_task = CloudSqlInstanceExportOperator(
project_id=PROJECT_ID,
body = export_body,
instance='instance',
task_id='cloudsql_export_task',
dag=dag)
任务运行并被标记为成功,但是,创建的 Google Cloud Storage 文件中没有数据。当我们硬编码日期时,查询按预期工作。因此,我们知道问题是由未填充宏值引起的。
任何建议将不胜感激。如何解决此任务或实现相同目标的替代方法(注意:查询很大并且使用太多内存以使 MySqlToGoogleCloudStorageOperator 无法工作)
【问题讨论】:
标签: airflow