【发布时间】:2020-12-07 10:38:55
【问题描述】:
我有一个 DAG 执行一个 Python 脚本,该脚本接受一个日期参数(当前日期)。我将 DAG 安排在周一至周五上午 6:00 运行,即东部标准时间的工作日。 DAG 必须在星期一以星期一的日期作为参数运行 Python 脚本,从星期二一直到星期五,以星期五的日期作为参数也是如此。
我注意到使用 '0 6 * * 1-5' 的计划间隔不起作用,因为直到下周一才执行周五。
我将计划间隔更改为 '0 6 * * *' 以在每天早上 6:00 和我的一天开始时运行,过滤掉在 ‘0 6 * * 1-5’ 内的日期,因此有效的是周一到周五。对于周六和周日,应该跳过下游任务。
这是我的代码
from __future__ import print_function
import pendulum
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from croniter import croniter
log = logging.getLogger(__name__)
def filter_processing_date(**context):
execution_date = context['execution_date']
cron = croniter('0 6 * * 1-5', execution_date)
log.info('cron is: {}'.format(cron))
log.info('execution date is: {}'.format(execution_date))
#prev_date = cron.get_prev(datetime)
#log.info('prev_date is: {}'.format(prev_date))
return execution_date == cron.get_next(datetime).get_prev(datetime)
local_tz = pendulum.timezone("America/New_York")
# DAG parameters
default_args = {
'owner': 'Managed Services',
'depends_on_past': False,
'start_date': datetime(2020, 8, 3, tzinfo=local_tz),
'dagrun_timeout': None,
'email': Variable.get('email'),
'email_on_failure': True,
'email_on_retry': False,
'provide_context': True,
'retries': 12,
'retry_delay': timedelta(minutes=5)
}
with DAG(
'execute_python',
schedule_interval='0 6 * * *',
default_args=default_args
) as dag:
start_dummy = DummyOperator(
task_id='start',
dag=dag
)
end_dummy = DummyOperator(
task_id='end',
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag
)
weekdays_only = ShortCircuitOperator(
task_id='weekdays_only',
python_callable=filter_processing_date,
dag=dag
)
run_python = SSHOperator(
ssh_conn_id="oci_connection",
task_id='run_python',
command='/usr/bin/python3 /home/sb/local/bin/runProcess.py -d {{ ds_nodash }}',
dag=dag)
start_dummy >> weekdays_only >> run_python >> end_dummy
不幸的是,weekdays_only 任务失败并出现以下错误消息。出了什么问题?
Airflow error message continuation
气流版本:v1.10.9-composer
Python 3.
【问题讨论】:
标签: python cron airflow airflow-scheduler pendulum