// Requirement: join the data of multiple MongoDB collections, export everything
// to CSV, and stream the export's percentage progress back to the client.
exports.getExportData = (req, res) => {
req.setTimeout(2400000);
console.log('------------------------------------------ getExportData ----------------------------------------------')
let url = urls.baseUrl + urls.endpoints.getData;
url = postParamsToGet(url, req.query);
let postContents = {
headers: {
'content-type': 'application/json'
},
url: url,
form: req.body,
timeout: 2400000
},
receivedData = '', totalData = 0, column = false, lastEntry = 0,
successResponse = {
status: {
success: true,
code: 200,
message: 'processing...'
},
data: {
url: [],
processed: 0
}
},
errorResponse = {
status: {
success: false,
code: 400,
message: 'something went wrong'
},
data: {
url: ''
}
},
exitChunk = false, filesRows = 0, fileCount = 0, pending = true;
console.log(JSON.stringify(postContents))
try {
req.on("close", function() {
console.log("close the connection!");
errorResponse.status.message = 'connection broke'
res.end(JSON.stringify(errorResponse));
exitChunk = true
});
let fileName = `${md5((new Date()).getTime())}.csv`
let email = 'abc@example.com'
if (req.query.emailId) {
fileName = md5((new Date()).getTime() + req.query.emailId) + '.csv';
email = req.query.emailId
}
let tempFile = path.resolve('./modules/utils/temp/' + fileName);
if (!fs.existsSync(path.resolve('./modules/utils/temp')))
fs.mkdirSync(path.resolve('./modules/utils/temp'))
if (!fs.existsSync(path.resolve(tempFile))) {
fs.appendFileSync(tempFile, '');
}
const uploadChunkFile = (tf, fn) => {
console.log('uploading to S3...', totalData, tf)
return new Promise(async (resolve, reject) => {
try {
if (!fs.existsSync(path.resolve(tf)))
return reject('unable to create file')
await exportCsv.uploadToS3(tf, fn, async function(err, status, url) {
if (err) {
console.log(err)
return resolve(fileName)
}
if (!status) {
console.log(status)
return resolve(fileName)
}
return resolve(url)
});
}
catch(err) {
console.log(err.message)
return reject(err.message)
}
})
}
// chunk process start from here
request.post(postContents)
.on('response', async function(response) {
if (response.statusCode != 200) {
errorResponse.status.message = response.statusMessage
errorResponse.status.code = response.statusCode
res.statusMessage = response.statusMessage
console.log(errorResponse)
return res.status(response.statusCode).send(errorResponse)
}
})
.on('data', async function(d) {
if (exitChunk) {
res.removeAllListeners('data');
return res.destroy();
}
let receive = d.toString()
if (receive.indexOf("END_OF_STRING") > -1) {
let receiveString = receivedData, pendingString = ''
receivedData = ''
receiveString += receive
receiveString = pendingString + receiveString
let splitedStringChunk = receiveString.split("END_OF_STRING")
if (splitedStringChunk.length > 1) {
for (let str = 1; str < splitedStringChunk.length; str++) {
pendingString += splitedStringChunk[str]
}
}
let substrReceiveString = splitedStringChunk[0].split("START_OF_STRING")[1]
let result = JSON.parse(substrReceiveString)
let jsonData = []
totalData = result.total
lastEntry = result.o + result.l
filesRows += result.l
if (!column) {
column = Object.keys(result.data[0]);
jsonData.push(`"${column.join('","')}"\n`)
}
result.data.forEach(d => {
let joinRows = [];
column.forEach(c => {
if (d[c] !== undefined) {
joinRows.push(d[c].toString())
} else {
joinRows.push('-');
}
});
joinRows = joinRows.join('","')
jsonData.push(`"${joinRows.substring(0, joinRows.length)}"\n`)
});
await fs.appendFileSync(tempFile, jsonData.join(''));
pending = true
if (filesRows >= 1000000) {
filesRows = 0
try {
let uploadedUrl = await uploadChunkFile(tempFile, fileName)
successResponse.data.url.push(uploadedUrl)
}
catch(err) {
console.log(err.message)
console.log('chunk file upload error')
}
fileName = `${fileName.split('.')[0]}_${++fileCount}.csv`
tempFile = path.resolve('./modules/utils/temp/' + fileName);
if (!fs.existsSync(path.resolve(tempFile))) {
fs.appendFileSync(tempFile, '');
}
pending = false
}
console.log(`${result.processed}% completed${'.'.repeat(Math.round(result.processed/2))}, ${lastEntry}, ${result.o}, ${result.l}-- ${totalData}`)
successResponse.data.processed = (parseInt(result.processed) == 100) ? '99': result.processed.toString()
await res.write(JSON.stringify(successResponse))
await res.flush()
if (totalData > 0 && totalData == lastEntry) {
console.log('entered ....')
if (pending) {
try {
let uploadedUrl = await uploadChunkFile(tempFile, fileName)
successResponse.data.url.push(uploadedUrl)
}
catch(err) {
console.log(err.message)
console.log('chunk file upload error last time')
}
}
console.log('upload completed')
successResponse.data.processed = '100'
successResponse.status.message = 'url created successfully'
await res.write(JSON.stringify(successResponse))
await res.flush()
return await res.end()
}
}
else {
receivedData += d.toString()
}
})
.on('end', async function() {
if (totalData > 0 && totalData == lastEntry) {
// response already sent
}
else {
successResponse.data.processed = '-1'
successResponse.status.message = 'no data found'
await res.write(JSON.stringify(successResponse))
await res.flush()
return await res.end()
}
})
.on('error', async function(err) {
console.log('on error', err.Error)
errorResponse.status.message = err
return res.status(errorResponse.status.code).send(errorResponse)
})
}
catch(err) {
console.log('on try catch', err.message)
errorResponse.status.message = err.message
return res.status(errorResponse.status.code).send(errorResponse)
}
}