【问题标题】:Does SQLAlchemy have an equivalent of Django's get_or_create?SQLAlchemy 是否与 Django 的 get_or_create 等效?
【发布时间】:2012-08-22 14:20:48
【问题描述】:

如果它已经存在(基于提供的参数),我想从数据库中获取一个对象,如果不存在则创建它。

Django 的get_or_create(或source)就是这样做的。 SQLAlchemy 中是否有等效的快捷方式?

我目前正在这样明确地写出来:

def get_or_create_instrument(session, serial_number):
    instrument = session.query(Instrument).filter_by(serial_number=serial_number).first()
    if instrument:
        return instrument
    else:
        instrument = Instrument(serial_number)
        session.add(instrument)
        return instrument

【问题讨论】:

标签: python django sqlalchemy


【解决方案1】:

这基本上就是这样做的方法,AFAIK 没有现成的捷径。

你当然可以概括它:

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        params = {k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)}
        params.update(defaults or {})
        instance = model(**params)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

2020 年更新(仅限 Python 3.9+)

这是一个更简洁的版本,带有 Python 3.9 的 the new dict union operator (|=)

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        kwargs |= defaults or {}
        instance = model(**kwargs)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

注意:

类似于 Django 版本,这将捕获重复的键约束和类似的错误。如果您的 get 或 create 不能保证返回单个结果,它仍然可能导致竞争条件。

要缓解其中的一些问题,您需要在 session.commit() 之后添加另一个 one_or_none() 样式提取。除非您还使用 with_for_update() 或可序列化事务模式,否则这仍然不能 100% 保证不会出现竞争条件。

