【问题标题】:How to create a read stream of a AWS S3 object in a async function?如何在异步函数中创建 AWS S3 对象的读取流?
【发布时间】:2020-06-25 15:06:45
【问题描述】:

如何在异步函数中创建 AWS S3 对象的读取流?

如果我尝试

exports.handler = async (event) => {
  var csvreadstream = await s3.getObject({ Bucket: bucket, Key: filename }).promise().createReadStream()
}

exports.handler = async (event) => {
  var s3Object = await s3.getObject({ Bucket: bucket, Key: filename }).promise();
  var csvreadstream = s3Object.createReadStream();
}

我明白了

{
  "errorType": "TypeError",
  "errorMessage": "(intermediate value).createReadStream is not a function",
  "trace": [
    "TypeError: (intermediate value).createReadStream is not a function",
    "    at Runtime.exports.handler (/var/task/app.js:29:86)",
    "    at processTicksAndRejections (internal/process/task_queues.js:94:5)"
  ]
}

谁能建议如何以异步(异步/等待方式)函数从 S3 对象创建读取流? 谢谢!

感谢 Mark B,我更进一步:

const AWS = require('aws-sdk');
const utils = require('./utils');
const csv = require('fast-csv');
const stream = require('stream');
const s3 = new AWS.S3();

exports.handler = async (event) => {
    console.log("Incoming Event: ", JSON.stringify(event));
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);
    
    var errors = [];

    const splittedFilename = filename.split('.');
    const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
    const reportBucket = 'external.transactions.reports';
    
    var s3object = (await s3.getObject({ Bucket: bucket, Key: filename }).promise());
    var csvreadstream = new stream.Readable();
    csvreadstream._read = () => {};
    csvreadstream.push(s3object.Body);
   
    csvreadstream
    .pipe(csv.parse({ headers: true }))
    .on('data', async function(data){
        this.pause();
        console.log("DATA: " + data);
        await utils.filterLogic(data, errors);
        this.resume();
    })
    .on('end', async function(){
        console.log("END");
        await utils.writeErrorReport(errors, s3, reportBucket, reportFilename);
    })
};

但是,流似乎没有得到处理,比如调用.on() 有没有人建议如何在异步函数中处理读取流? 非常感谢您的治疗。

【问题讨论】:

    标签: node.js amazon-web-services amazon-s3 aws-lambda async-await


    【解决方案1】:

    聚会迟到了,但我相信你只是想做:

    var readableStream = await s3.getObject({ Bucket: bucket, Key: filename }).createReadStream();
    

    这将返回一个可读流,而无需使用 .promise() 预先下载整个对象并等待它被解析,然后在整个缓冲区下载完成后手动将其转换为可读流。

    【讨论】:

      【解决方案2】:

      在您的代码中s3Object.Body 将返回一个缓冲区。如果您需要将 Buffer 转换为 Stream,则可以查看类似答案 here 的技术。

      【讨论】:

      • 非常感谢 Mark B。是的,这行得通! var s3object = (await s3.getObject({ Bucket: bucket, Key: filename }).promise()); var csvreadstream = new stream.Readable(); csvreadstream._read = () => {}; csvreadstream.push(s3object.Body);
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-30
      • 1970-01-01
      • 2016-06-25
      • 1970-01-01
      • 2016-08-23
      • 2016-03-08
      相关资源
      最近更新 更多