我终于用下面的代码解决了这个问题。
欢迎就该解决方案提出任何改进建议。
exports.get_FIXED_Samples_CSV = catchAsync(async (req, res, next) => {
// retrieve start and end dates
// if start is not provided then set it to current date - 30 days
const startDate =
moment(req.query.start).isValid() && req.query.start
? moment(new Date(req.query.start))
: moment(new Date()).subtract(30, 'd');
// if end is not provided or invalid set it to current date
const endDate =
moment(req.query.end).isValid() && req.query.end
? moment(new Date(req.query.end))
: moment(new Date());
// retrieve station name and check if valid,if not returns null
const station = [
'AQ101',
'AQ102',
'AQ103',
'AQ104',
'AQ105',
'AQ106',
'AQ107',
].includes(req.query.station)
? req.query.station
: null;
// eslint-disable line no-unused-vars
const pipeline = [
// sort by date DESC adn station.name ASC
{
$sort: {
'station.name': 1,
date: -1,
},
},
// unwind by samples array adding num_sample counting
{
$unwind: {
path: '$samples',
// The name of a new field to hold the array index of the element.
includeArrayIndex: 'num_sample',
preserveNullAndEmptyArrays: true,
},
},
// ~~~~~~~~~~~~ set limit to returning docs ~~~~~~~~~~~~~~~~~~~
{
$limit: 216000,
},
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// select fields to show
{
$project: {
_id: 0,
t: '$samples.t',
station: '$station.name',
loc: '$station.location',
data: '$samples.data',
},
},
];
// add as first stage to the pipeline a match aggregation
if (station) {
// if valid station is provided
pipeline.unshift({
$match: {
'station.name': station,
date: {
$gte: new Date(startDate.format('YYYY-MM-DD')),
$lte: new Date(endDate.format('YYYY-MM-DD')),
},
},
});
} else {
// if station is INVALID OR NOT provided
pipeline.unshift({
$match: {
date: {
$gte: new Date(startDate.format('YYYY-MM-DD')),
$lte: new Date(endDate.format('YYYY-MM-DD')),
},
},
});
}
// transform to apply to generate CSV
const custTransf = (item, strFormat = 'DD/MM/YYYY HH:mm:ss') => ({
utc: item.t,
t: moment(item.t).format(strFormat),
station: item.station,
...item.data,
});
// Unwind Samples properties and flatten arrays
const transforms = [
// flatten({ objects: false, arrays: true }),
custTransf,
];
// const fields = ['t', 'station', 'data'];
const opts = { transforms };
const transformOpts = { highWaterMark: 8192 };
const pipeTransf = new Transform(opts, transformOpts);
// remove data prefix from fields
const regex = /(data.)/gi;
const filename = 'FixedStations';
const strAtt = `attachment;filename=${filename}-${startDate.format(
'YYYY-MMM-DD'
)}-${endDate.format('YYYY-MMM-DD')}.csv`;
res.header('Content-Type', 'text/csv');
res.setHeader('Content-Type', 'text/csv');
res.setHeader(
'Content-Disposition',
strAtt
// 'attachment;filename=download.csv'
);
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Pragma', 'no-cache');
const cursor = fixed
.aggregate(pipeline)
.allowDiskUse(true)
.cursor({ transfor: JSON.stringify, batchSize: 1000 })
.exec()
.pipe(JSONStream.stringify())
.pipe(pipeTransf)
.pipe(replace(regex, ''))
.pipe(res);
});