【发布时间】:2020-03-26 14:44:35
【问题描述】:
我正在创建一个具有多个 'for' 循环的动态 DAG。它正确地启动了流动,但在下游连接得不是很好。任务“dummy_ender_0_a”按预期连接到“toto_a”。但我期待“dummy_ender_1_a”和“dummy_ender_2_a”也连接到下游任务“toto_a”。我不确定我在这里缺少什么。代码如下:
import datetime
from airflow import models
from airflow.utils import dates
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator
ENV = 'DEV'
META_DELTA_DAYS=3
tables=['a','b','c','d']
with models.DAG(
'test_'+ENV,
schedule_interval = None ,
concurrency = 1 ,
max_active_runs = 1,
catchup = False, # for now manualy catchup
default_args={
"start_date": datetime.datetime.strptime('2019-09-01 23:10:00', '%Y-%m-%d %H:%M:%S'),
"retries": 1,
"retry_delay": datetime.timedelta(minutes=1),
"execution_timeout": datetime.timedelta(minutes=14), # safty check
# notifications
"email_on_failure": True,
"email_on_retry": False
'email': [models.Variable.get('alert_email')],
"project_id": models.Variable.get('project'),
'provide_context': True
}
) as dag:
def parsing_ts(**kwargs):
etl_run_time_nodash = kwargs['ts_nodash'].replace('+','').replace('T','')[:-2]
etl_run_time = datetime.datetime.fromtimestamp(kwargs['execution_time'].timestamp())
print('>>> START DATE >>>>',etl_run_time_nodash)
for i,dt in enumerate([etl_run_time - datetime.timedelta(days=x) for x in range(META_DELTA_DAYS)]):
kwargs['ti'].xcom_push(key='curr_yr_minus_'+str(i), value=dt.year)
kwargs['ti'].xcom_push(key='curr_mth_minus_'+str(i), value=dt.month)
kwargs['ti'].xcom_push(key='curr_day_minus_'+str(i), value=dt.dayofmonth)
def printval(**kwargs):
etl_run_time_nodash = kwargs['ti'].xcom_pull(key='etl_run_time_nodash')
print('>>> START DATE NO DASH>>>>',etl_run_time_nodash)
def start(table,**kwargs):
return DummyOperator(task_id = "dummy_start_%s"%(table) )
def starter(i,table,**kwargs):
return DummyOperator(task_id = "dummy_starter_%s_%s"%(i,table) )
def ender(i,table,**kwargs):
return DummyOperator(task_id = "dummy_ender_%s_%s"%(i,table) )
def toto(table,**kwargs):
return DummyOperator(task_id = "toto_%s"%(table) )
def end(**kwargs):
return DummyOperator(task_id = "dummy_end")
parsing_ts_op = python_operator.PythonOperator(
task_id='parsing_ts_operator',
provide_context=True,
python_callable=parsing_ts)
printval_op = python_operator.PythonOperator(
task_id='printval_operator',
provide_context=True,
python_callable=printval)
for table in tables:
parsing_ts_op >> start(table)
for i in range(META_DELTA_DAYS):
start(table) >> starter(i,table)
starter(i,table) >> ender(i,table)
ender(i,table) >> toto(table)
toto(table) >> printval_op
printval_op >> end()
【问题讨论】:
标签: google-cloud-platform airflow google-cloud-composer