【问题标题】:FastApi Sqlalchemy how to manage transaction (session and multiple commits)FastApi Sqlalchemy 如何管理事务(会话和多次提交)
【发布时间】:2021-04-18 08:24:51
【问题描述】:

我有一个带有插入和更新函数的 CRUD,每个函数末尾都带有 commit,如下所示:

@staticmethod
def insert(db: Session, item: Item) -> None:
    db.add(item)
    db.commit()
   
   
@staticmethod
def update(db: Session, item: Item) -> None:
    ...
    db.commit()

我有一个端点,它接收来自 FastAPI 依赖项的 sqlalchemy 会话,需要以原子方式插入和更新(数据库事务)。

处理事务的最佳做法是什么?我不能使用 CRUD,因为它不止一个 commit

我应该如何处理交易?你在哪里提交你的会话?在 CRUD 中?还是每个请求只在 FastAPI 依赖函数中使用一次?

【问题讨论】:

    标签: python database sqlalchemy transactions fastapi


    【解决方案1】:

    我在使用 FastAPI 时遇到了同样的问题。我找不到在单独的方法中使用commit 并让它们以事务方式运行的方法。 我最终做的是flush 而不是commit,它将更改发送到数据库,但不提交事务。

    需要注意的一点是,在 FastAPI 中,每个请求都会打开一个新会话,并在完成后关闭它。这将是使用 SQLAlchemy 文档中的 example 发生的事情的粗略示例。

    def run_my_program():
        # This happens in the `database = SessionLocal()` of the `get_db` method below
        session = Session()
        try:
            ThingOne().go(session)
            ThingTwo().go(session)
    
            session.commit()
        except:
            session.rollback()
            raise
        finally:
            # This is the same as the `get_db` method below
            session.close()
    

    为请求生成的会话已经是一个事务。当您提交该会话时,实际上正在做的是这个

    当使用 Session 的默认模式 autocommit=False 时,新事务将在提交后立即开始,但请注意,新开始的事务在实际发出第一个 SQL 之前不会使用任何连接资源。

    在阅读后我认为在端点范围内处理 commitrollback 是有意义的。

    我创建了一个虚拟示例来说明这将如何工作。我使用 FastAPI guide 中的所有内容。

    def create_user(db: Session, user: UserCreate):
        """
        Create user record
        """
        fake_hashed_password = user.password + "notreallyhashed"
        db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
        db.add(db_user)
        db.flush() # Changed this to a flush
        return db_user
    

    然后在endpoint中使用crud操作如下

    from typing import List
    from fastapi import Depends, HTTPException
    from sqlalchemy.orm import Session
    
    ...
    
    def get_db():
        """
        Get SQLAlchemy database session
        """
        database = SessionLocal()
        try:
            yield database
        finally:
            database.close()
    
    @router.post("/users", response_model=List[schemas.User])
    def create_users(user_1: schemas.UserCreate, user_2: schemas.UserCreate, db: Session = Depends(get_db)):
        """
        Create two users
        """
        try:
            user_1 = crud.create_user(db=db, user=user_1)
            user_2 = crud.create_user(db=db, user=user_2)
            db.commit()
            return [user_1, user_2]
        except:
            db.rollback()
            raise HTTPException(status_code=400, detail="Duplicated user")
    

    将来我可能会研究将其移至中间件,但我不认为使用 commit 可以获得所需的行为。

    【讨论】:

    • 感谢您的详细回答!我现在将采用您的解决方案@Guillermo Aguirre
    • @JosephAsafGardin 很高兴听到!如果是这种情况,您可以将答案标记为已接受吗?干杯!
    • 感谢分享!我一直在寻找一种更好的方法,以及如何对这些进行测试。
    猜你喜欢
    • 2011-07-12
    • 2022-08-11
    • 2018-08-30
    • 2015-07-01
    • 1970-01-01
    • 2013-03-10
    • 1970-01-01
    • 2016-09-08
    • 1970-01-01
    相关资源
    最近更新 更多