【发布时间】:2021-06-25 23:04:10
【问题描述】:
我需要一个 Airflow 宏值,但返回的字符串没有按预期读取,我得到的只是一个损坏的 DAG。我已经在终端测试了部分脚本,看看是否有问题,但似乎并非如此。
我的期望是像 '2016-06-28T16:51:45.978473-05:00' 这样的字符串变成 '2016-06-28T16:51'
这里是代码。这部分位于 DAG 装饰器作用域 with DAG(..) as dag: 之前。
exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
错误信息:
Broken DAG: [<path-to-dag>/processing_dag.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<path-to-dag>/processing_dag.py", line 16, in <module>
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
IndexError: list index out of range
这意味着我没有以Airflow docs 指定的格式获得'{{ execution_date }}'。
从 Airflow 服务器运行 DAG 脚本不会激活宏并且 DAG 已损坏,所以我不知道如何调试代码。有没有办法打印'{{ execution_date }}' 的值,以便我了解发生了什么?
[编辑] 根据要求,这里是脚本的一些相关部分。导入的模块是:
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datamechanics_airflow_plugin.operator import DataMechanicsOperator
from airflow.models import Variable
from airflow.utils.dates import days_ago
from datetime import datetime
import pendulum
import re
脚本顶部:
local_tz = pendulum.timezone("America/Sao_Paulo")
exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
exec_date = datetime.strptime(exec_date, "%Y-%m-%dT%H:%M")
with DAG(
dag_id="processing_dag",
start_date=days_ago(0, second = 1).astimezone(tz=local_tz),
schedule_interval="@daily",
) as dag:
<tasks, etc>...
【问题讨论】:
-
分享你dag的相关部分
-
我已编辑问题以显示更多详细信息
-
你在哪里使用 exec_date?