【Question Title】: Mongoose stream an aggregation query using async iterators
【Posted】: 2021-07-30 16:56:11
【Question】:

I want to stream the results of a Mongoose aggregation query so that the client can process a huge JSON response (which will eventually be piped into a CSV converter).

My code so far:

const pipeline = [
    {
      $unwind: {
        path: '$samples',
        // The name of a new field to hold the array index of the element.
        includeArrayIndex: 'num_sample',
        preserveNullAndEmptyArrays: true,
      },
    },
    {
      $limit: 10000,
    },
    {
      $project: {
        _id: 0,
        t: '$samples.t',
        station: '$station.name',
        loc: '$station.location',
        data: '$samples.data',
      },
    },
  ];
  // const samples = await fixed.aggregate([pipeline]);

  const cursor = fixed
    .aggregate(pipeline)
    .cursor({ batchSize: 1000 })
    .exec();
  res.writeHead(200, { 'content-type': 'application/json' });
  res.write('[');
 await cursor.eachAsync(async (doc, i) => {
    res.write(JSON.stringify(doc));
    res.write(',');
  });
  res.write('{}]');
  res.end();

But how do I pipe the response into a CSV converter with json2csv? Is the code above functionally correct? I have to write extra characters into the response stream to format the JSON correctly, but the solution I found (with the final `{}`) introduces an empty record into the final JSON. I haven't found a way to write a ',' after each document returned by the Mongoose cursor except the last one, so I had to add that empty record.

【Comments】:

    Tags: json express mongoose async-await aggregation-framework


    【Solution 1】:

    It is a stream, which means you have to pipe that stream into res, like this:

    cursor.pipe(JSONStream.stringify()).pipe(res)
    

    See this question for more details.

    【Discussion】:

      【Solution 2】:

      I finally solved it with the code below.

      Any suggestions for improving this solution are welcome.

      // Assumed imports, inferred from usage below (not shown in the original):
      // const moment = require('moment');
      // const JSONStream = require('JSONStream');
      // const replace = require('replacestream'); // streaming text replacement
      // const { Transform } = require('json2csv');
      exports.get_FIXED_Samples_CSV = catchAsync(async (req, res, next) => {
        // retrieve start and end dates
        // if start is not provided then set it to current date - 30 days
        const startDate =
          moment(req.query.start).isValid() && req.query.start
            ? moment(new Date(req.query.start))
            : moment(new Date()).subtract(30, 'd');
      
        // if end is not provided or invalid set it to current date
        const endDate =
          moment(req.query.end).isValid() && req.query.end
            ? moment(new Date(req.query.end))
            : moment(new Date());
      
        // retrieve the station name and check that it is valid; if not, set it to null
        const station = [
          'AQ101',
          'AQ102',
          'AQ103',
          'AQ104',
          'AQ105',
          'AQ106',
          'AQ107',
        ].includes(req.query.station)
          ? req.query.station
          : null;
      
        // eslint-disable line no-unused-vars
        const pipeline = [
          // sort by date DESC and station.name ASC
          {
            $sort: {
              'station.name': 1,
              date: -1,
            },
          },
          // unwind by samples array adding num_sample counting
          {
            $unwind: {
              path: '$samples',
              // The name of a new field to hold the array index of the element.
              includeArrayIndex: 'num_sample',
              preserveNullAndEmptyArrays: true,
            },
          },
          // ~~~~~~~~~~~~ set limit to returning docs ~~~~~~~~~~~~~~~~~~~
          {
            $limit: 216000,
          },
          // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
          // select fields to show
          {
            $project: {
              _id: 0,
              t: '$samples.t',
              station: '$station.name',
              loc: '$station.location',
              data: '$samples.data',
            },
          },
        ];
      
        // add as first stage to the pipeline a match aggregation
        if (station) {
          // if valid station  is provided
          pipeline.unshift({
            $match: {
              'station.name': station,
              date: {
                $gte: new Date(startDate.format('YYYY-MM-DD')),
                $lte: new Date(endDate.format('YYYY-MM-DD')),
              },
            },
          });
        } else {
          // if station is INVALID OR NOT provided
          pipeline.unshift({
            $match: {
              date: {
                $gte: new Date(startDate.format('YYYY-MM-DD')),
                $lte: new Date(endDate.format('YYYY-MM-DD')),
              },
            },
          });
        }
      
        // transform to apply to generate CSV
        const custTransf = (item, strFormat = 'DD/MM/YYYY HH:mm:ss') => ({
          utc: item.t,
          t: moment(item.t).format(strFormat),
          station: item.station,
          ...item.data,
        });
        // Unwind Samples properties and flatten arrays
        const transforms = [
          // flatten({ objects: false, arrays: true }),
          custTransf,
        ];
        // const fields = ['t', 'station', 'data'];
        const opts = { transforms };
        const transformOpts = { highWaterMark: 8192 };
        const pipeTransf = new Transform(opts, transformOpts);
        // remove data prefix from fields
        const regex = /(data.)/gi;
        const filename = 'FixedStations';
        const strAtt = `attachment;filename=${filename}-${startDate.format(
          'YYYY-MMM-DD'
        )}-${endDate.format('YYYY-MMM-DD')}.csv`;
        res.setHeader('Content-Type', 'text/csv');
        res.setHeader(
          'Content-Disposition',
          strAtt
          // 'attachment;filename=download.csv'
        );
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Pragma', 'no-cache');
      
        const cursor = fixed
          .aggregate(pipeline)
          .allowDiskUse(true)
          // JSONStream.stringify() below handles serialization
          .cursor({ batchSize: 1000 })
          .exec()
          .pipe(JSONStream.stringify())
          .pipe(pipeTransf)
          .pipe(replace(regex, ''))
          .pipe(res);
      });
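      A caveat when stripping the `data.` prefix with a regex: an unescaped dot matches any character, so a pattern like `/(data.)/gi` would also eat the first five characters of fields such as `database_id`. Escaping the dot limits the replacement to the intended prefix. A quick sketch with a hypothetical CSV header line:

```javascript
// Sketch: stripping the 'data.' prefix from CSV header fields.
// The unescaped dot in /(data.)/gi matches 'data' plus ANY character.
const header = 'utc,t,station,data.pm10,data.pm25,database_id';

const loose = header.replace(/(data.)/gi, '');
// → 'utc,t,station,pm10,pm25,ase_id'  -- 'datab' was eaten too

const strict = header.replace(/data\./g, '');
// → 'utc,t,station,pm10,pm25,database_id'
```

      With field names that all genuinely start with `data.` the loose pattern happens to work, but the escaped form is the safer choice if new fields are ever added.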
      

      【Discussion】:
