【问题标题】:Bulk insert with SQLAlchemy ORM使用 SQLAlchemy ORM 批量插入
【发布时间】:2011-04-09 05:13:52
【问题描述】:

有没有办法让 SQLAlchemy 进行批量插入而不是插入每个单独的对象。即,

在做:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

而不是:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

我刚刚将一些代码转换为使用 sqlalchemy 而不是原始 sql,虽然现在使用起来好多了,但现在似乎更慢了(高达 10 倍),我想知道这是否是原因。

也许我可以更有效地使用会话来改善这种情况。目前我有autoCommit=False 并在我添加了一些东西后做一个session.commit()。尽管如果在其他地方更改数据库,这似乎会导致数据过时,例如即使我执行新查询,我仍然会返回旧结果?

感谢您的帮助!

【问题讨论】:

  • 这可能会有所帮助:stackoverflow.com/questions/270879/…
  • 尼克,我知道这是一篇非常老帖子。是否可以将标题更新为正确,例如“使用 SQLAlchemy ORM 插入多条记录”。像您提供的多记录插入语句与数据库级别的批量加载操作完全不同。批量插入用于 1k+ 数据上传,通常来自大型数据集并由应用程序管理器完成,而不是 REST 操作或应用程序级代码......让我们正确使用我们的命名法。
  • 对于那些在 sqlalchemy Core(不是 ORM)中查找有关批量操作的信息时偶然发现此问题的人,请参阅my answer to another question

标签: python mysql database orm sqlalchemy


【解决方案1】:

Sqlalchemy 支持批量插入

bulk_list = [
    Foo(
        bar=1,
    ),
    Foo(
        bar=2,
    ),
    Foo(
        bar=3,
    ),
]
db.session.bulk_save_objects(bulk_list)
db.session.commit()

