【Question title】: Airflow default variables - Incremental load setup
【Posted】: 2021-04-15 20:35:29
【Question description】:

I am trying to implement incremental data loading for an extraction from one RDS Postgres database into another RDS Postgres database.

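I am using Airflow to implement the ETL. After reading about Airflow macros for a while, I decided to set up the incremental flow using Airflow's default variables.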

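So, the algorithm is:

    if my previous execution date is None, null or '':
        select data from the beginning (in our case, one year ago)
    else:
        select data from the previous execution date
    end if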
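A minimal sketch of that branch in plain Python, with no Airflow involved. `pick_window_start` and `lookback_days` are hypothetical names, `prev_success` stands in for `prev_execution_date_success`, and the one-year lookback is measured from the run's execution date instead of `datetime.now()` so the result is repeatable. It uses a plain `is None` test, which works for a real `None`; the discussion further down shows that the value Airflow hands to the callable may not be a real `None`.

```python
from datetime import datetime, timedelta


def pick_window_start(prev_success, execution_date: datetime,
                      lookback_days: int = 365):
    """Lower bound of the extraction window for this run (hypothetical helper)."""
    if prev_success is None or prev_success == '':
        # No earlier run recorded: full load going back `lookback_days`.
        return execution_date - timedelta(days=lookback_days)
    # Otherwise resume from the previous run.
    return prev_success


run = datetime(2021, 4, 15, 20, 35)
print(pick_window_start(None, run))
print(pick_window_start(datetime(2021, 4, 15, 20, 30), run))
```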

Note: the code below is only meant to help me understand the default variables; it does not implement the problem described above yet.

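The corresponding code is shown below. When I run the DAG for the first time, it always prints "None" for the previoussuccessfulexecutiondate variable instead of the historical date I described. I can't figure out why. Any ideas would be a great help.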

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# Use real booleans: the string 'False' is truthy
default_args = {'owner': 'airflow', 'start_date': days_ago(1), 'depends_on_past': False}


dag = DAG('jinja_trial_10', default_args=default_args, schedule_interval=timedelta(minutes=5))


def printexecutiontimes(**kwargs):
    executiondate = kwargs.get('execution_date')
    previoussuccessfulexecutiondate = kwargs.get('prev_execution_date_success')
    previousexecutiondate = kwargs.get('prev_ds_nodash')

    # Intended fallback: no previous successful run -> start one year back
    if (previoussuccessfulexecutiondate == 'None' or previoussuccessfulexecutiondate is None):
        previoussuccessfulexecutiondate = datetime.strftime(datetime.now() - timedelta(days=365), '%Y-%m-%d')

    print('Execution Date : {0}'.format(executiondate))
    print('Previous successful execution date : {0}'.format(previoussuccessfulexecutiondate))
    print('Previous execution date : {0}'.format(previousexecutiondate))

    print('hello')


task_start = DummyOperator(task_id='start', dag=dag)

jinja_task = PythonOperator(task_id='TryingoutJinjatemplates',
                            python_callable=printexecutiontimes,
                            provide_context=True,
                            dag=dag)

task_end = DummyOperator(task_id='end', dag=dag)


task_start >> jinja_task >> task_end

【Question discussion】:

    Tags: macros airflow


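    【Solution 1】:

    I had to do something very similar recently; the code below is what I ended up with: a custom function built on DagRun details.

    Refer to this answer if you only want the last DAG run (regardless of its state).

    In my case I needed the date of the last successful run, so I wrote the function below: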

    from datetime import datetime
    
    from airflow.models import DagRun
    
    
    def get_last_dag_run(dag_id):
        dag_runs = DagRun.find(dag_id=dag_id)
        # newest run first
        dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    
        for dag_run in dag_runs:
            # print all DAG runs - debug only
            print(f"All ----- state: {dag_run.state} , run_id: {dag_run.run_id} , execution_date: {dag_run.execution_date}")
    
        print('Success runs ---------------------------------')
        dag_runs = list(filter(lambda x: x.state == 'success', dag_runs))
        for dag_run in dag_runs:
            # print successful DAG runs - debug only
            print(f"Success - state: {dag_run.state} , run_id: {dag_run.run_id} , execution_date: {dag_run.execution_date}")
    
        # return the last successful execution date, or a default value (1970-01-01)
        return dag_runs[0].execution_date if dag_runs else datetime(1970, 1, 1)
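The same selection can be sketched without the Airflow metadata database, which makes the logic easy to test on its own. `RunRecord` is a hypothetical stand-in holding only the two `DagRun` fields the function reads, and `max(..., default=...)` replaces the sort-then-filter steps. This is an illustration of the logic, not Airflow's API.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable


@dataclass
class RunRecord:
    # Hypothetical stand-in for the two DagRun attributes used above.
    state: str
    execution_date: datetime


def last_success_date(runs: Iterable[RunRecord],
                      default: datetime = datetime(1970, 1, 1)) -> datetime:
    """Latest execution_date among successful runs, else `default`."""
    successes = [r.execution_date for r in runs if r.state == 'success']
    # `default` covers the "no successful run yet" case (first load).
    return max(successes, default=default)


runs = [
    RunRecord('success', datetime(2021, 4, 15, 20, 0)),
    RunRecord('failed', datetime(2021, 4, 15, 20, 5)),
    RunRecord('success', datetime(2021, 4, 15, 19, 55)),
]
print(last_success_date(runs))
```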
    

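    【Discussion】:

    • Thanks for the reply, Sharif. While I try your solution, could you help me understand why my code doesn't work?
    • Not sure. Here are some other SO links you can refer to; most of them ended up writing custom code. google.com/…
    • Hi Sharif, I did use the logic from the Stack Overflow link you provided (answered by Charlie). My understanding is that on the first run Charlie's logic should give me 'no prev runs', but I never get that; I always get a date. I expect 'no prev runs' so that I can pass the correct date for selecting the incremental data.
    • What I see is that when I check the type of last_dag_run from Charlie's solution and the type of previoussuccessfulexecutiondate from my code, it shows … , and the `is None` check doesn't work on it...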
    【Solution 2】:

    After a few experiments and a lot of reading, I came up with the following code, which works for me:

    • Create a Variable in the Airflow UI and set its value to 0
    • Use Airflow's predefined variables to determine whether this is a full load or an incremental load
    • Pseudocode:
    If
        value of Variable created = 0
    then
        set Variable = 1
        set the start date to a point in time in the past (a date-time from the inception of a certain process)
        set the end date to the value of "execution_date" (one of Airflow's predefined variables)
    else
        set the start date to "prev_execution_date_success" (one of Airflow's predefined variables)
        set the end date to "execution_date" (one of Airflow's predefined variables)
    end
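A minimal sketch of that flag logic in plain Python, assuming a dict in place of Airflow Variables so it can run anywhere. `choose_window`, `FLAG_KEY` and `inception` are hypothetical names. The detail that matters is reading and writing the flag under the same key.

```python
from datetime import datetime
from typing import Dict, Optional, Tuple

# Hypothetical key name; it must be identical for the read and the write.
FLAG_KEY = 'full_load_completion'


def choose_window(store: Dict[str, str],
                  execution_date: datetime,
                  prev_success: Optional[datetime],
                  inception: datetime) -> Tuple[Optional[datetime], datetime]:
    """Return (start, end) for this run; `store` stands in for Airflow Variables."""
    if store.get(FLAG_KEY, '0') == '0':
        # First run: full load from the inception date, then flip the flag.
        store[FLAG_KEY] = '1'
        return inception, execution_date
    # Later runs: incremental window since the last successful run.
    return prev_success, execution_date


store = {FLAG_KEY: '0'}
inception = datetime(2020, 1, 1)
first = datetime(2020, 11, 3, 12, 5)
second = datetime(2020, 11, 3, 12, 10)
print(choose_window(store, first, None, inception))
print(choose_window(store, second, first, inception))
```

Like the pseudocode, this flips the flag before the load itself runs, so a failed first load would still leave the flag at '1'; setting it only after the load succeeds is one way to avoid that.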
    

    Below is the corresponding code snippet:

    from datetime import datetime, timedelta
    
    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    from dateutil.parser import parse
    
    # Use real booleans: the string 'False' is truthy
    default_args = {'owner': 'airflow', 'start_date': datetime(2020, 11, 3, 12, 5), 'depends_on_past': False}
    
    
    dag = DAG('airflow_incremental_load_setup', default_args=default_args, schedule_interval=timedelta(minutes=5))
    
    def printexecutiontimes(**kwargs):
        # Variable to be created (with value '0') before the DAG is run
        full_load_check = Variable.get('full_load_completion')
        print('full_load_check : {0}'.format(full_load_check))
        if full_load_check == '0':
            print('First execution')
            print('Execution date : {0}'.format(kwargs.get('execution_date')))
            print('Actual start date : {0}'.format(kwargs.get('ds')))
            print('Previous successful execution date : {0}'.format(kwargs.get('prev_execution_date_success')))
            print('Calculated field : {0}'.format(datetime.strftime(datetime.now() - timedelta(days=365), '%Y-%m-%d')))
            # Flip the same Variable that was read above, so later runs take the incremental branch
            Variable.set('full_load_completion', '1')
            start_date = datetime.strftime(datetime.now() - timedelta(days=365), '%Y-%m-%d')
            end_date = datetime.strftime(kwargs.get('execution_date'), '%Y-%m-%d')
        else:
            print('After the first execution ..')
            print('Execution date : {0}'.format(kwargs.get('execution_date')))
            print('Actual start date : {0}'.format(kwargs.get('ds')))
            print('Previous successful execution date : {0}'.format(kwargs.get('prev_execution_date_success')))
            print('Calculated field : {0}'.format(kwargs.get('prev_execution_date_success')))
            # parse(str(...)) turns the context values into plain datetime objects
            start_date = parse(str(kwargs.get('prev_execution_date_success')))
            end_date = parse(str(kwargs.get('execution_date')))
            print('Type of start_date_check : {0}'.format(type(start_date)))
            start_date = datetime.strftime(start_date, '%Y-%m-%d')
            end_date = datetime.strftime(end_date, '%Y-%m-%d')
        # start_date / end_date bound the extraction query for this run
        print('Load window : {0} -> {1}'.format(start_date, end_date))
    
    task_start = DummyOperator(task_id='start', dag=dag)
    
    main_task = PythonOperator(task_id='IncrementalJobTask',
                               python_callable=printexecutiontimes,
                               provide_context=True,
                               dag=dag)
    
    task_end = DummyOperator(task_id='end', dag=dag)
    
    
    task_start >> main_task >> task_end
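Neither answer shows the extraction step itself. A minimal sketch of a windowed query, assuming a hypothetical `source_table` with an `updated_at` timestamp column and a DB-API driver that uses `%s` placeholders (as psycopg2 does); `build_incremental_query` is likewise a hypothetical name.

```python
from datetime import datetime
from typing import Tuple

# Hypothetical table/column names; adjust to the real source schema.
INCREMENTAL_SQL = (
    "SELECT * FROM source_table "
    "WHERE updated_at >= %s AND updated_at < %s"
)


def build_incremental_query(start: datetime,
                            end: datetime) -> Tuple[str, Tuple[datetime, datetime]]:
    """Return (sql, params) for the half-open window [start, end).

    The upper bound is exclusive, so a row stamped exactly at `end` is read
    by the next run rather than by both runs.
    """
    if start >= end:
        raise ValueError('start must be earlier than end')
    return INCREMENTAL_SQL, (start, end)


sql, params = build_incremental_query(datetime(2021, 4, 15, 20, 30),
                                      datetime(2021, 4, 15, 20, 35))
print(sql, params)
```

The pair could then be passed as `cursor.execute(sql, params)`. Keeping full timestamps rather than the `'%Y-%m-%d'` strings used in the answer matters for a DAG scheduled every 5 minutes: truncated to the day, most runs would get the same start and end date.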
    

      【Solution 3】:

      This helped me:

      if isinstance(context['prev_execution_date_success'], type(None)):
          ...  # no previous successful run yet: treat this run as the initial full load
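Why `isinstance` can succeed where `is None` fails: assuming the context value is a lazy proxy object (as far as I know, Airflow 1.10.x and 2.x build `prev_execution_date_success` with `lazy_object_proxy.Proxy`), the object passed to the callable is never `None` itself, even when the value it wraps is. The toy `LazyProxy` class below is a hypothetical stand-in, not the real library; it forwards `__class__`, `==` and `str()` the way such proxies do.

```python
from datetime import datetime


class LazyProxy:
    """Hypothetical toy stand-in for lazy_object_proxy.Proxy.

    Only forwards what this demo needs: __class__, == and str().
    """

    def __init__(self, factory):
        self._factory = factory

    @property
    def __class__(self):
        # isinstance() falls back to obj.__class__, so this makes the
        # proxy look like an instance of the wrapped value's type.
        return type(self._factory())

    def __eq__(self, other):
        return self._factory() == other

    def __str__(self):
        return str(self._factory())


first_run = LazyProxy(lambda: None)                  # no earlier success
later_run = LazyProxy(lambda: datetime(2021, 4, 15, 20, 30))

print(first_run)                                     # None
print(first_run is None)                             # False
print(first_run == 'None')                           # False
print(isinstance(first_run, type(None)))             # True
print(first_run == None)                             # True
print(isinstance(later_run, type(None)))             # False
```

This reproduces the symptom from the question: the value prints as `None`, yet both `is None` and `== 'None'` are false, while `isinstance(..., type(None))` and `== None` see through the proxy.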
      
