【问题标题】:Airflow long running hourly DAG's missing few hours气流长时间运行的每小时 DAG 缺少几个小时
【发布时间】:2019-08-02 09:54:02
【问题描述】:

我的 DAG 计划每小时运行一次。我从 s3 源中提取每小时的数据并处理它们。有时任务需要一个多小时才能完成。那时,我错过了一个小时的数据。

示例: 下午 1:00 DAG 启动并运行了 2 小时。所以我的下一次 DAG 运行将参数设置为 3(3pm) 缺少 2pm 数据。换句话说,我如何调用任务并确保它每小时运行一次,一天运行 24 次

【问题讨论】:

  • 你能发布一个你的 DAG 的例子吗? Airflow 中的并发 DAG 没有问题,即您的 DAG 在下午 2 点应该运行得非常好,即使 DAG@1pm 仍在运行......
  • @dorvak 你是对的。这是我的逻辑。我想每小时运行一次 dag,并且我正在根据 current_time 过去一个小时。我的气流环境在队列中只占用 4 个实例。因此,有时我将小时作为参数传递的任务实例会延迟(由于其他长时间运行的作业)。示例:

标签: airflow


【解决方案1】:

这是我的 DAG

HOUR_PACIFIC = arrow.utcnow().shift(hours=-3).to('US/Pacific').format("HH")

dag = DAG(
    DAG_ID,
    catchup=False,
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=5),
    schedule_interval='0 * * * *')

start = DummyOperator(
    task_id='Start',
    dag=dag)

my_task = EMRStep(emr,
'stg',
HOUR_PACIFIC)

end = DummyOperator(
    task_id='End',
    dag=dag
)
start >> my_task >> end

【讨论】:

    【解决方案2】:

    您需要为 DAG 对象传递 catchup=True

    【讨论】:

      【解决方案3】:

      这似乎是使用TimeDeltaSensor 的完美场景


      注意:以下code-sn-p仅供参考,未经测试

      import datetime
      
      from airflow.models import DAG
      from airflow.operators.dummy_operator import DummyOperator
      from airflow.operators.python_operator import PythonOperator
      from airflow.sensors.time_delta_sensor import TimeDeltaSensor
      from airflow.utils.trigger_rule import TriggerRule
      
      # create DAG object
      my_dag: DAG = DAG(dag_id="my_dag",
                        start_date=datetime.datetime(year=2019, month=3, day=11),
                        schedule_interval="0 0 0 * * *")
      
      # create dummy begin & end tasks
      my_begin_task: DummyOperator = DummyOperator(dag=my_dag,
                                                   task_id="my_begin_task")
      my_end_task: DummyOperator = DummyOperator(dag=my_dag,
                                                 task_id="my_end_task",
                                                 trigger_rule=TriggerRule.ALL_DONE)
      
      # populate the DAG
      for i in range(1, 24, 1):
          # create sensors and actual tasks for all hours of the day
          my_time_delta_sensor: TimeDeltaSensor = TimeDeltaSensor(dag=my_dag,
                                                                  task_id=f"my_time_delta_sensor_task_{i}_hours",
                                                                  delta=datetime.timedelta(hours=i))
          my_actual_task: PythonOperator = PythonOperator(dag=my_dag,
                                                          task_id=f"my_actual_task_{i}_hours",
                                                          python_callable=my_callable
                                                          ..)
          # wire-up tasks together
          my_begin_task >> my_time_delta_sensor >> my_actual_task >> my_end_task
      

      参考文献

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2018-05-06
        • 1970-01-01
        • 1970-01-01
        • 2019-01-26
        • 2021-04-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多