【问题标题】:SqlAlchemy asyncio orm: How to query the databaseSqlAlchemy asyncio orm:如何查询数据库
【发布时间】:2021-09-22 09:45:12
【问题描述】:

在 SqlAlchemy 异步 orm 引擎中如何查询表并获取值或全部?

我知道我可以做的非异步方法

SESSION.query(TableClass).get(x)

但尝试使用异步方法会引发下一个错误:

AttributeError: 'AsyncSession' object has no attribute 'query'.

这是我定义的 SESSION 变量。 LOOP 变量只是 asyncio.get_event_loop() 用于在我的 sql 模块加载时启动异步方法并填充用作缓存的变量以避免每次我需要某些东西时缓存数据库:

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

from .. import CONFIG, LOOP

def _build_async_db_uri(uri):
    if "+asyncpg" not in uri:
        return '+asyncpg:'.join(uri.split(":", 1))
    return uri

async def start() -> declarative_base:
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async with engine.begin() as conn:
        BASE.metadata.bind = engine
        await conn.run_sync(BASE.metadata.create_all)
    return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))


BASE = declarative_base()
SESSION = LOOP.run_until_complete(start())

这是一个表和缓存函数的例子:

class TableClass:
    __tablename__ = "tableclass"
    id = Column(Integer, primary_key = True)
    alias = Column(Integer)

CACHE = {}
async def _load_all():
    global CACHE
    try:
        curr = await SESSION.query(TableClass).all()
        CACHE = {i.id: i.alias for i in curr}

LOOP.run_until_complete(_load_all())

【问题讨论】:

    标签: python-3.x sqlalchemy python-asyncio


    【解决方案1】:

    session.query 是旧的 API。异步版本使用 select 和附带的方法。

    from sqlalchemy.future import select
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.orm import sessionmaker
    
    
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async_session = sessionmaker(
        engine, expire_on_commit=False, class_=AsyncSession
    )
    
    
    CACHE = {}
    async def _load_all():
        global CACHE
        try:
            async with async_session() as session:
                q = select(TableClass)
                result = await session.execute(q)
                curr = result.scalars()
                CACHE = {i.id: i.alias for i in curr}
        except:
            pass
    
    LOOP.run_until_complete(_load_all())
    

    您可以阅读更多关于 SqlAlchemy 异步 I/O here

    【讨论】:

    • 感谢文档链接。我注意到那里有东西。在另一个示例中,有一个在可等待对象中使用遗留 API(非异步 API)的小实现。如果我使用 run_async,非 async 函数会执行 asyncio 吗?
    • 是的,当然。您始终可以将旧 API 与同步引擎和会话一起使用
    • 这到底叫什么?如果我知道这个q = select(TableClass)这种查询的确切名称,搜索起来会更容易。
    • @HarshitSinghai 这是 sqlalchemy 核心。这是文档:docs.sqlalchemy.org/en/14/core
    猜你喜欢
    • 2022-01-07
    • 2015-04-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-05
    • 1970-01-01
    • 2017-02-18
    相关资源
    最近更新 更多