【问题标题】:MongoDB BulkWrite Memory CostMongoDB BulkWrite 内存开销
【发布时间】:2019-12-06 12:00:14
【问题描述】:

我正在使用官方的 mongodb 驱动程序进行 Go。我取一个 CSV 文件并逐行读取,直到达到 1000 行,然后解析数据并将其插入数据库。我假设它需要一个恒定的内存,因为批量写入的数据总是相同的(1000 个联系人)。但是,情况并非如此,因为内存显着增加。以下是有关上述查询的一些数据:

batchSize = 1000

Contacts - Memory consumed by bulkwrite
10k - 14 MB
20K - 30MB
30K - 59MB
40K - 137 MB
50K -241 MB

谁能解释一下原因?

代码如下:

func (c *csvProcessor) processCSV(r io.Reader, headerMap map[string]int, emailDB *mongo.Database) error {
    //some code...
    csvReader := csv.NewReader(r)
    for {
        eofReached, err := c.processCSVBatch(csvReader, emailHash, smsHash, headerMap, emailDB)
        if err != nil {
            return errors.Wrap(err, "process CSV batch")
        }
        if eofReached {
            break
        }
    }
    return nil
}

func (c *csvProcessor) processCSVBatch(csvReader *csv.Reader, emailHash map[string]*userData, smsHash map[string]*userData, headerMap map[string]int, emailDB *mongo.Database) (bool, error) {
    var insertUsers, updateUsers, deleteUsers []interface{}
    var isEOFReached bool
    for i := 0; i < processCSVBatchSize; i++ {
        line, err := csvReader.Read()
        if err != nil {
            if err != io.EOF {
                return false, errors.Wrap(err, "read from input")
            }
            isEOFReached = true
            break
        }
        //some code
        insert, update, delete := c.dataMerger.mergeData(
            c.parseUser(line, headerMap),
            emailHash[stringToMD5(line[headerMap["email"]])],
            smsHashVal,
        )
        if insert != nil {
            insertUsers = append(insertUsers, insert)
        }
        if update != nil {
            updateUsers = append(updateUsers, update)
        }
        if delete != nil {
            deleteUsers = append(deleteUsers, delete)
        }
    }
    //update DB
    err := c.mongoDBUserHandler.saveUsers(emailDB, insertUsers, updateUsers, deleteUsers)
    if err != nil {
        return false, errors.Wrap(err, "save users")
    }
    return isEOFReached, nil
}

func (m *mongoDBUserHandler) saveUsers(emailDB *mongo.Database, insert, update, delete []interface{}) error {
    ctx := context.Background()
    // create the slice of write models
    var writes []mongo.WriteModel
    if len(insert) > 0 {
        writes = append(writes, m.getInsertWrites(insert)...)
    }
    if len(update) > 0 {
        writes = append(writes, m.getUpdateWrites(update)...)
    }
    if len(delete) > 0 {
        writes = append(writes, m.getDeleteWrites(delete)...)
    }
    if len(writes) == 0 {
        return nil
    }
    // run bulk write
    _, err := emailDB.
        Collection(userCollection).
        BulkWrite(ctx, writes, options.BulkWrite().SetOrdered(false))
    if err != nil {
        return errors.Wrap(err, "bulk write")
    }
    return nil
}

【问题讨论】:

  • 你能分享代码吗?可能是运行时动态堆栈增长或 goroutine 堆分配
  • 你配置切片容量了吗?
  • @mh-cbon 每次我们尝试将数据保存到数据库时都会分配一个新切片。可以参考上面的代码。
  • @mh-cbon 再次阅读代码 - 每个 1000 行批次都是新切片,所以除非您能指出切片可能无限增长的其他地方,否则我不知道如何这是相关的。

标签: mongodb go memory-leaks bulkinsert


【解决方案1】:

下面是copy伪装的:

    if len(insert) > 0 {
        writes = append(writes, m.getInsertWrites(insert)...)
    }
    if len(update) > 0 {
        writes = append(writes, m.getUpdateWrites(update)...)
    }
    if len(delete) > 0 {
        writes = append(writes, m.getDeleteWrites(delete)...)
    }

非常容易实现的目标:删除上面的那些行,将 BulkWrite 更改为接受 writesinterface{},这样您就可以重用 相同的后备数组,这应该会节省一些内存。 p>

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-04-25
    • 1970-01-01
    • 2011-11-16
    • 2011-02-12
    • 2020-12-13
    • 2017-08-30
    • 1970-01-01
    • 2021-07-06
    相关资源
    最近更新 更多