【问题标题】:How to convert a large stream to a gzipped base64 string如何将大流转换为 gzip 压缩的 base64 字符串
【发布时间】:2018-10-18 18:04:34
【问题描述】:

我正在构建一个分析平台,我想先压缩我的 ETL(提取转换负载)作业,然后再将它们存储到我的数据库中。在我开始编写代码之前,我想知道是否有经验的人可以告诉我如何正确地完成它。我想 gzip 数据,然后将其转换为 base64 字符串。我只是简单地 gzip,然后转换为 base64 还是不行?

这是我目前用于这些大型数据集的过程。

var streamObj = athenaClient.execute('my query').toStream()
var data = [];

redis.set('Some Dashboard Data', '[')

streamObj.on('data', function(record) {
    // TODO gzip record then convert to base64
    if (data.length === 500) {
        let tempData = JSON.stringify(data);
        data = []
        redis.append('Some Dashboard Data', tempData.slice(1, tempData.length - 1) + ',')
        }
        data.push(record);
    })
}

如果这不可行,有没有办法存储压缩后的字符串?

【问题讨论】:

  • 为什么 data.length === 500 ?你就是这样分流的吗?
  • 如果我不这样做,节点会耗尽内存并导致堆栈溢出;)
  • 我可以将它写入一个临时文件,然后压缩整个文件吗?
  • 正在谈论 20mb 的数据
  • 我认为在您的 if 条件下,您可以尝试暂停读取流,进行 gzip 和 base64 转换,将其保存在 redis 中,并在成功恢复流时。这样您就不会有多余的数据

标签: javascript node.js zlib amazon-athena node-redis


【解决方案1】:

让 node.js 环境使用流提供的背压来控制内存。

我会考虑这个解决方案:

inputStream
    .pipe(zlib)
    .pipe(transformToBase64Stream)
    .pipe(redisCli);

zlib 是原生的,因此不会造成任何问题。 要转换为 base64,您可以编写转换流或使用外部 tools。要将结果通过管道传输到redis by stream,您可以在管道模式下生成子进程redis-cli。如大量插入和 redis cli 文章中所述,建议将其用于大数据,但您必须自己处理 redis 协议。阅读提供的文章,如果它有助于解决您的问题,请告诉我。

【讨论】:

  • 感谢您的洞察力。我会尝试在现实生活中应用它,然后告诉你它是怎么回事。
【解决方案2】:

只是为了进一步详细说明 Zilvinas 的答案。我将向您展示我是如何工作的。

const athena = require('./athena')
const redis = require('./redis')
const zlib = require('zlib')
const Stream = require('stream')

exports.persistStream = (config, query, name, transform) => {
return new Promise((resolve, reject) => {
    let recordCount = 0

    var transformStream = new Stream.Transform({ writableObjectMode: true, readableObjectMode: true})
    transformStream._transform = function (chunk, encoding, done) {

        recordCount++

        if (transform) chunk = transform(chunk)

        let jsonChunk = JSON.stringify([chunk])

        switch (true) {
            case recordCount === 1: 
                jsonChunk = jsonChunk.slice(0, jsonChunk.length - 1); break
            default:
                jsonChunk = ',' + jsonChunk.slice(1, jsonChunk.length - 1); break
        }
        this.push(jsonChunk)
        done();
    };

    transformStream._final = function (done) {
        this.push(']')
        done()
    }

    const gzip = zlib.createGzip()

    let buffers = []

    var stream = athena.execute(query)
        .toStream()
        .pipe(transformStream)
        .pipe(gzip)

    gzip.on('data', (chunk) => {
        buffers.push(chunk)
    })

    gzip.on('end', function () {
        let buffer = Buffer.concat(buffers)
        redis.set(name, buffer.toString('base64'), (err, response) => {
            zlib.gzip(config, (err, buff) => {
                redis.set(name + ' Config', buff.toString('base64'), (err, response) => {
                    if (err) {
                        console.log(err)
                        reject()
                    } else {

                        console.log(name + ' succeeded')
                        resolve()
                    }
                })
            })
        })
    })

    stream.on('error', (err) => {
        console.log(err)
        reject()
    })
})
}

【讨论】:

  • 希望这对某人有所帮助!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-08-18
  • 2013-04-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多