【问题标题】:memory-efficient built-in SqlAlchemy iterator/generator?内存高效的内置 SqlAlchemy 迭代器/生成器?
【发布时间】:2011-11-15 10:16:19
【问题描述】:

我有一个 ~10M 记录 MySQL 表,我使用 SqlAlchemy 与之交互。我发现对这个表的大子集的查询会消耗太多的内存,即使我认为我正在使用一个内置的生成器来智能地获取数据集的小块:

for thing in session.query(Things):
    analyze(thing)

为了避免这种情况,我发现我必须构建自己的迭代器,它会分块进行:

lastThingID = None
while True:
    things = query.filter(Thing.id < lastThingID).limit(querySize).all()
    if not rows or len(rows) == 0: 
        break
    for thing in things:
        lastThingID = row.id
        analyze(thing)

这是正常的还是我缺少关于 SA 内置生成器的东西?

this question 的答案似乎表明内存消耗不是预期的。

【问题讨论】:

  • 我有一些非常相似的东西,除了它产生“东西”。比所有其他解决方案效果更好
  • 不是 Thing.id > lastThingID 吗?什么是“行”?

标签: python mysql sqlalchemy


【解决方案1】:

大多数 DBAPI 实现在获取行时会完全缓冲行 - 所以通常,在 SQLAlchemy ORM 甚至获得一个结果之前,整个结果集都在内存中。

但是,Query 的工作方式是默认情况下完全加载给定的结果集,然后再将对象返回给您。这里的基本原理是关于不仅仅是简单的 SELECT 语句的查询。例如,在连接到可能在一个结果集中多次返回相同对象标识的其他表时(常见于急切加载),完整的行集需要在内存中,以便可以返回正确的结果,否则集合等可能只有部分人口。

所以Query 提供了一个通过yield_per() 更改此行为的选项。此调用将导致Query 批量生成行,您可以在其中给它批量大小。正如文档所述,这仅在您不进行任何急切加载集合的情况下才适用,因此基本上是在您真正知道自己在做什么的情况下。此外,如果底层 DBAPI 预缓冲行,仍然会有内存开销,因此该方法的扩展性仅比不使用它略好。

我几乎从不使用yield_per();相反,我使用您在上面使用窗口函数建议的 LIMIT 方法的更好版本。 LIMIT 和 OFFSET 有一个巨大的问题,即非常大的 OFFSET 值会导致查询变得越来越慢,因为 N 的 OFFSET 会导致它翻阅 N 行 - 这就像执行相同的查询五十次而不是一次,每次读取一个越来越多的行数。使用窗口函数方法,我预先获取一组“窗口”值,这些值引用我要选择的表的块。然后我发出单独的 SELECT 语句,每个语句一次从其中一个窗口中提取。

窗口函数方法是on the wiki,我使用它非常成功。

另请注意:并非所有数据库都支持窗口函数;您需要 Postgresql、Oracle 或 SQL Server。恕我直言,至少使用 Postgresql 绝对值得 - 如果您使用的是关系数据库,您不妨使用最好的。

【讨论】:

  • 您提到查询实例化了所有内容以比较身份。是否可以通过对主键进行排序并仅比较连续结果来避免这种情况?
  • 问题是,如果您生成一个标识为 X 的实例,应用程序会获取它,然后根据该实体做出决策,甚至可能对其进行变异。稍后,也许(实际上通常)甚至在下一行,结果中也会返回相同的身份,也许是为了向其集合中添加更多内容。因此,应用程序收到了处于不完整状态的对象。排序在这里没有帮助,因为最大的问题是急切加载的工作原理 - “加入”和“子查询”加载都有不同的问题。
  • 当您确信您发出的查询与交付部分结果集兼容时,yield_per() 选项始终存在。我花了几天的马拉松式会议试图在所有情况下启用这种行为,总是有一些模糊的,也就是说,直到你的程序使用其中一个,边缘失败。特别是,不能假设依赖于排序。与往常一样,欢迎我提供实际的代码贡献。
  • 这主要是一个问题,即包含每个可能的用例必须变得多复杂,包括当同一组类可能会在结果中遇到多次、断开连接的时间时,以及如何执行上述“检查连接结果的结尾”逻辑,该逻辑必须递归工作。一旦你将行为隐式化,那么你就会被所有未来的错误所困扰。如果我背后有一个开发团队的资源,也许它是可以支持的,但它变得非常复杂,没有太多好处。
  • 由于我使用的是 postgres,看起来可以使用可重复读取只读事务并在该事务中运行所有窗口查询。
