【发布时间】:2019-09-02 19:46:04
【问题描述】:
我正在尝试使用 python 更新 Airflow 连接。我创建了一个 python 函数,它从 API 获取身份验证令牌并更新 Airflow 中的额外连接字段。
我得到如下 json 格式的令牌:
{
"token" : token_value
}
下面是我正在使用的python代码部分
def set_token():
# Get token from API & update the Airflow Variables
Variable.set("token", str(auth_token))
new_token = Variables.get("token")
get_conn = Connection(conn_id="test_conn")
auth_token = { "header" : new_token}
get_conn.set_extra(str(auth_token))
但是当我运行任务时,气流连接中的额外字段没有得到更新。我可以看到我的变量正在更新,但没有连接。谁能告诉我我错过了什么?
【问题讨论】:
-
Connection只是创建了一个类的实例。它不会从数据库中获取连接,并且 set_extra 不会更新数据库。观察github.com/apache/airflow/blob/v1-10-stable/airflow/hooks/… 的 base_hook 源代码。_get_connections_from_db函数显示了它是如何从数据库中获取的。 -
我的建议是使用
Session类并始终使用 sqlalchemy 方法。这是一种技巧,而不是正确的方法。 (我不知道任何正确的方法,因为它没有记录)
标签: connection airflow airflow-scheduler