【问题标题】:PyMongo cursor batch_sizePyMongo 游标 batch_size
【发布时间】:2019-07-15 21:17:37
【问题描述】:

在 PyMongo 3.7.2 中,我正在尝试通过在 MongoDB 游标上使用 batch_size 以块的形式读取集合,如 here 所述。基本思想是在集合对象上使用 find() 方法,以 batch_size 作为参数。但无论我尝试什么,光标总是返回我集合中的所有文档。

我的代码的基本 sn-p 如下所示(该集合有超过 10K 的文档):

import pymongo as pm

client = pm.MongoClient()
coll = client.get_database('db').get_collection('coll')

cur = coll.find({}, batch_size=500)

但是,游标总是立即返回完整的集合大小。我按照文档中的描述使用它。

有人知道如何正确地批量迭代集合吗?有一些方法可以遍历 find() 方法的输出,但这仍然会首先获取完整的集合,并且只会遍历内存中已经提取的文档。 batch_size 参数应该是每次获取一个batch并往返于服务器,以节省内存空间。

【问题讨论】:

    标签: python mongodb pymongo


    【解决方案1】:

    Pymongo 有一些Cursor 类的生活质量助手,所以它会自动为你做批处理,并将结果以文档的形式返回给你。

    设置了batch_size 设置,但想法是您只需要在find() 方法中设置它,而不必进行手动低级调用或迭代批次。

    例如,如果我的收藏中有 100 个文档:

    > db.test.count()
    100
    

    然后我将profiling level 设置为记录所有查询:

    > db.setProfilingLevel(0,-1)
    {
      "was": 0,
      "slowms": 100,
      "sampleRate": 1,
      "ok": 1,
    ...
    

    然后我使用 pymongo 指定batch_size of 10:

    import pymongo
    import bson
    
    conn = pymongo.MongoClient()
    cur = conn.test.test.find({}, {'txt':0}, batch_size=10)
    print(list(cur))
    

    运行该查询,我在 MongoDB 日志中看到:

    2019-02-22T15:03:54.522+1100 I COMMAND  [conn702] command test.test command: find { find: "test", filter: {} ....
    2019-02-22T15:03:54.523+1100 I COMMAND  [conn702] command test.test command: getMore { getMore: 266777378048, collection: "test", batchSize: 10, .... 
    (getMore repeated 9 more times)
    

    所以查询是以指定的批次从服务器获取的。它只是通过 Cursor 类对您隐藏。

    编辑

    如果你真的需要批量获取文档,Collection下有个函数find_raw_batches()(doc link)。此方法的工作原理与find() 类似,并接受相同的参数。但是请注意,它将返回原始 BSON,应用程序需要在单独的步骤中对其进行解码。值得注意的是,此方法不支持sessions

    话虽如此,如果目标是降低应用程序的内存使用量,那么值得考虑修改查询以使其使用范围。例如:

    find({'$gte': <some criteria>, '$lte': <some other criteria>})
    

    范围查询更容易优化,可以使用索引,并且(在我看来)更容易调试并且在查询中断时更容易重新启动。这在使用批处理时不太灵活,您必须从头开始重新启动查询,并在它被中断时再次检查所有批处理。

    【讨论】:

    • 谢谢,很高兴知道。但这意味着,即使分批获取文档,它也会运行所有批次,然后返回输出,然后我有一个包含所有文档的对象?我试图从 mongo 获取一批,在代码中处理该批,例如将其写在一个文件中,只有在完成之后,才能从 mongo 检索下一批。这样每次迭代在内存中只有一个批量大小。
    • 我更新了答案。希望它能解答您的疑虑。
    【解决方案2】:

    我就是这样做的,它有助于将数据分块,但我认为会有更直接的方法来做到这一点。我创建了一个 yield_rows 函数,它可以让你生成块并生成块,它确保删除使用的块。

    import pymongo as pm
    
    CHUNK_SIZE = 500
    client = pm.MongoClient()
    coll = client.get_database('db').get_collection('coll')
    cursor = coll.find({}, batch_size=CHUNK_SIZE)
    
    def yield_rows(cursor, chunk_size):
        """
        Generator to yield chunks from cursor
        :param cursor:
        :param chunk_size:
        :return:
        """
        chunk = []
        for i, row in enumerate(cursor):
            if i % chunk_size == 0 and i > 0:
                yield chunk
                del chunk[:]
            chunk.append(row)
        yield chunk
    
    chunks = yield_rows(cursor, CHUNK_SIZE)
    for chunk in chunks:
        # do processing here
        pass
    

    如果我找到一种更清洁、更有效的方法,我会更新我的答案。

    【讨论】:

      猜你喜欢
      • 2021-11-12
      • 2016-02-08
      • 2021-07-17
      • 1970-01-01
      • 2012-10-02
      • 2021-12-15
      • 1970-01-01
      • 2011-06-23
      • 2014-10-01
      相关资源
      最近更新 更多