【问题标题】:Externally trigger dag which has schedule_interval=None外部触发具有 schedule_interval=None 的 dag
【发布时间】:2019-07-23 08:56:45
【问题描述】:

我有一个名为 dss_controller 的控制器 dag

dag = DAG(
dag_id='dss_controller',
default_args={
    "owner": "dss admin",
    "start_date": datetime.utcnow(),
},
schedule_interval=None,
)

还有一个名为 dss_trigger_target_dag 的目标 dag

dag = DAG(
dag_id='dss_trigger_target_dag',
default_args=args,
schedule_interval=None,
)

在控制器和目标 dag 中都定义了任务,就像在默认的可用示例中一样。

dss_controllerschedule_interval 设置为“@once”时,此系统将按预期工作。

然后我将其设置为 None 并在外部触发。它会触发控制器 dag 并将其移至 running 状态,然后将其移至 success 状态。

但它不会触发控制器 dag 的 dss_trigger_dagrun 任务。 这种行为的原因是什么?

设置 schedule_interval=None 是原因,如果是,怎么办?

这是我的控制器,

import pprint
import pprint
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
   """This function decides whether or not to Trigger the remote DAG"""
   c_p = context['params']['condition_param']
   print("Controller DAG : conditionally_trigger = {}".format(c_p))
   if context['params']['condition_param']:
      dag_run_obj.payload = {'message': context['params']['message']}
      pp.pprint(dag_run_obj.payload)
      return dag_run_obj

   # Define the DAG
   dag = DAG(
      dag_id='dss_controller',
    default_args={
        "owner": "dss admin",
        "start_date": datetime.utcnow(),
    },
    schedule_interval=None,
)

# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(
    task_id='dss_trigger_dagrun',
    trigger_dag_id="dss_trigger_target_dag",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello Hasitha'},
    dag=dag,
)

这是我的目标,

import pprint
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

pp = pprint.PrettyPrinter(indent=4)

args = {
    'start_date': datetime.utcnow(),
    'owner': 'dss admin',
}

dag = DAG(
    dag_id='dss_trigger_target_dag',
    default_args=args,
    schedule_interval=None,
)


def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='target_run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

【问题讨论】:

    标签: python-3.x airflow airflow-scheduler


    【解决方案1】:

    您已将“start_date”作为 now() 添加到默认参数中,用于每个任务。看来,这才是真正的罪魁祸首。 Airflow 建议不要这样做,因为它可以防止触发任务。 尝试将开始日期设置为过去的日期,例如气流.utils.dates.days_ago(1)。


    参考:https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date

    【讨论】:

    • 是 - 如果你想限制外部触发器,你设置 schedule_interval=None 是正确的,但你必须包含一个有效的 start_date 。可以按照建议使用days_ago 或类似名称进行设置,也可以是任意日期。有一个 PR 可以让 start_date 对于外部触发器用例可选,但据我所知,目前这是正确的解决方法。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-01-05
    • 1970-01-01
    • 1970-01-01
    • 2022-07-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多