【问题标题】:Airflow - Tell DAG to skip processing the 2nd of each month气流 - 告诉 DAG 跳过每月 2 日的处理
【发布时间】:2019-05-22 05:10:52
【问题描述】:

我构建了一个简单的 AirFlow 管道来处理传入的历史信息。由于数据的性质,每个月的第二天都有大量更高的交易量。在那些日子里,我想以 10 分钟为增量处理数据:*/10 * 2 * *

剩下的日子,我想正常处理,以2小时为增量'0 */2 * * *'

如何告诉我的“正常” DAG 跳过每月 2 日的处理?

DAG 本身非常简单:

dag = DAG(
dag_name,
catchup=True,
default_args=default_args,
schedule_interval=schedule_interval
)   


with dag:
    historical = HistoricalToS3Operator(
    task_id=dag_name + '_extract',
    model=HistoricalModel.INVOICES
    )



    redshift = S3ToRedshiftOperator(
        task_id=dag_name + '_load',
        load_type='upsert',
        type_check=False,
        primary_key=primary_key,
        distkey=distkey,
        sortkey=sortkey,
        incremental_key=incremental_key,
        data_type='json',
        dag=dag
        )
historical >> redshift

【问题讨论】:

    标签: python python-2.7 cron airflow airflow-scheduler


    【解决方案1】:

    Airflow 使用 python 库 croniter,这就是使用示例易于解释的原因:

    试试这个:

    iter = croniter('0 0 1,3-31 * *', base)
    
    In [27]: print(iter.get_next(datetime))
    2010-02-26 00:00:00
    
    In [28]: print(iter.get_next(datetime))
    2010-02-27 00:00:00
    
    In [29]: print(iter.get_next(datetime))
    2010-02-28 00:00:00
    
    In [30]: print(iter.get_next(datetime))
    2010-03-01 00:00:00
    
    In [31]: print(iter.get_next(datetime))
    2010-03-03 00:00:00
    

    【讨论】:

    • 所以我可以定义我的计划间隔,例如: schedule_interval = croniter('0 0 1,3-31 * *', base) ??
    • schedule_interval = "0 0 1,3-31 * *"
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-12-21
    • 2022-06-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多