【问题标题】:Does airflow support the cloud-sql-python-connector for connecting to CloudSQL Postgres?气流是否支持 cloud-sql-python-connector 连接到 CloudSQL Postgres?
【发布时间】:2022-01-27 17:47:55
【问题描述】:

Airflow 使用 SQLAlchemy 连接数据库:https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri

Google 官方支持通过 cloud-sql-python-connector 包连接到 CloudSQL:https://cloud.google.com/sql/docs/postgres/connect-connectors#examples

看起来它甚至支持 Postgres 的 IAM 集成,这让事情变得更加无缝:https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/commit/6703232d6ea624f868e750c8c49c3bb1151f1f1e#diff-f5a708e43b65d84f59aa4fd685978087acc7e903b740f3e45c68356dbb2fd7b4R77

这样您就不必使用 CloudSQL 代理(或使用私有网络/防火墙)并且可以像这样直接在 python 中连接:

# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
# 'creator' argument to 'create_engine'
def init_connection_engine() -> sqlalchemy.engine.Engine:
    def getconn() -> pg8000.dbapi.Connection:
        conn: pg8000.dbapi.Connection = connector.connect(
            os.environ["POSTGRES_CONNECTION_NAME"],
            "pg8000",
            user=os.environ["POSTGRES_USER"],
            password=os.environ["POSTGRES_PASS"],
            db=os.environ["POSTGRES_DB"],
        )
        return conn

    engine = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    engine.dialect.description_encoding = None
    return engine

我可以在气流中安装cloud-sql-python-connector 并在我的sql_alchemy_conn 设置中设置连接以使用它吗?

【问题讨论】:

    标签: python airflow google-cloud-sql


    【解决方案1】:

    要回答您的问题,是的,您可以在气流中安装cloud-sql-python-connector。我也试过了。

    下面是如何设置sql_alchemy_conn的示例配置。

    def init_connection_engine() -> sqlalchemy.engine.Engine:
            def getconn() -> pg8000.dbapi.Connection:
                conn: pg8000.dbapi.Connection = connector.connect(
                    CONNECTION_NAME,
                    "pg8000",
                    user=MYSQL_USER,
                    password=MYSQL_PASS,
                    db=DB_NAME,
                )
                return conn
    
            engine = sqlalchemy.create_engine(
                "postgresql+pg8000://",
                creator=getconn,
            )
            engine.dialect.description_encoding = None
    
            query = sqlalchemy.text("SELECT * FROM entries")
    
            with engine.connect() as db_conn:
                results = db_conn.execute(query)
                for result in results:
                    logging.info(result)
           
           process_data = python.PythonOperator(
            task_id='query_postgre',
            python_callable=init_connection_engine,
            provide_context=True
            )

    【讨论】:

    • 我很困惑你把上面的sn-p放在哪里?我以为您只能使用设置文件配置连接,并且它必须只是一个字符串。另外,您是如何在气流环境中安装软件包的?我不会在 DAG 代码中手动建立连接,而是使用设置为所有 DAG 配置连接。
    • 片段被放置在 PythonOperator 中。您也可以参考此 GCP doc。最后,getConn() 不接受 1 班轮环境变量。更新了我的代码 sn-p。 @red888
    • 我认为这不是我想要的。我想使用 cloud-sql-python-connector 和 IAM 集成设置一个 postgres 数据库后端。此外,您链接到我没有使用的作曲家文档。
    猜你喜欢
    • 1970-01-01
    • 2020-08-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-07
    • 2019-07-09
    • 2017-04-09
    • 1970-01-01
    相关资源
    最近更新 更多