【问题标题】:Problem updating the connections in Airflow programatically以编程方式更新 Airflow 中的连接时出现问题
【发布时间】:2019-09-02 19:46:04
【问题描述】:

我正在尝试使用 python 更新 Airflow 连接。我创建了一个 python 函数,它从 API 获取身份验证令牌并更新 Airflow 中的额外连接字段。

我得到如下 json 格式的令牌:

{
   "token" : token_value
}

下面是我正在使用的python代码部分

def set_token():
    # Get token from API & update the Airflow Variables
    Variable.set("token", str(auth_token))
    new_token = Variables.get("token")
    get_conn = Connection(conn_id="test_conn")
    auth_token = { "header" : new_token}
    get_conn.set_extra(str(auth_token))

但是当我运行任务时,气流连接中的额外字段没有得到更新。我可以看到我的变量正在更新,但没有连接。谁能告诉我我错过了什么?

【问题讨论】:

  • Connection 只是创建了一个类的实例。它不会从数据库中获取连接,并且 set_extra 不会更新数据库。观察github.com/apache/airflow/blob/v1-10-stable/airflow/hooks/… 的 base_hook 源代码。 _get_connections_from_db 函数显示了它是如何从数据库中获取的。
  • 我的建议是使用Session 类并始终使用 sqlalchemy 方法。这是一种技巧,而不是正确的方法。 (我不知道任何正确的方法,因为它没有记录)

标签: connection airflow airflow-scheduler


【解决方案1】:

我怀疑您是否以正确的方式从 Airflow 的元数据库中获取连接。

  • 除此之外,如果您通过Variable.get() method 获取Variable,不应该Connection 接受相同的处理(尽管Connection class 没有get() 函数,但必须有一个解决方法)?
  • 在这里,您只是使用给定的 conn_id 参数实例化 Connection 对象(并没有真正从 db 获取该 conn_id 的 Connection)

每当我必须利用底层的SQLAlchemy 模型时,我都会查看cli.py。从connections() 函数中获取线索,这是我认为应该起作用的方法

from airflow.models import Connection
from airflow.settings import Session
from airflow.utils.db import provide_session
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import exc

@provide_session
def update_conn_extra(conn_id: str, new_extra: Any, session: Optional[Session] = None) -> Optional[Connection]:
    try:
        my_conn: Optional[Connection] = (session
                                         .query(Connection)
                                         .filter(Connection.conn_id == conn_id)
                                         .one())
    except exc.NoResultFound:
        my_conn: Optional[Connection] = None
    except exc.MultipleResultsFound:
        my_conn: Optional[Connection] = None
    if my_conn:
        my_conn.extra: Any = new_extra
        session.add(my_conn)
        session.commit()

请注意,这里我们只是简单地用更新的字段覆盖连接(无需先删除现有的字段),我发现这是可行的。如果您遇到一些问题,您可以在使用session.delete(my_conn)

编写更新之前删除现有连接

【讨论】:

  • No Variable 仅用于设置和获取令牌。然后我使用连接来更新其中的额外字段。在您的代码中,您只是获得连接。如何更新连接中的额外字段?是否有任何更新功能或 CLI 命令?
  • 好的,所以我尝试了这个并使用连接进行了 set_extra 并且它有效。谢谢!
猜你喜欢
  • 1970-01-01
  • 2021-09-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-06-30
  • 1970-01-01
相关资源
最近更新 更多