【问题标题】:SQLAlchemy 1.4 async event listenersSQLAlchemy 1.4 异步事件监听器
【发布时间】:2021-06-07 15:02:30
【问题描述】:

尝试使用事件监听器(https://docs.sqlalchemy.org/en/14/core/events.html#sqlalchemy.events.ConnectionEvents ) 在异步 sqlalchemy 引擎上并收到此错误: {NotImplementedError}asynchronous events are not implemented at this time. Apply synchronous listeners to the AsyncEngine.sync_engine or AsyncConnection.sync_connection attributes.

如果我理解正确,我不能在异步引擎上使用事件,如果我想要事件支持,我必须切换到同步引擎?

engine: Engine = create_async_engine(
    URL, echo=True, future=True
)

async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False, future=True
)


event.listens_for(engine, "do_connect")(do_connect_listener)
event.listens_for(engine, "engine_connect")(engine_connect_listener)

【问题讨论】:

    标签: sqlalchemy


    【解决方案1】:

    从 github 上的 SQLAlchemy 讨论部分得到了答案。所有功劳归于 OP(https://github.com/sqlalchemy/sqlalchemy/discussions/6594#discussioncomment-836437):

    “嗨,

    正如异常所建议的,您可以在应用事件时使用 sync_engine/sync_connection。下面是一个例子:"

    
    
    import asyncio
    from sqlalchemy import event
    from sqlalchemy.ext.asyncio import create_async_engine
    
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    
    @event.listens_for(engine.sync_engine, "do_connect")
    def do_connect(dialect, conn_rec, cargs, cparams):
        print("some-function")
    
    @event.listens_for(engine.sync_engine, "engine_connect")
    def engine_connect(conn, branch):
        print("engine_connect", conn.exec_driver_sql("select 1").scalar())
    
    async def go():
        async with engine.connect() as conn:
            res = await conn.exec_driver_sql("select 2")
            print("go", res.scalar())
    
    asyncio.run(go())
    

    【讨论】:

      猜你喜欢
      • 2018-05-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-14
      • 2012-03-19
      • 1970-01-01
      相关资源
      最近更新 更多