【Question title】:Airflow Scheduler throws error for DAGs with schedule_interval as None
【Posted】:2020-04-08 08:09:36
【Question description】:

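I have a problem with Airflow. There is a custom generator script that takes input from YAML files and loads the DAGs. It works fine when every DAG YAML file has a schedule interval other than None. However, many of the DAGs have schedule_interval set to None, and a few use @once.

A sample YAML file: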

cluster:
  nodes: 10
  subnet: "subnet-A"
  instance: "m4.2xlarge"
  configbucket: "bucketabc"
  jar: "s3://xxxxx.jar"
  conf: "app.conf"

schedule:
  state: "unpause"
  concurrency: 10
  startdate: "2050-08-05 00:00"
  cron: "None"

The generator script is as follows:

            if "schedule" in project_settings:
                schedule_settings = project_settings["schedule"]
                concurrency = schedule_settings["concurrency"]
                cron =  schedule_settings["cron"]
                startdate =  datetime.strptime(schedule_settings["startdate"], "%Y-%m-%d %H:%M")

            #print "my projectname is: " + project

            dag = DAG(
                dag_id = project,
                default_args=args,
                user_defined_macros=user_macros,
                schedule_interval=cron,
                concurrency=concurrency,
                start_date=startdate
            )

This is the error raised when many DAGs have schedule_interval=None:

INFO - [2020-04-08 12:30:45,529] {dagbag.py:302} ERROR - Failed to bag_dag: /home/deploy/airflow/dags/genertor.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 296, in process_file
    croniter(dag._schedule_interval)
  File "/usr/local/lib/python3.6/site-packages/croniter/croniter.py", line 91, in __init__
    self.expanded, self.nth_weekday_of_month = self.expand(expr_format)
  File "/usr/local/lib/python3.6/site-packages/croniter/croniter.py", line 468, in expand
    raise CroniterBadCronError(cls.bad_length)
croniter.croniter.CroniterBadCronError: Exactly 5 or 6 columns has to be specified for iteratorexpression.

Has anyone run into this problem?

【Question discussion】:

    Tags: python airflow airflow-scheduler


    【Solution 1】:

    An Airflow DAG's schedule_interval can be a cron expression given as a string, or it can be None (note: the Python value None, not the string "None").

    In your settings, you have:
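
To illustrate why the string "None" fails: the traceback shows croniter rejecting the value because it does not have 5 or 6 columns. The sketch below is a simplified stand-in for that column count, not croniter's actual code, and `count_cron_columns` is a hypothetical helper name.

```python
def count_cron_columns(expr: str) -> int:
    """Count whitespace-separated fields, roughly what a cron parser checks first."""
    return len(expr.split())


# The string "None" is a single column, so it cannot be a 5- or 6-column cron expression.
print(count_cron_columns("None"))        # 1
print(count_cron_columns("0 12 * * *"))  # 5
```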

    cron: "None"
    

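    That value is a string in Python. Note that an unquoted cron: None would also load as the string "None", because YAML spells its null value as null or ~. If you cannot change that YAML file to:

    cron: null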
    

    you can still check for that string in the DAG itself:

    schedule_interval = None if cron == "None" else cron
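
If the YAML files spell "no schedule" in more than one way, the same check can be pulled into a small helper. This is a minimal sketch: `normalize_schedule` and the `_NO_SCHEDULE` set are hypothetical names, and the list of spellings treated as "no schedule" is an assumption to adjust to your own files.

```python
from typing import Optional

# Spellings treated as "no schedule" (an assumption; adjust to your YAML files).
_NO_SCHEDULE = {"", "none", "null", "~"}


def normalize_schedule(cron: Optional[str]) -> Optional[str]:
    """Return None for 'no schedule' spellings, otherwise the stripped string."""
    if cron is None:
        return None
    text = str(cron).strip()
    if text.lower() in _NO_SCHEDULE:
        return None
    return text


print(normalize_schedule("None"))        # None
print(normalize_schedule("@once"))       # @once
print(normalize_schedule("0 12 * * *"))  # 0 12 * * *
```

In the generator script this would be applied as cron = normalize_schedule(schedule_settings["cron"]) before the value is passed to DAG(...) as schedule_interval, so presets such as @once and real cron strings pass through unchanged.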
    

    【Comments】:
