【问题标题】:SQLAlchemy Core bulk insert slowSQLAlchemy Core 批量插入慢
【发布时间】:2017-01-20 04:45:50
【问题描述】:

我正在尝试使用 SQLAlchemy 截断表并仅插入约 3000 行数据,而且速度非常慢(约 10 分钟)。

我遵循了doc 上的建议,并利用 sqlalchemy 核心进行插入,但它的运行速度仍然非常缓慢。我要看哪些可能的罪魁祸首?数据库是一个 postgres RDS 实例。谢谢!

engine = sa.create_engine(db_string, **kwargs, pool_recycle=3600)
with engine.begin() as conn:
            conn.execute("TRUNCATE my_table")
            conn.execute(
                MyTable.__table__.insert(),
                data #where data is a list of dicts
            )

【问题讨论】:

  • 你有主键吗?这是很多交叉引用的表吗?插入 3k 行的 10 分钟似乎非常多……一行有多大?您的数据库正在远程运行(即不是本地主机……)(这肯定会减慢速度……多少,可能取决于网络延迟) ?
  • 是的,我有一个主键,我通过 sa/alembic 将其设置为大小为 100 的自然键字符串。但是我也尝试使用代理整数键将其切换出来,并且遇到了类似的性能问题。我的行是 6 列字符串。数据库是远程的(在 AWS 中),因此网络可能与它有关
  • 这使用了executemany 功能,对于psycopg2,它是executefor 循环中的美化。见this question

标签: python postgresql orm sqlalchemy


【解决方案1】:

当我看到这没有答案时,我感到很沮丧...前几天我遇到了完全相同的问题:尝试使用 CORE 将大约数百万行批量插入 Postgres RDS 实例。这需要 几个小时

作为一种解决方法,我最终编写了自己的批量插入脚本来生成原始 sql:

bulk_insert_str = []
for entry in entry_list:
    val_str = "('{}', '{}', ...)".format(entry["column1"], entry["column2"], ...)
    bulk_insert_str.append(val_str)

engine.execute(
    """
    INSERT INTO my_table (column1, column2 ...)
    VALUES {}
    """.format(",".join(bulk_insert_str))
)

虽然丑陋,但这给了我我们需要的性能(~500,000 行/分钟)

您是否找到了基于 CORE 的解决方案?如果没有,希望这会有所帮助!

更新:最终将我的旧脚本移动到我们没有使用的备用 EC2 实例中,这实际上解决了性能缓慢的问题。不确定您的设置是什么,但显然从外部(非 AWS)连接与 RDS 通信会产生网络开销。

【讨论】:

  • 哦,嗯。我试图确保我的数据安全并在我的insert 周围使用sqlalchemy.text(),然后参数化这些值(例如engine.execute(sqlalchemy.text(insert_str), **parameters))但似乎sqlalchemy.text() 需要大量时间......也许要小心去风和只是打耳光价值观还有路要走吗? 畏缩(要明确:不是批评;这实际上可能是我最好的解决方案。只是让我有点害怕)
  • 那么实际的问题是网络延迟?还是 sqlalchemy 的插入方式有问题?
  • @Chris 据我所知,这是一个网络延迟问题,再加上一些 SQLAlchemy 行为的额外开销。
【解决方案2】:

前段时间我在公司工作时一直在努力解决这个问题,所以我们创建了一个库,其中包含批量插入和更新的功能。希望我们已经考虑到所有性能和安全问题。这个库是开源的,可以在 PyPI 上使用,它的名字:bulky

让我给你看一些用法的例子:

插入:

import bulky
from your.sqlalchemy.models import Model
from your.sqlalchemy.session import Session

data = [
    {Model.column_float: random()}
    for _ in range(100_000_000)
]

rows_inserted = bulky.insert(
    session=Session,
    table_or_model=Model,
    values_series=data,
    returning=[Model.id, Model.column_float]
)

new_items = {row.id: row.column_float for row in rows_inserted}

更新:

import bulky
from your.sqlalchemy.models import ManyToManyTable
from your.sqlalchemy.session import Session

data = [
    {
        ManyToManyTable.fk1: i,
        ManyToManyTable.fk2: j,
        ManyToManyTable.value: i + j,
    }
    for i in range(100_000_000)
    for j in range(100_000_000)
]

rows_updated = bulky.update(
    session=Session,
    table_or_model=ManyToManyTable,
    values_series=data,
    returning=[
        ManyToManyTable.fk1,
        ManyToManyTable.fk2,
        ManyToManyTable.value,],
    reference=[
        ManyToManyTable.fk1,
        ManyToManyTable.fk2,],
)

updated_items = {(row.fk1, row.fk2): row.value for row in rows_updated}

不确定是否允许链接,所以我将它们置于剧透之下

ReadmePyPI

【讨论】:

  • 这种方法确实将我的速度提高了几个数量级。谢谢你。
猜你喜欢
  • 2020-05-14
  • 2018-01-11
  • 2015-07-03
  • 2011-04-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多