【问题标题】:Handling large amounts of data retrieval on NodeJS server在 NodeJS 服务器上处理大量数据检索
【发布时间】:2018-02-21 09:47:18
【问题描述】:

关于如何降低从我的 NodeJS 后端服务器上的数据库中获取和处理大量物联网时间序列数据的延迟的一般问题。即使我将超时设置为 15 秒,有时我也会超时。

我当前的设置使用 Google Datastore 作为流数据的数据库,NodeJS 后端服务器用于在将数据传递到前端之前与 Datastore 交互。我还使用本地托管在后端服务器上的 MongoDB 作为缓存层。

1个请求的数据检索工作流程大致如下:从MySQL db中查询设备mac地址并用于查询请求时间范围的缓存,然后从时间间隙(未被缓存覆盖)中检索数据数据存储,并聚合到请求的时间分辨率中,如果需要,还会对某些类型的数据进行额外的解析。

我能想到的一些提高性能的策略包括。欢迎对以下策略提出任何其他建议/cmets。

  1. 异步 - 使用异步从缓存和数据存储中获取数据(已实现)

  2. Streams - 使用 fs 在流中返回数据以提高内存利用率

  3. 压缩 - 使用压缩等库来减小数据大小 发送到前端

  4. 集群 - 使用集群来利用多核处理器(更多用于处理多个请求,而与减少每个请求的延迟无关)

  5. 增加服务器 CPU/RAM - 提供更强大的服务器(我如何知道要提供多少内存/处理器速度/内核数量?)

【问题讨论】:

  • App Engine 服务位于哪个位置以及您从哪里进行测试?此外,这可能取决于您所在地区的网络。
  • 目前我还没有将网络服务器部署到 GCP。我正在寻找使数据检索性能更高的一般策略,并不是说它总是在我的请求上超时,而是想看看我是否总体上走在正确的轨道上,或者我是否可以做其他事情来改进性能。
  • 我运行了与您提供的相同的示例,在我的机器上,在 localhost 上,对于填充 1 的 3MB 生成数组,这平均需要 1.10 秒。
  • 标准 node.js 流会让你头疼 - 我建议使用像 scramjet 这样的流框架 - 它会为你整理简单的数据流转换和多线程。跨度>

标签: node.js server google-cloud-datastore gzip


【解决方案1】:

您希望专注于减少延迟,但在我看来,您使用的系统过于复杂,涉及的技术未得到有效利用。你说你的工作流程是这样的:

MySQL -> 缓存(使用 MongoDB)/数据存储 -> NodeJS -> 前端

首先,请查看您使用的是三种不同的数据库解决方案。我知道它们适用于不同的任务,但这似乎并不是最有效的方法。我会说Datastore+MongoDB的组合看起来也不是很自然。你为什么不使用以下之一?:

  1. Datastore + Redis 的组合(用于缓存功能)。这里有关于如何Cache Application Data Using Redis 的官方文档。在这里你有一个更详细的教程关于如何Connect to Redis from Node.js on Google App Engine Flexible,做所有的互连,包括部署Redis on Google Cloud using Deploy Launcher
  2. 直接使用Mongodb,这里有Connect to MongoDB from Node.js on Google App Engine Flexible environment的教程。

无论您选择第一个还是第二个选项:您不能放弃使用 MySQL 吗?

如果您使用Google App Engine 使用这些解决方案中的任何一个,您将更容易回答与集群和增加服务器 CPU/RAM 相关的问题。您可以尝试使用 different configurations of your app.yaml 来检查最适合您的内核和 RAM 数量。

我没有想到任何与异步和压缩策略相关的事情。根据 Streams,您可以使用 @Michał Czapracki 关于使用 scramjet 的建议。