【解决方案2】:

我不是数据库专家,但是当使用 SQLAlchemy 作为简单的 Python 抽象层(即,不使用 ORM 查询对象)时,我想出了一个令人满意的解决方案来查询一个 300M 行的表,而不会增加内存使用量...

这是一个虚拟示例:

from sqlalchemy import create_engine, select

conn = create_engine("DB URL...").connect()
q = select([huge_table])

proxy = conn.execution_options(stream_results=True).execute(q)

然后,我使用 SQLAlchemy fetchmany() 方法在无限 while 循环中迭代结果:

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time

    if not batch:
        break

    for row in batch:
        # Do your stuff here...

proxy.close()

这种方法让我可以在没有任何危险的内存开销的情况下进行各种数据聚合。

NOTE stream_results 适用于 Postgres 和 pyscopg2 适配器,但我猜它不适用于任何 DBAPI,也不适用于任何数据库驱动程序...

blog post 中有一个有趣的用例启发了我的上述方法。

【讨论】:

  • 如果有人在使用 postgres 或 mysql(使用pymysql),恕我直言,这应该是公认的答案。
  • 救了我的命,我发现我的查询运行速度越来越慢。我已经在 pyodbc(从 sql server 到 postgres)上对上述内容进行了检测,它的运行就像做梦一样。
  • 这对我来说是最好的方法。当我使用 ORM 时,我需要将 SQL 编译为我的方言(Postgres),然后直接从连接(而不是会话)执行,如上所示。我在另一个问题stackoverflow.com/questions/4617291 中找到了编译“如何”。速度提升很大。从 JOINS 更改为 SUBQUERIES 也大大提高了性能。还推荐使用 sqlalchemy_mixins,使用 smart_query 有助于构建最有效的查询。 github.com/absent1706/sqlalchemy-mixins
【解决方案3】:

我一直在研究使用 SQLAlchemy 进行有效的遍历/分页,并希望更新此答案。

我认为您可以使用 slice 调用来适当地限制查询的范围,并且可以有效地重用它。

例子:

window_size = 10  # or whatever limit you like
window_idx = 0
while True:
    start,stop = window_size*window_idx, window_size*(window_idx+1)
    things = query.slice(start, stop).all()
    if things is None:
        break
    for thing in things:
        analyze(thing)
    if len(things) < window_size:
        break
    window_idx += 1

【讨论】:

  • 这看起来非常简单和快速。我不确定.all() 是否必要。我注意到第一次通话后速度提高了很多。
  • @hamx0r 我意识到这是一条旧评论,所以留给后代吧。如果没有.all(), things 变量是一个不支持 len() 的查询
【解决方案4】:

本着乔尔回答的精神,我使用以下内容:

WINDOW_SIZE = 1000
def qgen(query):
    start = 0
    while True:
        stop = start + WINDOW_SIZE
        things = query.slice(start, stop).all()
        if len(things) == 0:
            break
        for thing in things:
            yield thing
        start += WINDOW_SIZE

【讨论】:

  • things = query.slice(start, stop).all() 将在最后返回 [] 并且 while 循环永远不会中断
【解决方案5】:

使用 LIMIT/OFFSET 不好,因为您需要先找到所有 {OFFSET} 列,因此 OFFSET 越大 - 您获得的请求就越长。 对我使用窗口化查询也会在包含大量数据的大型表上产生不好的结果(您等待第一个结果的时间太长,在我的情况下,对于分块的 Web 响应来说这并不好)。

这里给出了最佳方法https://stackoverflow.com/a/27169302/450103。在我的情况下,我仅使用 datetime 字段上的索引并使用 datetime>=previous_datetime 获取下一个查询来解决问题。愚蠢,因为我之前在不同情况下使用过该索引,但认为获取所有数据窗口查询会更好。就我而言,我错了。

【讨论】:

    【解决方案6】:

    AFAIK,第一个变体仍然从表中获取所有元组(使用一个 SQL 查询),但在迭代时为每个实体构建 ORM 表示。因此,它比在迭代之前构建所有实体的列表更有效,但您仍然必须将所有(原始)数据提取到内存中。

    因此,对我来说,在大表上使用 LIMIT 听起来是个好主意。

    【讨论】:

      猜你喜欢
      • 2014-04-28
      • 1970-01-01
      • 2016-01-16
      • 2011-10-30
      • 1970-01-01
      • 2018-05-16
      • 2017-11-11
      • 2014-09-17
      • 2017-12-13
      相关资源
      最近更新 更多