[Posted]: 2019-07-23 08:56:45
[Question]:
I have a controller DAG named dss_controller:
dag = DAG(
    dag_id='dss_controller',
    default_args={
        "owner": "dss admin",
        "start_date": datetime.utcnow(),
    },
    schedule_interval=None,
)
And a target DAG named dss_trigger_target_dag:
dag = DAG(
    dag_id='dss_trigger_target_dag',
    default_args=args,
    schedule_interval=None,
)
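Tasks are defined in both the controller and the target DAG, just as in the default available examples.
When the schedule_interval of dss_controller is set to "@once", this setup works as expected.
I then set it to None and triggered the DAG externally. The controller DAG is triggered, moves to the running state, and then moves to the success state.
But the controller DAG's dss_trigger_dagrun task is never triggered. What causes this behavior?
Is setting schedule_interval=None the cause, and if so, what should I do about it?
Here is my controller: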
import pprint
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj


# Define the DAG
dag = DAG(
    dag_id='dss_controller',
    default_args={
        "owner": "dss admin",
        "start_date": datetime.utcnow(),
    },
    schedule_interval=None,
)

# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(
    task_id='dss_trigger_dagrun',
    trigger_dag_id="dss_trigger_target_dag",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello Hasitha'},
    dag=dag,
)
And here is my target:
import pprint
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

pp = pprint.PrettyPrinter(indent=4)

args = {
    'start_date': datetime.utcnow(),
    'owner': 'dss admin',
}

dag = DAG(
    dag_id='dss_trigger_target_dag',
    default_args=args,
    schedule_interval=None,
)


def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='target_run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)
[Discussion]:
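One possible cause worth checking, offered as an assumption and not something the post confirms: both DAGs use "start_date": datetime.utcnow(). That value is re-evaluated every time the DAG file is parsed, so when the scheduler examines a manually triggered run, the task's start_date may already be later than the run's execution date, and a task whose start_date is after the execution date is not scheduled. The Airflow documentation advises against dynamic start_date values for this kind of reason. The sketch below only mimics that date comparison with plain datetime objects; task_is_scheduled is a hypothetical helper for illustration, not an Airflow API, and the timestamps are made up.

```python
from datetime import datetime, timedelta


def task_is_scheduled(task_start_date, execution_date):
    """Hypothetical stand-in for the scheduler's check (assumption):
    a task is skipped when its start_date is later than the run's
    execution_date."""
    return task_start_date <= execution_date


# A manual trigger happens at some moment; that moment becomes the
# execution_date of the DAG run (illustrative timestamp).
execution_date = datetime(2019, 7, 23, 8, 0, 0)

# Dynamic start_date: the DAG file is parsed again after the trigger,
# so datetime.utcnow() would yield a value later than execution_date.
# Here that later value is simulated with a fixed 30-second offset.
reparsed_start_date = execution_date + timedelta(seconds=30)
dynamic_ok = task_is_scheduled(reparsed_start_date, execution_date)

# Static start_date: a fixed date in the past is always earlier.
static_start_date = datetime(2019, 1, 1)
static_ok = task_is_scheduled(static_start_date, execution_date)

print("dynamic start_date scheduled:", dynamic_ok)
print("static start_date scheduled:", static_ok)
```

If this is indeed what is happening, the thing to try would be replacing datetime.utcnow() with a fixed past date, for example datetime(2019, 1, 1), in both DAGs while keeping schedule_interval=None.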
Tags: python-3.x airflow airflow-scheduler