【讨论】:

    【解决方案2】:

    在我的应用程序中,我需要提供一个选项来加入多个 mongodb 集合(所有数据)并导出到一个 CSV 以及导出的百分比进度

    exports.getExportData = (req, res) => {
        req.setTimeout(2400000);
        console.log('------------------------------------------ getExportData ----------------------------------------------')
        let url = urls.baseUrl + urls.endpoints.getData;
        url = postParamsToGet(url, req.query);
        let postContents = {
            headers: {
                'content-type': 'application/json'
            },
            url: url,
            form: req.body,
            timeout: 2400000
        }, 
        receivedData = '', totalData = 0, column = false, lastEntry = 0, 
        successResponse = {
            status: {
                success: true,
                code: 200,
                message: 'processing...'
            },
            data: {
                url: [],
                processed: 0
            }
        }, 
        errorResponse = {
            status: {
                success: false,
                code: 400,
                message: 'something went wrong'
            },
            data: {
                url: ''
            }
        }, 
        exitChunk = false, filesRows = 0, fileCount = 0, pending = true;
    
        console.log(JSON.stringify(postContents))
        try {
            req.on("close", function() {
                console.log("close the connection!");
                errorResponse.status.message = 'connection broke'
                res.end(JSON.stringify(errorResponse));
                exitChunk = true
            });
    
            let fileName = `${md5((new Date()).getTime())}.csv`
            let email = 'abc@example.com'
            if (req.query.emailId) {
                fileName = md5((new Date()).getTime() + req.query.emailId) + '.csv';
                email = req.query.emailId
            }
            let tempFile = path.resolve('./modules/utils/temp/' + fileName);
            if (!fs.existsSync(path.resolve('./modules/utils/temp')))
                fs.mkdirSync(path.resolve('./modules/utils/temp'))
    
            if (!fs.existsSync(path.resolve(tempFile))) {
                fs.appendFileSync(tempFile, '');
            }
            const uploadChunkFile = (tf, fn) => {
                console.log('uploading to S3...', totalData, tf)
                return new Promise(async (resolve, reject) => {
                    try {
                        if (!fs.existsSync(path.resolve(tf)))
                            return reject('unable to create file')
                        await exportCsv.uploadToS3(tf, fn, async function(err, status, url) {
                            if (err) {
                                console.log(err)
                                return resolve(fileName)
                            }
                            if (!status) {
                                console.log(status)
                                return resolve(fileName)
                            }
                            return resolve(url)
                        });
                    }
                    catch(err) {
                        console.log(err.message)
                        return reject(err.message)
                    }
                })
            }
    
            // chunk process start from here
            request.post(postContents)
            .on('response', async function(response) {
                if (response.statusCode != 200) {
                    errorResponse.status.message = response.statusMessage
                    errorResponse.status.code = response.statusCode
                    res.statusMessage = response.statusMessage
                    console.log(errorResponse)
                    return res.status(response.statusCode).send(errorResponse)
                }
            })
            .on('data', async function(d) {
                if (exitChunk) {
                    res.removeAllListeners('data');
                    return res.destroy();
                }
    
                let receive = d.toString()
                if (receive.indexOf("END_OF_STRING") > -1) {
                    let receiveString = receivedData, pendingString = ''
                    receivedData = ''
                    receiveString += receive
                    receiveString = pendingString + receiveString
                    let splitedStringChunk = receiveString.split("END_OF_STRING")
                    if (splitedStringChunk.length > 1) {
                        for (let str = 1; str < splitedStringChunk.length; str++) {
                            pendingString += splitedStringChunk[str]
                        }
                    }
                    let substrReceiveString = splitedStringChunk[0].split("START_OF_STRING")[1]
                    let result = JSON.parse(substrReceiveString)
                    let jsonData = []
                    totalData = result.total
                    lastEntry = result.o + result.l
                    filesRows += result.l
                    if (!column) {
                        column = Object.keys(result.data[0]);
                        jsonData.push(`"${column.join('","')}"\n`)
                    }
                    result.data.forEach(d => {
                        let joinRows = [];
                        column.forEach(c => {
                            if (d[c] !== undefined) {
                                joinRows.push(d[c].toString())
                            } else {
                                joinRows.push('-');
                            }
                        });
                        joinRows = joinRows.join('","')
                        jsonData.push(`"${joinRows.substring(0, joinRows.length)}"\n`)
                    });
    
                    await fs.appendFileSync(tempFile, jsonData.join(''));
    
                    pending = true
                    if (filesRows >= 1000000) {
                        filesRows = 0
                        try {
                            let uploadedUrl = await uploadChunkFile(tempFile, fileName)
                            successResponse.data.url.push(uploadedUrl)
                        }
                        catch(err) {
                            console.log(err.message)
                            console.log('chunk file upload error')
                        }
                        fileName = `${fileName.split('.')[0]}_${++fileCount}.csv`
                        tempFile = path.resolve('./modules/utils/temp/' + fileName);
                        if (!fs.existsSync(path.resolve(tempFile))) {
                            fs.appendFileSync(tempFile, '');
                        }
                        pending = false
                    }
    
                    console.log(`${result.processed}% completed${'.'.repeat(Math.round(result.processed/2))}, ${lastEntry}, ${result.o}, ${result.l}-- ${totalData}`)
                    successResponse.data.processed = (parseInt(result.processed) == 100) ? '99': result.processed.toString()
                    await res.write(JSON.stringify(successResponse))
                    await res.flush()
    
                    if (totalData > 0 && totalData == lastEntry) {
                        console.log('entered ....')
                        if (pending) {
                            try {
                                let uploadedUrl = await uploadChunkFile(tempFile, fileName)
                                successResponse.data.url.push(uploadedUrl)
                            }
                            catch(err) {
                                console.log(err.message)
                                console.log('chunk file upload error last time')
                            }
                        }
                        console.log('upload completed')
                        successResponse.data.processed = '100'
                        successResponse.status.message = 'url created successfully'
                        await res.write(JSON.stringify(successResponse))
                        await res.flush()
                        return await res.end()
                    }
                }
                else {
                    receivedData += d.toString()
                }
            })
            .on('end', async function() {
                if (totalData > 0 && totalData == lastEntry) {
                   // response already sent
                }
                else {
                    successResponse.data.processed = '-1'
                    successResponse.status.message = 'no data found'
                    await res.write(JSON.stringify(successResponse))
                    await res.flush()
                    return await res.end()
                }
            })
            .on('error', async function(err) {
                console.log('on error', err.Error)
                errorResponse.status.message = err
                return res.status(errorResponse.status.code).send(errorResponse)
            })
        }
        catch(err) {
            console.log('on try catch', err.message)
            errorResponse.status.message = err.message
            return res.status(errorResponse.status.code).send(errorResponse)
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-29
      • 1970-01-01
      • 2016-09-28
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多