【问题标题】:Highland JS - turn reduce result array back to streamHighland JS - 将减少结果数组转回流
【发布时间】:2021-04-08 13:03:42
【问题描述】:

我正在尝试聚合时间序列数据的数据流。 reduce 有效,但是我找不到将结果数组转回另一个流的解决方案。

当我在 reduce 上调用 map 时,我只会返回整个数组作为结果。不是数组中的数据。

欢迎任何想法或提示。

const fs = require('fs')
const highland = require('highland')

const streamAgg = (aggData, parts) => {
        if (!aggData[parts.groupBySec]) {
          aggData[parts.groupBySec] = {}
          aggData[parts.groupBySec]['volume'] = parts.volume
          aggData[parts.groupBySec]['start-time'] = parts.timeStamp
          aggData[parts.groupBySec]['end-time'] = parts.timeStamp
        } else {
          aggData[parts.groupBySec]['volume']  += parts.volume
          aggData[parts.groupBySec]['end-time'] = parts.timeStamp
        }
        return aggData
      }


highland(fs.createReadStream('./timeseriesdata.csv', 'utf8'))
    .split()
    .map(line => line.split(','))
    .map(parts => ({
          timeStamp: parts[0],
          timeStampParsed: Date.parse(parts[0]),
          groupBySec: Math.floor(Date.parse(parts[0])/1000)*1000,
          volume: Number(parts[3]),
      }))
    .reject(parts => isNaN(parts.timeStampParsed))
    .reduce([], streamAgg)
    .map(x => x)
    .each(x => console.log(x))

【问题讨论】:

    标签: javascript data-stream highland.js


    【解决方案1】:

    似乎您正在简化为一个数组而不是一个对象(您的减速器函数似乎期望什么)。这有效:

    const fs       = require('fs');
    const highland = require('highland');
    
    
    const streamAgg = (aggData, parts) => {
      if (!aggData[parts.groupBySec]) {
        aggData[parts.groupBySec]               = {};
        aggData[parts.groupBySec].volume        = parts.volume;
        aggData[parts.groupBySec]['start-time'] = parts.timeStamp;
        aggData[parts.groupBySec]['end-time']   = parts.timeStamp;
      } else {
        aggData[parts.groupBySec].volume     += parts.volume;
        aggData[parts.groupBySec]['end-time'] = parts.timeStamp;
      }
      return aggData;
    };
    
    
    highland(fs.createReadStream('./timeseriesdata.csv', 'utf8'))
      .split()
      .map(line => line.split(','))
      .map(parts => ({
        timeStamp:       parts[0],
        timeStampParsed: Date.parse(parts[0]),
        groupBySec:      Math.floor(Date.parse(parts[0]) / 1000) * 1000,
        volume:          Number(parts[3])
      }))
      .reject(parts => isNaN(parts.timeStampParsed))
      .reduce({}, streamAgg)
      .doto(console.log)
      .done(() => {
        process.exit(0);
      });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-30
      • 2021-07-11
      • 2021-01-01
      • 2018-12-05
      • 2017-10-02
      相关资源
      最近更新 更多