【讨论】:

    【解决方案2】:

    SQLAlchemy 在1.0.0 版本中引入了这一点:

    Bulk operations - SQLAlchemy docs

    通过这些操作,您现在可以进行批量插入或更新!

    例如(如果您希望简单表插入的开销最低),您可以使用Session.bulk_insert_mappings()

    loadme = [(1, 'a'),
              (2, 'b'),
              (3, 'c')]
    dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]
    
    s = Session()
    s.bulk_insert_mappings(Foo, dicts)
    s.commit()
    

    或者,如果您愿意,可以跳过 loadme 元组并将字典直接写入 dicts(但我发现将所有冗长的内容从数据中剔除并在循环中加载字典列表更容易) .

    【讨论】:

    • 亲爱的@juanitogan,您能举个例子说明您是如何创建这个 Foo 对象(或者它是一个类)吗?是否存在不必创建整个 Foo 类的情况?而是直接从远程数据库的现有表中下载它?如果是这样,你能详细说明它的用法吗?
    【解决方案3】:

    据我所知,没有办法让 ORM 发出批量插入。我相信根本原因是 SQLAlchemy 需要跟踪每个对象的身份(即新的主键),而批量插入会干扰这一点。例如,假设您的foo 表包含id 列并映射到Foo 类:

    x = Foo(bar=1)
    print x.id
    # None
    session.add(x)
    session.flush()
    # BEGIN
    # INSERT INTO foo (bar) VALUES(1)
    # COMMIT
    print x.id
    # 1
    

    由于 SQLAlchemy 在没有发出另一个查询的情况下获取了 x.id 的值,我们可以推断它直接从 INSERT 语句中获取了值。如果您不需要后续通过 same 实例访问创建的对象,则可以跳过 ORM 层进行插入:

    Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
    # INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))
    

    SQLAlchemy 无法将这些新行与任何现有对象匹配,因此您必须重新查询它们以进行任何后续操作。

    就陈旧数据而言,记住会话没有内置方法可以知道何时在会话之外更改数据库是有帮助的。为了通过现有实例访问外部修改的数据,必须将实例标记为过期。这在session.commit() 上默认发生,但可以通过调用session.expire_all()session.expire(instance) 手动完成。一个例子(SQL省略):

    x = Foo(bar=1)
    session.add(x)
    session.commit()
    print x.bar
    # 1
    foo.update().execute(bar=42)
    print x.bar
    # 1
    session.expire(x)
    print x.bar
    # 42
    

    session.commit() 过期 x,因此第一个 print 语句隐式打开一个新事务并重新查询 x 的属性。如果您注释掉第一个打印语句,您会注意到第二个现在获取了正确的值,因为直到更新之后才会发出新查询。

    从事务隔离的角度来看,这是有道理的——您应该只在事务之间进行外部修改。如果这给您带来了麻烦,我建议您澄清或重新考虑您的应用程序的事务边界,而不是立即联系session.expire_all()

    【讨论】:

    • 感谢您的回复,我会试一试。 WRT 即将到期的问题,我看到的并不完全相同。我在 turbogears 中使用范围会话。执行 getSession().query(Foo).filter....all() 根据请求返回不同的东西,也没有返回数据库中的更新记录,直到我重新启动它。我通过执行 autocommit=True 并在请求完成后添加 .remove()d 会话来解决此问题(我认为无论如何你都应该这样做)。
    • 我猜它根据请求返回不同的东西,因为它在池中的每个线程都有一个范围会话,并且会话处于不同的状态?不过,在新请求后 sa 不会获得新数据似乎有点奇怪。我想我误解了 autocommit=False 在做什么
    • 对于autocommit=False,我相信您应该在请求完成时调用session.commit()(我不熟悉TurboGears,所以如果在框架级别为您处理,请忽略这个)。除了确保您对数据库所做的更改之外,这还会使会话中的所有内容过期。直到下次使用该会话时才会开始下一个事务,因此同一线程上的未来请求不会看到陈旧的数据。
    • 另类风格:session.execute(Foo.__table__.insert(), values)
    • 请注意,较新版本的 sqlalchemy 具有批量插入功能:docs.sqlalchemy.org/en/latest/orm/…
    【解决方案4】:

    sqlalchemy 文档有一个writeup,关于可用于批量插入的各种技术的性能:

    ORM 基本上不适用于高性能批量插入 - 这就是 SQLAlchemy 除了提供 Core 之外的全部原因 ORM 作为一流的组件。

    对于快速批量插入的用例,SQL 生成和 ORM 在其上构建的执行系统是核心的一部分。 直接使用这个系统,我们可以产生一个 INSERT 是 与直接使用原始数据库 API 相比具有竞争力。

    或者,SQLAlchemy ORM 提供批量操作套件 方法,提供工作单元子部分的挂钩 过程以发出核心级别的 INSERT 和 UPDATE 构造 少量基于 ORM 的自动化。

    下面的示例说明了几种不同的基于时间的测试 插入行的方法,从最自动化到最不自动化。 使用 cPython 2.7,观察到的运行时:

    classics-MacBook-Pro:sqlalchemy classic$ python test.py
    SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
    SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
    SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
    SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
    sqlite3: Total time for 100000 records 0.487842082977 sec
    

    脚本:

    import time
    import sqlite3
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String,  create_engine
    from sqlalchemy.orm import scoped_session, sessionmaker
    
    Base = declarative_base()
    DBSession = scoped_session(sessionmaker())
    engine = None
    
    
    class Customer(Base):
        __tablename__ = "customer"
        id = Column(Integer, primary_key=True)
        name = Column(String(255))
    
    
    def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
        global engine
        engine = create_engine(dbname, echo=False)
        DBSession.remove()
        DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)
    
    
    def test_sqlalchemy_orm(n=100000):
        init_sqlalchemy()
        t0 = time.time()
        for i in xrange(n):
            customer = Customer()
            customer.name = 'NAME ' + str(i)
            DBSession.add(customer)
            if i % 1000 == 0:
                DBSession.flush()
        DBSession.commit()
        print(
            "SQLAlchemy ORM: Total time for " + str(n) +
            " records " + str(time.time() - t0) + " secs")
    
    
    def test_sqlalchemy_orm_pk_given(n=100000):
        init_sqlalchemy()
        t0 = time.time()
        for i in xrange(n):
            customer = Customer(id=i+1, name="NAME " + str(i))
            DBSession.add(customer)
            if i % 1000 == 0:
                DBSession.flush()
        DBSession.commit()
        print(
            "SQLAlchemy ORM pk given: Total time for " + str(n) +
            " records " + str(time.time() - t0) + " secs")
    
    
    def test_sqlalchemy_orm_bulk_insert(n=100000):
        init_sqlalchemy()
        t0 = time.time()
        n1 = n
        while n1 > 0:
            n1 = n1 - 10000
            DBSession.bulk_insert_mappings(
                Customer,
                [
                    dict(name="NAME " + str(i))
                    for i in xrange(min(10000, n1))
                ]
            )
        DBSession.commit()
        print(
            "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
            " records " + str(time.time() - t0) + " secs")
    
    
    def test_sqlalchemy_core(n=100000):
        init_sqlalchemy()
        t0 = time.time()
        engine.execute(
            Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
        )
        print(
            "SQLAlchemy Core: Total time for " + str(n) +
            " records " + str(time.time() - t0) + " secs")
    
    
    def init_sqlite3(dbname):
        conn = sqlite3.connect(dbname)
        c = conn.cursor()
        c.execute("DROP TABLE IF EXISTS customer")
        c.execute(
            "CREATE TABLE customer (id INTEGER NOT NULL, "
            "name VARCHAR(255), PRIMARY KEY(id))")
        conn.commit()
        return conn
    
    
    def test_sqlite3(n=100000, dbname='sqlite3.db'):
        conn = init_sqlite3(dbname)
        c = conn.cursor()
        t0 = time.time()
        for i in xrange(n):
            row = ('NAME ' + str(i),)
            c.execute("INSERT INTO customer (name) VALUES (?)", row)
        conn.commit()
        print(
            "sqlite3: Total time for " + str(n) +
            " records " + str(time.time() - t0) + " sec")
    
    if __name__ == '__main__':
        test_sqlalchemy_orm(100000)
        test_sqlalchemy_orm_pk_given(100000)
        test_sqlalchemy_orm_bulk_insert(100000)
        test_sqlalchemy_core(100000)
        test_sqlite3(100000)
    

    【讨论】:

    • 谢谢。真的很有帮助,也很彻底。
    • 我看到了另一个使用 bindparams 的例子。语法看起来很简洁,这样好吗?
    【解决方案5】:

    SQLAlchemy 在1.0.0 版本中引入了这一点:

    Bulk operations - SQLAlchemy docs

    通过这些操作,您现在可以进行批量插入或更新!

    例如,你可以这样做:

    s = Session()
    objects = [
        User(name="u1"),
        User(name="u2"),
        User(name="u3")
    ]
    s.bulk_save_objects(objects)
    s.commit()
    

    在这里,将进行批量插入。

    【讨论】:

    • 您还需要 s.commit() 来实际保存记录(我花了一点时间才弄清楚这一点)。
    • 我用 sqlachemy 1.0.11 尝试过这个,它仍然生成 3 个插入语句。但它比正常的 orm 操作要快很多。
    • 虽然与 OP 问题无关,但值得一提的是,这确实破坏了 ORM 的某些特性。 docs.sqlalchemy.org/en/rel_1_0/orm/…
    • @dangel 是的,谢谢你发布这个。尽管 OP 的标题涉及“批量加载”,但他关于多记录插入语句的问题与 sqlalchemy 的批量加载功能无关。
    • 与使用 psql 从 CSV 插入相同的数据与\copy 相比(从同一客户端到同一服务器),我发现 服务器端的性能存在巨大差异 导致每秒插入次数增加约 10 倍。显然,使用\copy(或服务器上的COPY)进行批量加载,在从客户端到服务器的通信中使用打包比通过SQLAlchemy使用SQL要好很多。更多信息:Large bulk insert performance difference PostgreSQL vs ....
    【解决方案6】:

    条条大路通罗马,但其中一些穿越山脉,需要渡轮,但如果您想快速到达那里,就走高速公路。


    在这种情况下,高速公路将使用psycopg2execute_batch() 功能。文档说得最好:

    executemany() 的当前实现(使用极其仁慈的轻描淡写)表现不佳。这些函数可用于加速针对一组参数的语句的重复执行。通过减少服务器往返次数,性能可以比使用executemany() 好几个数量级。

    在我自己的测试中,execute_batch() 的速度大约是 executemany() 的两倍,并且提供了配置 page_size 以进行进一步调整的选项(如果您想压缩最后的 2-3%驱动程序的性能)。

    如果您使用 SQLAlchemy,则可以通过在使用 create_engine() 实例化引擎时将 use_batch_mode=True 设置为参数来轻松启用相同的功能

    【讨论】:

    • 注意:在进行批量插入时,psycopg2 的 execute_values 比 psycopg2 的 execute_batch
    【解决方案7】:

    到目前为止我找到的最佳答案是在 sqlalchemy 文档中:

    http://docs.sqlalchemy.org/en/latest/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

    有一个完整的可能解决方案基准示例。

    如文档所示:

    bulk_save_objects 不是最好的解决方案,但它的性能是正确的。

    就可读性而言,我认为第二好的实现是使用 SQLAlchemy Core:

    def test_sqlalchemy_core(n=100000):
        init_sqlalchemy()
        t0 = time.time()
        engine.execute(
            Customer.__table__.insert(),
                [{"name": 'NAME ' + str(i)} for i in xrange(n)]
        )
    

    此函数的上下文在文档文章中给出。

    【讨论】:

      【解决方案8】:

      我通常使用add_all

      from app import session
      from models import User
      
      objects = [User(name="u1"), User(name="u2"), User(name="u3")]
      session.add_all(objects)
      session.commit()
      

      【讨论】:

      • 你确定这有效吗?它不只是相当于.add他们一次参加会议吗?
      • 鉴于方法名称,这将是反直觉的,文档没有详细说明:Add the given collection of instances to this Session.您有任何理由相信它不会进行批量插入吗?
      • 我不认为这太违反直觉——事实上它确实添加 所有你要求它做的事情。将所有内容添加到会话中似乎并没有暗示会发出什么底层 SQL 语句。查看源代码:github.com/zzzeek/sqlalchemy/blob/… 实际上似乎只是 .add 每个项目单独。
      • 它工作得很好,与bulk_save_objects()相比,使用flush(),我们可以获得对象的ID,但bulk_save_objects()不能(调用flush()的事件)。
      【解决方案9】:

      Piere 的回答是正确的,但一个问题是 bulk_save_objects 默认情况下不会返回对象的主键,如果您对此感到担心的话。将return_defaults 设置为True 以获取此行为。

      文档是here

      foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
      session.bulk_save_objects(foos, return_defaults=True)
      for foo in foos:
          assert foo.id is not None
      session.commit()
      

      【讨论】:

      • 谨慎使用该标志。它将按顺序一次插入一个对象,并且可能不会显着提高性能 [1]。就我而言,我怀疑由于开销而导致性能下降。 [1]:docs.sqlalchemy.org/en/13/orm/…
      【解决方案10】:

      这是一种方式:

      values = [1, 2, 3]
      Foo.__table__.insert().execute([{'bar': x} for x in values])
      

      这将像这样插入:

      INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
      

      参考:SQLAlchemy FAQ 包括各种提交方法的基准。

      【讨论】:

        【解决方案11】:

        从 0.8 版起,SQLAlchemy 添加了直接支持

        根据docsconnection.execute(table.insert().values(data)) 应该可以解决问题。 (请注意,这connection.execute(table.insert(), data) 相同,后者通过调用executemany 导致许多单独的行插入)。除了本地连接之外,在任何情况下,性能差异都可能是巨大的。

        【讨论】:

          猜你喜欢
          • 2019-05-16
          • 1970-01-01
          • 2019-10-24
          • 2018-09-18
          • 1970-01-01
          • 2017-01-20
          • 2011-05-16
          • 2015-11-06
          • 2016-11-29
          相关资源
          最近更新 更多