【Question title】:Scheduling Airflow DAGs to run exclusively Monday through Friday, i.e. only on weekdays
【Posted】:2020-12-07 10:38:55
【Question】:

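I have a DAG that executes a Python script, and the script takes a date argument (the current date). I scheduled the DAG to run at 6:00 AM Monday through Friday, i.e. on weekdays, Eastern Standard Time. On Monday the DAG must run the Python script with Monday's date as the argument, and likewise for Tuesday through Friday, with Friday's run receiving Friday's date.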

I noticed that a schedule interval of '0 6 * * 1-5' does not work, because Friday's run is not executed until the following Monday.
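The delay comes from how Airflow 1.10 triggers scheduled runs: the run for a given execution date starts only once its schedule interval has ended, that is, at the next cron tick. A minimal sketch of that arithmetic, using only the standard library; `next_weekday_tick` and `actual_start` are hypothetical helper names, not Airflow APIs, and the input is assumed to be a 06:00 weekday tick:

```python
from datetime import datetime, timedelta


def next_weekday_tick(tick):
    """Return the next Monday-Friday tick after `tick` (mimics '0 6 * * 1-5')."""
    candidate = tick + timedelta(days=1)
    while candidate.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        candidate += timedelta(days=1)
    return candidate


def actual_start(execution_date):
    """A scheduled run starts when its interval ends, i.e. at the next tick."""
    return next_weekday_tick(execution_date)


friday = datetime(2020, 12, 4, 6, 0)  # a Friday
print(actual_start(friday))           # 2020-12-07 06:00:00, the following Monday
```

Under this schedule the run stamped with Friday's date therefore waits across the weekend, while Monday through Thursday are each delayed by one day.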

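I changed the schedule interval to '0 6 * * *' so that the DAG runs every day at 6:00 AM, and at the start of the DAG I filter for dates that fall within '0 6 * * 1-5', so that effectively only Monday through Friday are processed. For Saturday and Sunday, the downstream tasks should be skipped.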

Here is my code:

from __future__ import print_function
import pendulum
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from croniter import croniter


log = logging.getLogger(__name__)

def filter_processing_date(**context):
    execution_date = context['execution_date']
    cron = croniter('0 6 * * 1-5', execution_date)
    log.info('cron is: {}'.format(cron))
    log.info('execution date is: {}'.format(execution_date))
    #prev_date = cron.get_prev(datetime)
    #log.info('prev_date is: {}'.format(prev_date))
    return execution_date == cron.get_next(datetime).get_prev(datetime)


local_tz = pendulum.timezone("America/New_York")
# DAG parameters

default_args = {
    'owner': 'Managed Services',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 3, tzinfo=local_tz),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'provide_context': True,
    'retries': 12,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'execute_python',
    schedule_interval='0 6 * * *',
    default_args=default_args
    ) as dag:

    start_dummy = DummyOperator(
        task_id='start',
        dag=dag
    )

    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    weekdays_only = ShortCircuitOperator(
        task_id='weekdays_only',
        python_callable=filter_processing_date,
        dag=dag
    )


    run_python = SSHOperator(
        ssh_conn_id="oci_connection",
        task_id='run_python',
        command='/usr/bin/python3  /home/sb/local/bin/runProcess.py -d {{ ds_nodash }}',
        dag=dag
    )


    start_dummy >> weekdays_only >> run_python >> end_dummy

Unfortunately, the weekdays_only task fails with the following error message. What is going wrong?

[Image: Airflow error message]

[Image: Airflow error message, continued]

Airflow version: v1.10.9-composer

Python 3.
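The text of the error is not reproduced here, so the following is only a likely cause. In `filter_processing_date`, `cron.get_next(datetime)` returns a plain `datetime`, and chaining `.get_prev(datetime)` onto that result raises an `AttributeError`. Separately, Airflow exposes `execution_date` in UTC, so comparing it against a 06:00 tick would need a conversion to the DAG's timezone first. A minimal standard-library sketch of the intended check, without croniter; `matches_weekday_6am` is a hypothetical name, and the fixed UTC-5 offset is an assumption standing in for America/New_York:

```python
from datetime import datetime, timedelta, timezone

# Assumption for this sketch: a fixed UTC-5 offset stands in for America/New_York.
# It ignores daylight saving time; a real DAG would use the pendulum timezone.
EST = timezone(timedelta(hours=-5))


def matches_weekday_6am(execution_date_utc, tz=EST):
    """True if the moment falls on 06:00 local time, Monday through Friday."""
    local = execution_date_utc.astimezone(tz)
    return local.weekday() < 5 and (local.hour, local.minute) == (6, 0)


monday = datetime(2020, 12, 7, 11, 0, tzinfo=timezone.utc)    # 06:00 EST, Monday
saturday = datetime(2020, 12, 5, 11, 0, tzinfo=timezone.utc)  # 06:00 EST, Saturday
print(matches_weekday_6am(monday), matches_weekday_6am(saturday))  # True False

# Plain datetime objects have no get_prev method, which is why chaining
# .get_prev() onto the value returned by cron.get_next(datetime) fails.
print(hasattr(monday, "get_prev"))  # False
```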

【Discussion】:

    Tags: python cron airflow airflow-scheduler pendulum


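    【Solution 1】:

    I managed to solve my problem by hacking something together: I check whether the next execution date is a weekday and return True if it is, False otherwise. I call that function from a ShortCircuitOperator, which continues with the downstream tasks when the result is True and skips them when it is False.

    Here is my code below, but I'm open to better solutions.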

    from __future__ import print_function
    import pendulum
    import logging
    from airflow.models import DAG
    from airflow.models import Variable
    from datetime import datetime, timedelta
    from airflow.contrib.operators.ssh_operator import SSHOperator
    from airflow.operators.python_operator import ShortCircuitOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils.trigger_rule import TriggerRule
    
    
    log = logging.getLogger(__name__)
    
    
    def checktheday(**context):
        # On a daily schedule, next_execution_date is roughly when this run actually starts.
        next_execution_date = context['next_execution_date']
        log.info('next_execution_date is: {}'.format(next_execution_date))
        date_check = next_execution_date.weekday()
        log.info('date_check is: {}'.format(date_check))
        # weekday() returns 0 for Monday through 6 for Sunday, so 0-4 are weekdays.
        decision = date_check < 5
    
        log.info('decision is: {}'.format(decision))
        return decision
    
    
    local_tz = pendulum.timezone("America/New_York")
    # DAG parameters
    
    default_args = {
        'owner': 'Managed Services',
        'depends_on_past': False,
        'start_date': datetime(2020, 8, 3, tzinfo=local_tz),
        'dagrun_timeout': None,
        'email': Variable.get('email'),
        'email_on_failure': True,
        'email_on_retry': False,
        'provide_context': True,
        'retries': 12,
        'retry_delay': timedelta(minutes=5)
    }
    
    with DAG(
        'execute_python',
        schedule_interval='0 6 * * *',
        default_args=default_args
        ) as dag:
    
        start_dummy = DummyOperator(
            task_id='start',
            dag=dag
        )
    
        end_dummy = DummyOperator(
            task_id='end',
            trigger_rule=TriggerRule.NONE_FAILED,
            dag=dag
        )
    
        weekdays_only = ShortCircuitOperator(
            task_id='weekdays_only',
            python_callable=checktheday,
            dag=dag
        )
    
    
        run_python = SSHOperator(
            ssh_conn_id="oci_connection",
            task_id='run_python',
            command='/usr/bin/python3  /home/sb/local/bin/runProcess.py -d {{ macros.ds_format(macros.ds_add(ds, 1), "%Y-%m-%d", "%Y%m%d") }}',
            dag=dag
        )
    
    
        start_dummy >> weekdays_only >> run_python >> end_dummy
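
To see why the check uses `next_execution_date` and the command adds one day to `ds`, it may help to tabulate a week. On a daily schedule the run stamped with execution date D starts on D + 1, so both the weekday test and the date argument are based on D + 1. A minimal sketch with the standard library only; `plan_run` is a hypothetical helper, not part of Airflow:

```python
from datetime import date, timedelta


def plan_run(execution_date):
    """Return (should_run, date_argument) for a daily run with this execution date."""
    run_date = execution_date + timedelta(days=1)  # the day the run actually starts
    should_run = run_date.weekday() < 5            # Monday=0 ... Friday=4
    return should_run, run_date.strftime("%Y%m%d")


for offset in range(7):
    execution_date = date(2020, 11, 29) + timedelta(days=offset)  # Sunday through Saturday
    print(execution_date, plan_run(execution_date))
```

Execution dates Sunday through Thursday map to runs on Monday through Friday, each receiving its own calendar date, while Friday's and Saturday's execution dates map to weekend runs that are skipped.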
    

    【Discussion】:
