【问题标题】:Updating many(100k+) documents in the most efficient way MongoDB以最有效的方式更新许多(100k+)个文档 MongoDB
【发布时间】:2018-07-28 21:14:03
【问题描述】:

我有一个定期运行的函数,它更新我的Prices 集合中的一些Documentsitem.pricePrice Collection 有 100k+ 项。函数如下所示:

 //Just a helper function for multiple GET requests with request.
let _request = (urls, cb) => {
    let results = {}, i = urls.length, c = 0;
    handler = (err, response, body) => {
        let url = response.request.uri.href;
        results[url] = { err, response, body };

        if (++c === urls.length) {
            cb(results);
        }
    };
    while (i--) {
        request(urls[i], handler);
    }
};
// function to update the prices in our Prices collection.

const update = (cb) => {
    Price.remove({}, (err, remove) => {
        if (err) {
            return logger.error(`Error removing items...`);
        }
        logger.info(`Removed all items... Beginning to update.`);
        _request(urls, (responses) => {
            let url, response, gameid;

            for (url in responses) {
                id = url.split('/')[5].split('?')[0];
                response = responses[url];

                if (response.err) {
                    logger.error(`Error in request to ${url}: ${err}`);
                    return;
                }

                if (response.body) {
                    logger.info(`Request to ${url} successful.`)
                    let jsonResult = {};
                    try {
                        jsonResult = JSON.parse(response.body);
                    } catch (e) {
                        logger.error(`Could not parse.`);
                    }

                    logger.info(`Response body for ${id} is ${Object.keys(jsonResult).length}.`);
                    let allItemsArray = Object.keys(jsonResult).map((key, index) => {
                        return {
                            itemid: id,
                            hash_name: key,
                            price: jsonResult[key]
                        }
                    });

                    Price.insertMany(allItemsArray).then(docs => {
                        logger.info(`Saved docs for ${id}`)
                    }, (e) => {
                        logger.error(`Error saving docs.`);
                    });

                }
            }
            if (cb && typeof cb == 'function') {
                cb();
            }
        })
    });

}

如您所见,为了避免遍历 100k+ 文档并分别更新每一个文档,我在开始时将它们全部删除,然后调用为我提供这些项目的 API 和价格,并使用 @987654328 @ 将它们全部插入我的价格集合中。

此更新过程每 30 分钟发生一次。

但我现在才意识到,如果某些用户想要查看价格,而我的 Prices Collection 目前是空的,因为它正在自我更新,该怎么办?

问题

那么我是否必须遍历所有这些才能不删除它? (请记住,有很多文档每 30 分钟更新一次。)或者还有其他解决方案吗?

这是我的Prices Collection 外观的图片(有 100k 此类文档,我只想更新价格属性):

更新

我已经稍微重写了我的update 函数,现在它看起来像这样:

const update = (cb = null) => {
    Price.remove({}, (err, remove) => {
        if (err) {
            return logger.error(`Error removing items...`);
        }
        logger.info(`Removed all items... Beginning to update.`);
        _request(urls, (responses) => {
            let url, response, gameid;

            for (url in responses) {
                gameid = url.split('/')[5].split('?')[0];
                response = responses[url];

                if (response.err) {
                    logger.error(`Error in request to ${url}: ${err}`);
                    return;
                }

                if (response.body) {
                    logger.info(`Request to ${url} successful.`)
                    let jsonResult = {};
                    try {
                        jsonResult = JSON.parse(response.body);
                    } catch (e) {
                        logger.error(`Could not parse.`);
                    }

                    logger.info(`Response body for ${gameid} is ${Object.keys(jsonResult).length}.`);
                    let allItemsArray = Object.keys(jsonResult).map((key, index) => {
                        return {
                            game_id: gameid,
                            market_hash_name: key,
                            price: jsonResult[key]
                        }
                    });
                    let bulk = Price.collection.initializeUnorderedBulkOp();

                    allItemsArray.forEach(item => {
                        bulk.find({market_hash_name: item.market_hash_name})
                            .upsert().updateOne(item);
                    });
                    bulk.execute((err, bulkers) => {
                        if (err) {
                            return logger.error(`Error bulking: ${e}`);
                        }
                        logger.info(`Updated Items for ${gameid}`)
                    });

                    // Price.insertMany(allItemsArray).then(docs => {
                    //     logger.info(`Saved docs for ${gameid}`)
                    // }, (e) => {
                    //     logger.error(`Error saving docs.`);
                    // });

                }
            }
            if (cb && typeof cb == 'function') {
                cb();
            }
        })
    });

}

现在请注意批量变量(感谢@Rahul),但现在,该集合需要很长时间才能更新。我的处理器正在烧毁,更新 60k 多个文档实际上需要 3 分钟以上的时间。老实说,我觉得像以前的方法,虽然它可能会删除所有它们然后重新插入它们,但速度也快了 10 倍。

有人吗?

【问题讨论】:

  • 不是每次使用MongoDB的bulk upsert都删除和添加数据,另一件事是不要在100/1000的chunk中完成_request返回的数据。
  • 感谢您的评论,我已经更新了我的问题。现在想看看吗?

标签: javascript node.js mongodb mongoose promise


【解决方案1】:

无需清除数据库并重新插入。您可以为此使用 bulkWrite() 方法或使用 updateMany() 方法进行更新。

您可以将现有代码重构为

const update = (cb) => {
    _request(urls, responses => {
        let bulkUpdateOps = [], gameid;

        responses.forEach(url => {
            let response = responses[url];
            gameid = url.split('/')[5].split('?')[0];

            if (response.err) {
                logger.error(`Error in request to ${url}: ${response.err}`);
                return;
            }

            if (response.body) {
                logger.info(`Request to ${url} successful.`)
                let jsonResult = {};
                try {
                    jsonResult = JSON.parse(response.body);
                } catch (e) {
                    logger.error(`Could not parse.`);
                }

                Object.keys(jsonResult).forEach(key => {
                    bulkUpdateOps.push({
                        "updateOne": {
                           "filter": { market_hash_name: key },
                           "update": { "$set": {
                                game_id: gameid,
                                price: jsonResult[key]
                           } },
                           "upsert": true
                        }
                    });
                });
            }

            if (bulkUpdateOps.length === 1000) {
                Price.bulkWrite(bulkUpdateOps).then(result => {
                    logger.info(`Updated Items`)
                }).catch(e => logger.error(`Error bulking: ${e}`));
                bulkUpdateOps = [];
            }
        });

        if (bulkUpdateOps.length > 0) {
            Price.bulkWrite(bulkUpdateOps).then(result => {
                logger.info(`Updated Items`)
            }).catch(e => logger.error(`Error bulking: ${e}`));
        }
    });

    if (cb && typeof cb == 'function') {
        cb();
    }
}

【讨论】:

  • 如果价格不存在,这是否会增加价格?
  • @FilipBarakovski 是的,如果您在上面的更新中添加"upsert" : true 选项。
【解决方案2】:

根据我的经验(每小时更新数百万个 mongo 文档),这是一个非常大批量更新的现实方法:

  • 单独执行所有 API 调用并将结果以 bson 格式写入文件
  • 调用mongoimport 并将该bson 文件导入一个新的空集合prices_new。 Javascript,更不用说高级 OO 包装器,太慢了
  • 重命名prices_new -> prices dropTarget=true(这将是原子的,因此不会停机)

在 JS 中大概是这样的

let fname = '/tmp/data.bson';
let apiUrls = [...];

async function doRequest(url) {
    // perform a request and return an array of records
}

let responses  = await Promise.all(apiUrls.map(doRequest));

// if the data too big to fit in memory, use streams instead of this:

let data = flatMap(responses, BSON.serialize).join('\n'));
await fs.writeFile(fname, data);

await child_process.exec(`mongoimport --collection prices_new --drop ${fname}`);

await db.prices_new.renameCollection('prices', true);

【讨论】:

  • 我不明白你是如何从“原子”推导出“无停机时间”的。
  • 感谢您的回答。这似乎是最有效的方法。愿意举一个将结果写入 BSON 文件然后导入的示例吗?
  • 如果 MongoDB 在另一个实例上运行怎么办。那个时候我们无法访问 BSON 文件,我们如何处理同样的情况?
  • @RahulSharma:你可以在网络上运行mongoimport--host xxx 等),或者在你的 bson 被填充后复制到那里。
  • @georg 感谢您的回答:)
【解决方案3】:

我没有测试过任何东西,但你可以试试这个,可能会有所帮助。我正在使用 bluebird 库进行并发。

let _request = (url) => {
    return new Promise((resolve, reject) => {
        request(url, (err, response, body) => {
            if (err) {
                reject(err);
            }
            resolve(body);
        });
    });
};

const formatRespose = async (response) => {
    // do stuff
    return {
        query: {}, //  itemid: id,
        body: {}
    };
}

const bulkUpsert = (allItemsArray) => {
    let bulk = Price.collection.initializeUnorderedBulkOp();
    return new Promise((resolve, reject) => {
        allItemsArray.forEach(item => {
            bulk.find(item.query).upsert().updateOne(item.body);
        });
        bulk.execute((err, bulkers) => {
            if (err) {
                return reject(err);
            }
            return resolve(bulkers);
        });
    });
}


const getAndUpdateData = async (urls) => {
    const allItemsArray = urls.map((url) => {
        const requestData = await _request(url); // you can make this also parallel
        const formattedData = formatRespose(requestData); // return {query: {},body: {} };
        return formattedData;
    });
    return await (bulkUpsert(allItemsArray));
};

function update() {
    // split urls into as per your need 100/1000
    var i, j, chunkUrls = [],
        chunk = 100;
    for (i = 0, j = urls.length; i < j; i += chunk) {
        chunkUrls.push(getAndUpdateData(urls.slice(i, i + chunk)));
    }

    Bluebird.map(chunkUrls, function (chunk) {
        return await chunk;
    }, {
        concurrency: 1 // depends on concurrent request change 1 = 100 request get and insert in db at time
    }).then(function () {
        console.log("done");
    }).catch(function () {
        console.log("error");
    });
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-02-22
    • 2018-03-02
    • 2016-01-18
    • 2020-07-11
    • 1970-01-01
    • 2019-05-22
    • 2017-03-07
    • 2020-11-19
    相关资源
    最近更新 更多