【讨论】:

  • 我认为在你阅读“session.Query(model.filter_by(**kwargs).first()”的地方,你应该阅读“session.Query(model.filter_by(**kwargs)) .first()".
  • 是否应该在这周围加个锁,以便另一个线程在该线程有机会创建实例之前不会创建实例?
  • @EoghanM:通常你的会话是线程本地的,所以这无关紧要。 SQLAlchemy 会话并不是线程安全的。
  • @WolpH 它可能是另一个试图同时创建相同记录的进程。查看 Django 的 get_or_create 实现。它检查完整性错误,并依赖于正确使用唯一约束。
  • @IvanVirabyan:我以为@EoghanM 正在谈论会话实例。在这种情况下,session.add 块周围应该有一个 try...except IntegrityError: instance = session.Query(...)
【解决方案2】:

按照@WoLpH 的解决方案,这是对我有用的代码(简单版):

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance

有了这个,我可以获取或创建我模型的任何对象。

假设我的模型对象是:

class Country(Base):
    __tablename__ = 'countries'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

要获取或创建我的对象,我会写:

myCountry = get_or_create(session, Country, name=countryName)

【讨论】:

  • 对于那些像我一样搜索的人,如果行不存在,这是创建行的正确解决方案。
  • 您不需要将新实例添加到会话中吗?否则,如果您在调用代码中发出 session.commit(),则不会发生任何事情,因为新实例未添加到会话中。
  • 谢谢你。我发现这非常有用,因此我创建了一个要点以供将来使用。 gist.github.com/jangeador/e7221fc3b5ebeeac9a08
  • 鉴于您将会话作为参数传递,最好避免使用commit(或者至少只使用flush)。这将会话控制权留给了此方法的调用者,并且不会冒发出过早提交的风险。此外,使用one_or_none() 而不是first() 可能会更安全一些。
  • 我同意@exhuma - 在函数中提交可能会导致意外的冲突和/或性能问题。
【解决方案3】:

This SQLALchemy recipe 做得很好,很优雅。

要做的第一件事是定义一个函数,该函数被赋予一个 Session 以使用,并将一个字典与 Session() 相关联,该 Session() 跟踪当前的 unique 键。

def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
    cache = getattr(session, '_unique_cache', None)
    if cache is None:
        session._unique_cache = cache = {}

    key = (cls, hashfunc(*arg, **kw))
    if key in cache:
        return cache[key]
    else:
        with session.no_autoflush:
            q = session.query(cls)
            q = queryfunc(q, *arg, **kw)
            obj = q.first()
            if not obj:
                obj = constructor(*arg, **kw)
                session.add(obj)
        cache[key] = obj
        return obj

使用这个函数的一个例子是在一个 mixin 中:

class UniqueMixin(object):
    @classmethod
    def unique_hash(cls, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def unique_filter(cls, query, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def as_unique(cls, session, *arg, **kw):
        return _unique(
                    session,
                    cls,
                    cls.unique_hash,
                    cls.unique_filter,
                    cls,
                    arg, kw
            )

最后创建独特的 get_or_create 模型:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

engine = create_engine('sqlite://', echo=True)

Session = sessionmaker(bind=engine)

class Widget(UniqueMixin, Base):
    __tablename__ = 'widget'

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)

    @classmethod
    def unique_hash(cls, name):
        return name

    @classmethod
    def unique_filter(cls, query, name):
        return query.filter(Widget.name == name)

Base.metadata.create_all(engine)

session = Session()

w1, w2, w3 = Widget.as_unique(session, name='w1'), \
                Widget.as_unique(session, name='w2'), \
                Widget.as_unique(session, name='w3')
w1b = Widget.as_unique(session, name='w1')

assert w1 is w1b
assert w2 is not w3
assert w2 is not w1

session.commit()

该配方更深入地研究了这个想法并提供了不同的方法,但我已经成功地使用了这个。

【讨论】:

  • 如果只有一个 SQLAlchemy Session 对象可以修改数据库,我喜欢这个秘诀。我可能是错的,但是如果其他会话(SQLAlchemy 与否)同时修改数据库,我看不出这如何防止在事务进行时可能由其他会话创建的对象。在这些情况下,我认为依赖 session.add() 之后的刷新和stackoverflow.com/a/21146492/3690333 之类的异常处理的解决方案更可靠。
【解决方案4】:

我一直在解决这个问题,最终得到了一个相当强大的解决方案:

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), False
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        created = getattr(model, create_method, model)(**kwargs)
        try:
            session.add(created)
            session.flush()
            return created, True
        except IntegrityError:
            session.rollback()
            return session.query(model).filter_by(**kwargs).one(), False

我刚刚写了一个fairly expansive blog post 的所有细节,但我对我使用它的原因提出了一些想法。

  1. 它解压成一个元组,告诉你对象是否存在。这在您的工作流程中通常很有用。

  2. 该函数使您能够使用 @classmethod 修饰的创建者函数(以及特定于它们的属性)。

  3. 当您有多个进程连接到数据存储时,该解决方案可以防止出现竞争条件。

编辑:我已将session.commit() 更改为session.flush(),如this blog post 中所述。请注意,这些决策特定于所使用的数据存储(本例中为 Postgres)。

编辑 2:我在函数中使用 {} 作为默认值进行了更新,因为这是典型的 Python 陷阱。感谢the comment,奈杰尔!如果您对此问题感到好奇,请查看this StackOverflow questionthis blog post

【讨论】:

  • 与 spencer says 相比,这个解决方案是一个很好的解决方案,因为它可以防止竞争条件(通过提交/刷新会话,当心)并完美地模仿 Django 所做的事情。
  • @kiddouk 不,它没有“完美地”模仿。 Django 的get_or_create 线程安全的。它不是原子的。此外,如果创建了实例,则 Django 的 get_or_create 返回 True 标志,否则返回 False 标志。
  • @Kate 如果您查看 Django 的 get_or_create,它的作用几乎完全相同。此解决方案还返回 True/False 标志以指示对象是否已创建或获取,并且也不是原子的。但是,线程安全和原子更新是数据库的一个问题,而不是 Django、Flask 或 SQLAlchemy,并且在这个解决方案和 Django 中,都是通过数据库上的事务来解决的。
  • 假设为新记录提供了一个非空字段空值,它将引发 IntegrityError。整个事情搞砸了,现在我们不知道实际发生了什么,我们得到另一个错误,即没有找到记录。
  • 不应该 IntegrityError 的情况下返回 False 因为这个客户端没有创建对象?
【解决方案5】:

语义上最接近的可能是:

def get_or_create(model, **kwargs):
    """SqlAlchemy implementation of Django's get_or_create.
    """
    session = Session()
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance, False
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance, True

不知道在 sqlalchemy 中依赖全局定义的 Session 是多么的洁净,但是 Django 版本不接受连接所以......

返回的元组包含实例和一个布尔值,指示实例是否已创建(即,如果我们从数据库中读取实例,则为 False)。

Django 的get_or_create 经常用于确保全局数据可用,因此我会尽早提交。

【讨论】:

  • 只要 Session 由 scoped_session 创建和跟踪,这应该可以工作,这应该实现线程安全的会话管理(这在 2014 年存在吗?)。
【解决方案6】:

根据您采用的隔离级别,上述解决方案均无效。 我发现的最佳解决方案是以下形式的 RAW SQL:

INSERT INTO table(f1, f2, unique_f3) 
SELECT 'v1', 'v2', 'v3' 
WHERE NOT EXISTS (SELECT 1 FROM table WHERE f3 = 'v3')

无论隔离级别和并行度如何,这都是事务安全的。

注意:为了提高效率,最好为唯一列设置一个索引。

【讨论】:

    【解决方案7】:

    erik优秀answer的修改版

    def get_one_or_create(session,
                          model,
                          create_method='',
                          create_method_kwargs=None,
                          **kwargs):
        try:
            return session.query(model).filter_by(**kwargs).one(), True
        except NoResultFound:
            kwargs.update(create_method_kwargs or {})
            try:
                with session.begin_nested():
                    created = getattr(model, create_method, model)(**kwargs)
                    session.add(created)
                return created, False
            except IntegrityError:
                return session.query(model).filter_by(**kwargs).one(), True
    
    • 使用nested transaction 仅回滚添加的新项目,而不是回滚所有内容(请参阅此answer 以使用 SQLite 的嵌套事务)
    • 移动create_method。如果创建的对象具有关系并且通过这些关系分配了成员,则它会自动添加到会话中。例如。创建一个book,它有user_iduser作为对应关系,然后在create_method里面做book.user=<user object>会添加book到会话中。这意味着create_method 必须在with 内才能从最终回滚中受益。请注意,begin_nested 会自动触发刷新。

    请注意,如果使用 MySQL,事务隔离级别必须设置为 READ COMMITTED 而不是 REPEATABLE READ 才能正常工作。 Django 的 get_or_create(和 here)使用相同的策略,另请参见 Django documentation

    【讨论】:

    • 我喜欢这样可以避免回滚不相关的更改,但是如果会话之前查询过模型,IntegrityError 重新查询可能仍会失败,NoResultFound 使用 MySQL 默认隔离级别 REPEATABLE READ在同一笔交易中。我能想出的最佳解决方案是在此查询之前调用session.commit(),这也不理想,因为用户可能不会期望它。引用的答案不存在这个问题,因为 session.rollback() 具有启动新事务的相同效果。
    • 嗯,直到。将查询放入嵌套事务中会起作用吗?你是对的,这个函数内部的commit 可以说比rollback 更糟糕,即使对于特定的用例它是可以接受的。
    • 是的,将初始查询放在嵌套事务中至少可以使第二个查询工作。如果用户之前在同一个事务中明确查询过模型,它仍然会失败。我已经决定这是可以接受的,并且应该警告用户不要这样做或以其他方式捕获异常并决定是否commit() 自己。如果我对代码的理解是正确的,这就是 Django 所做的。
    • 在 django documentation 中,他们说使用`READ COMMITTED, so it does not look like they try to handle this. Looking at the [source](https://github.com/django/django/blob/master/django/db/models/query.py#L491) confirms this. I'm not sure I understand your reply, you mean the user should put his/her query in a nested transaction? It's not clear to me how a SAVEPOINT` 会影响REPEATABLE READ 的读取。如果没有效果那么情况似乎无法挽救,如果效果那么最后一个查询可以嵌套?
    • READ COMMITED 很有趣,也许我应该重新考虑我不触及数据库默认值的决定。我已经测试过,从查询之前恢复SAVEPOINT 使得该查询好像从未在REPEATABLE READ 中发生过。因此,我发现有必要将查询包含在嵌套事务中的 try 子句中,以便 IntegrityError except 子句中的查询完全可以工作。
    【解决方案8】:

    我稍微简化了@Kevin。避免将整个函数包装在 if/else 语句中的解决方案。这样就只有一个return,我觉得更干净:

    def get_or_create(session, model, **kwargs):
        instance = session.query(model).filter_by(**kwargs).first()
    
        if not instance:
            instance = model(**kwargs)
            session.add(instance)
    
        return instance
    

    【讨论】:

      【解决方案9】:

      有一个 Python 包,其中包含 @erik 的解决方案以及 update_or_create() 的版本。 https://github.com/enricobarzetti/sqlalchemy_get_or_create

      【讨论】:

        【解决方案10】:

        我经常遇到的一个问题是,当一个字段有一个最大长度(例如,STRING(40))并且您想执行一个带有大长度字符串的 get or create 时,上述解决方案将失败。

        在上述解决方案的基础上,这是我的方法:

        from sqlalchemy import Column, String
        
        def get_or_create(self, add=True, flush=True, commit=False, **kwargs):
            """
        
            Get the an entity based on the kwargs or create an entity with those kwargs.
        
            Params:
                add: (default True) should the instance be added to the session?
                flush: (default True) flush the instance to the session?
                commit: (default False) commit the session?
                kwargs: key, value pairs of parameters to lookup/create.
        
            Ex: SocialPlatform.get_or_create(**{'name':'facebook'})
                returns --> existing record or, will create a new record
        
            ---------
        
            NOTE: I like to add this as a classmethod in the base class of my tables, so that
            all data models inherit the base class --> functionality is transmitted across
            all orm defined models.
        
            """
        
        
            # Truncate values if necessary
            for key, value in kwargs.items():
        
                # Only use strings
                if not isinstance(value, str):
                    continue
        
                # Only use if it's a column
                my_col = getattr(self.__table__.columns, key)
        
                if not isinstance(my_col, Column):
                    continue
        
                # Skip non strings again here
                if not isinstance(my_col.type, String):
                    continue
        
                # Get the max length
                max_len = my_col.type.length
        
                if value and max_len and len(value) > max_len:
        
                    # Update the value
                    value = value[:max_len]
                    kwargs[key] = value
        
            # -------------------------------------------------
        
            # Make the query...
            instance = session.query(self).filter_by(**kwargs).first()
        
            if instance:
                return instance
        
            else:
                # Max length isn't accounted for here.
                # The assumption is that auto-truncation will happen on the child-model
                # Or directtly in the db
                instance = self(**kwargs)
        
            # You'll usually want to add to the session
            if add:
                session.add(instance)
        
            # Navigate these with caution
            if add and commit:
                try:
                    session.commit()
                except IntegrityError:
                    session.rollback()
        
            elif add and flush:
                session.flush()
        
        
            return instance
        

        【讨论】:

          猜你喜欢
          • 2012-08-26
          • 2010-12-20
          • 2018-04-17
          • 1970-01-01
          • 2021-06-01
          • 2014-12-19
          相关资源
          最近更新 更多