【发布时间】:2021-04-15 20:35:29
【问题描述】:
我正在尝试为从 rds postgres 到另一个 postgres rds 的数据提取实现增量数据加载
我正在使用气流来实现 ETL。因此,在阅读了有关气流宏的一段时间后,我决定使用气流默认变量设置增量流。
所以,算法是这样的,
如果我之前的执行日期是 None 或 null 或 '': 从一开始就选择数据(在我们的例子中是一年前) 别的 选择上一个执行日期 结束如果
注意:下面的代码首先是为了理解默认变量,对于我上面提到的问题还没有实现
对应的代码如下所示。当我第一次运行 dag 时,我总是最终为 previoussuccessfulexecutiondate 变量打印“无”,而不是像我提到的那样的历史日期。我无法弄清楚这一点。对此的任何想法都会有很大帮助
from datetime import *
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
default_args={'owner':'airflow','start_date': days_ago(1),'depends_on_past':'False'}
dag = DAG('jinja_trial_10',default_args=default_args,schedule_interval=timedelta(minutes=5))
def printexecutiontimes(**kwargs):
executiondate = kwargs.get('execution_date')
previoussuccessfulexecutiondate = kwargs.get('prev_execution_date_success')
previousexecutiondate = kwargs.get('prev_ds_nodash')
if (previoussuccessfulexecutiondate == 'None' or previoussuccessfulexecutiondate is None):
previoussuccessfulexecutiondate = datetime.strftime(datetime.now() - timedelta(days = 365),'%Y-%m-%d')
print('Execution Date : {0}'.format(executiondate))
print('Previous successful execution date : {0}'.format(previoussuccessfulexecutiondate))
print('Previous execution date : {0}'.format(previousexecutiondate))
print('hello')
task_start = DummyOperator(task_id = 'start',dag=dag)
jinja_task= PythonOperator(task_id = 'TryingoutJinjatemplates',
python_callable =printexecutiontimes,
provide_context = 'True',
dag=dag )
task_end = DummyOperator(task_id = 'end',dag=dag)
task_start >>jinja_task >> task_end
【问题讨论】: