【问题标题】:How to properly handle Daylight Savings Time in Apache Airflow?如何正确处理 Apache Airflow 中的夏令时?
【发布时间】:2017-09-25 12:45:20
【问题描述】:

在气流中,一切都应该是 UTC(不受 DST 影响)。

但是,我们的工作流会根据受 DST 影响的时区交付内容。

一个示例场景:

  • 我们安排了一项工作,开始日期为东部时间上午 8:00,计划间隔为 24 小时。
  • 每天东部时间上午 8 点,调度程序发现距离上次运行已过去 24 小时,并运行该作业。
  • DST 发生,我们损失了一个小时。
  • 今天东部时间上午 8 点,调度程序发现它只有 23 小时,因为机器上的时间是 UTC,并且直到东部时间上午 9 点才运行作业,这是延迟交付

有没有办法安排 dag,以便它们在时间更改后在正确的时间运行?

【问题讨论】:

标签: dst airflow


【解决方案1】:

在我的头顶:

如果您的机器支持时区,请将您的 DAG 设置为在美国东部标准时间上午 8 点在美国东部标准时间上午 8 点运行。像0 11,12 * * * 这样的东西。第一个任务是 ShortCircuit 操作员。然后使用 pytz 之类的东西来定位当前时间。如果在您要求的时间内,请继续(即:运行 DAG)。否则,返回 False。您每天将有 2 个额外任务的微小开销,但只要您的机器不超载,延迟应该是最小的。

草率的例子:

from datetime import datetime
from pytz import utc, timezone

# ...

def is8AM(**kwargs):
    ti = kwargs["ti"]
    curtime = utc.localize(datetime.utcnow())
    # If you want to use the exec date:
    # curtime = utc.localize(ti.execution_date)
    eastern = timezone('US/Eastern') # From docs, check your local names
    loc_dt = curtime.astimezone(eastern)
    if loc_dt.hour == 8:
        return True
    return False

start_task = ShortCircuitOperator(
                task_id='check_for_8AM',
                python_callable=is8AM,
                provide_context=True,
                dag=dag
            )

希望对你有帮助

编辑:运行时错误,减去而不是添加。此外,由于跑步的启动方式,如果您希望他们在 8 点跑步,您最终可能会希望以每小时的时间表安排早上 7 点。

【讨论】:

  • 感谢您抽出宝贵时间回答。我认为这实际上会起作用,但它有点绕过气流功能。我真的在寻找一种方法,我可以按预期更多地使用 start_date 和 schedule_interval。这种方式会导致 dag 运行的每一天都额外运行一次,这会在 UI 上乱扔只有第一个操作员运行的运行。
  • 问题出在 Python 和 cron 上。确实没有用于 python 的内置系统以一种时区感知的方式检查时间。另一种方法是编写另一个 dag(哈哈)来动态生成这些 dag,其开始和结束日期(和执行时间)等于当前日光或标准时间。然后,您每天运行的 dag 会少一个,但您的列表中会再显示一个 dag。在时间更改前一天运行它可能会起作用。
【解决方案2】:

我们使用了@apathyman 解决方案,但我们只是使用了 PythonOperator 而不是 ShortCircuit,如果不是我们想要的时间,它会失败,并且重试 timedelta 为 1 小时。 这样我们每天只有 1 次跑步而不是 2 次。

并且计划间隔设置为仅在第一个小时运行

所以基本上,类似的东西(大部分代码取自上述答案,感谢@apathyman):

from datetime import datetime
from datetime import timedelta
from pytz import utc, timezone


def is8AM(**kwargs):
    ti = kwargs["ti"]
    curtime = utc.localize(datetime.utcnow())
    # If you want to use the exec date:
    # curtime = utc.localize(ti.execution_date)
    eastern = timezone('US/Eastern') # From docs, check your local names
    loc_dt = curtime.astimezone(eastern)
    if loc_dt.hour == 8:
        return True
    exit("Not the time yet, wait 1 hour")

start_task = PythonOperator(
            task_id='check_for_8AM',
            python_callable=is8AM,
            provide_context=True,
            retries=1,
            retry_delay=timedelta(hours=1),
            dag=dag
        )

【讨论】:

    【解决方案3】:

    当气流在 1.8.x 版本时被问到这个问题。

    这个功能现在是内置的,从气流 1.10 开始。

    https://airflow.apache.org/timezone.html

    airflow.cfg中设置时区,应该正确处理dst。

    【讨论】:

    • 但请注意schedule_interval 仍然不会考虑夏令时。正如文档中所写:In case you set a cron schedule, Airflow assumes you will always want to run at the exact same time. It will then ignore day light savings time. Thus, if you have a schedule that says run at end of interval every day at 08:00 GMT+1 it will always run end of interval 08:00 GMT+1, regardless if day light savings time is in place. 解决方法是使用时区感知 datetime 对象,请参见此处:stackoverflow.com/q/52668410/1201003
    • 谢谢@Dalar 我认为这是一个重要的澄清。
    【解决方案4】:

    我相信我们只需要一个 PythonOperator 来处理这种情况。

    如果 DAG 需要在 DST TZ 中运行(例如:America/New_York、Europe/London、Australia/Sydney),那么以下是我可以考虑的解决方法步骤:

    1. 将 DAG 时间表转换为 UTC TZ。
      因为TZ有夏令时,所以我们需要选择更大的偏移量 在进行转换时。例如:
      • 对于 America/New_York TZ:我们必须使用偏移量-4。所以计划*/10 11-13 * * 1-5 将转换为*/10 15-17 * * 1-5
      • 对于欧洲/伦敦:我们必须使用偏移量+1。所以计划35 */4 * * * 将转换为35 3-23/4 * * *
      • 对于澳大利亚/悉尼:我们必须使用偏移量+11。所以计划15 8,9,12,18 * * * 将转换为15 21,22,1,7 * * *
    2. 使用PythonOperator 在所有主要任务之前创建一个任务。此任务将检查当前时间是否在指定 TZ 的 DST 中。如果是,则任务将在 1 小时内休眠。 这样我们就可以处理DST TZ的情况了。

      def is_DST(zonename):
          tz = pytz.timezone(zonename)
          now = pytz.utc.localize(datetime.utcnow())
          return now.astimezone(tz).dst() != timedelta(0)
      
      
      def WQ_DST_handler(TZ, **kwargs):
          if is_DST(TZ):
              print('Currently is daily saving time (DST) in {0}, will process to next task now'.format(TZ))
          else:
              print('Currently is not daily saving time (DST) in {0}, will sleep 1 hour...'.format(TZ))
              time.sleep(60 * 60)
      
      
      DST_handler = PythonOperator(
          task_id='DST_handler',
          python_callable=WQ_DST_handler,
          op_kwargs={'TZ': TZ_of_dag},
          dag=dag
      )
      
      DST_handler >> main_tasks
      

    这种解决方法有一个缺点:对于需要在 DST TZ 中运行的任何 DAG,我们必须创建 1 个进一步的任务(上例中的 DST_handler),并且该任务仍然需要发送到工作节点来执行(尽管它几乎只是一个睡眠命令)。

    【讨论】:

      猜你喜欢
      • 2014-04-21
      • 1970-01-01
      • 2021-01-20
      • 2019-12-16
      • 2020-07-14
      • 2013-03-18
      • 2022-07-20
      • 1970-01-01
      • 2013-06-08
      相关资源
      最近更新 更多