【Question Title】: Asynchronous foreach or for loop in NodeJS
【Posted】: 2021-07-10 04:02:25
【Question】:

I'm a beginner with NodeJS, so please bear with me. The Lambda function below zips/compresses files in S3 and uploads the zipped file back to S3. listOfKeys contains the list of keys to be zipped. If you look at the listOfKeys.forEach loop in the handler, you'll see that with a large dataset, i.e. when listOfKeys holds a very long list of keys, it effectively runs synchronously and causes the Lambda to time out. The question is: is there a way to run the loop asynchronously or in parallel, so the files are zipped concurrently and in time?

Code:

const AWS = require('aws-sdk');
const async = require('async');
const archiver = require('archiver');
const stream = require('stream');
const request = require('request');

const awsOptions = {
    region: 'us-east-1'
};
const s3 = new AWS.S3(awsOptions);

// Returns a PassThrough stream; whatever is written to it is uploaded to S3.
const streamTo = (bucket, key) => {
    var passthrough = new stream.PassThrough();
    s3.upload({
        Bucket: bucket,
        Key: key,
        Body: passthrough,
        ContentType: "application/zip",
    },
        (err, data) => {
            if (err) throw err;
        }
    );
    return passthrough;
};

// Lazily opens the S3 read stream on the first "data" listener, so the
// download only starts once archiver actually consumes this entry.
const getStream = (bucket, key) => {
    let streamCreated = false;
    const passThroughStream = new stream.PassThrough();

    passThroughStream.on("newListener", event => {
        if (!streamCreated && event == "data") {
            const s3Stream = s3
                .getObject({ Bucket: bucket, Key: key })
                .createReadStream();
            s3Stream
                .on("error", err => passThroughStream.emit("error", err))
                .pipe(passThroughStream);

            streamCreated = true;
        }
    });
    return passThroughStream;
};

exports.handler = async (event, context, callback) => {

    let totalKeys = 0;
    const listOfKeys = [];
    const SrcBucket = event.Records[0].s3.bucket.name;
    const trigger_file = event.Records[0].s3.object.key;
    const prefix = trigger_file.split('/')[0] + '/' + trigger_file.split('/')[1] + '/';
    const dirToZip = trigger_file.split('/')[2].substr(0, trigger_file.split('/')[2].length - '.renamed'.length);
    const s3ListFilter = prefix + dirToZip;
    const destinationKey = prefix + `${dirToZip}.zip`;
    const bucketParams = {
        Bucket: SrcBucket,
        Delimiter: '/',
        Prefix: s3ListFilter + '/'
    };

    let data;
    do {
        bucketParams.Marker = (data && data.NextMarker) ? data.NextMarker : undefined;
        data = await s3.listObjects(bucketParams).promise();
        const contents = data.Contents;
        totalKeys = totalKeys + contents.length;
        listOfKeys.push(...contents.map(x => x.Key));
    } while (data.IsTruncated);

    console.log(`Total keys: ${listOfKeys.length}`);
    
    await new Promise(async (resolve, reject) => {
        var zipStream = streamTo(SrcBucket, destinationKey);
        zipStream.on("close", resolve);
        zipStream.on("end", resolve);
        zipStream.on("error", reject);
        var archive = archiver("zip");
        archive.on("error", err => {
            throw new Error(err);
        });
        archive.pipe(zipStream);

        var keysCounter = 0;
        listOfKeys.forEach(file => {
            archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
            keysCounter++
            if (keysCounter >= Object.keys(listOfKeys).length) {
                // Called at the end of the loop
                archive.finalize();
            }
        });

        //archive.finalize();
    }).catch(err => {
        throw new Error(err);
    });

    callback(null, {
        body: { final_destination: destinationKey }
    });
};

【Comments】:

    Tags: node.js asynchronous async-await node-async


    【Solution 1】:

    I'd probably rewrite the whole thing more aggressively, but to answer your specific question: replace your listOfKeys.forEach statement with:

    await Promise
      .all(
        listOfKeys.map(key => archive.append(getStream(SrcBucket, key), { name: key.split('/')[3] }))
      );
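
    Note that archive.append() is synchronous and returns the archiver instance, not a promise, so the Promise.all above resolves immediately without actually parallelizing the S3 downloads (which matches the timeout reported in the comment below). A minimal sketch of a genuinely parallel variant, pre-fetching each object with getObject().promise() before appending; the identifiers listOfKeys, SrcBucket, s3, and archive come from the question, and since this buffers whole objects in memory it assumes the files fit within the Lambda's memory limit:

    // Download all objects in parallel first, then append the buffered bodies.
    // Only viable when the combined object sizes fit in the Lambda's memory.
    const objects = await Promise.all(
        listOfKeys.map(key =>
            s3.getObject({ Bucket: SrcBucket, Key: key })
                .promise()
                .then(data => ({ key, body: data.Body }))
        )
    );
    for (const { key, body } of objects) {
        archive.append(body, { name: key.split('/')[3] });
    }
    archive.finalize(); // still required once every entry has been appended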
    

    【Discussion】:

    • Thanks for your help. However, the code still runs synchronously, and the Lambda still times out.
    【Solution 2】:

    Array.prototype.forEach()

    const array1 = ['a', 'b', 'c'];
    
    array1.forEach(element => console.log(element));
    
    // expected output: "a"
    // expected output: "b"
    // expected output: "c"
    

    So your code should be:

    listOfKeys.forEach(file => {
        // file is the array element itself, so use it directly as the key
        archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
    })
    

    (Not sure whether it works; let me know)

    Source: Array.prototype.forEach() | MDN

    Edit:

    So archive.finalize() should be called once the loop has finished. There are several ways to do that, but I think this one should work fine. See: Callback after all asynchronous forEach callbacks are completed

    // There's probably a better way to do it, but it works:
    let keysCounter = 0;
    listOfKeys.forEach(file => {
        archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] });
        keysCounter++;
        if (keysCounter >= listOfKeys.length) {
            // Called at the end of the loop
            archive.finalize();
        }
    });
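
    Since the callback body above is synchronous (archive.append only queues the entry; archiver streams it later), the counter isn't strictly needed. A minimal equivalent sketch, using the same identifiers:

    // Every entry is queued synchronously, so finalize() can simply be
    // called right after the loop.
    for (const file of listOfKeys) {
        archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] });
    }
    archive.finalize();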
    

    【Discussion】:

    • @Aniruddha I think it's because of archive.finalize(); let me edit my answer
    • OK, so where should it go?
    • @Aniruddha Haven't tried it, but I think it should work now
    • I appreciate your skill and effort, but this doesn't seem to work either. The Lambda times out. :(
    • @Aniruddha That's unfortunate. You could try adding more code under [// Called at the end of the loop], e.g. [callback()], but otherwise I'm not sure how to help further
    【Solution 3】:

    Rather than trying to do them all in one Lambda function, offload them to SQS and use a separate Lambda to process each zip.

    That way, you can isolate the following (a minimal producer sketch follows this list):

    • failures between individual zip archives
    • running each zip job in parallel
    • handling each zip in its own, isolated Lambda invocation
    • a dead-letter queue for messages (or zips) that cannot be processed
    • SRP (single responsibility) in your application: one Lambda picks up the zip jobs, another processes them
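
    A minimal sketch of the producer side under this design; the queue URL and message shape below are placeholder assumptions, and the consumer Lambda (not shown) would take one message at a time and run the download/zip/upload for that job:

    const AWS = require('aws-sdk');
    const sqs = new AWS.SQS({ region: 'us-east-1' });

    // Placeholder queue URL -- substitute your own.
    const QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/zip-jobs';

    // Enqueue one message per zip job; failures and retries are then handled
    // per message, with a dead-letter queue as the backstop.
    const enqueueZipJob = (bucket, prefix, destinationKey) =>
        sqs.sendMessage({
            QueueUrl: QUEUE_URL,
            MessageBody: JSON.stringify({ bucket, prefix, destinationKey }),
        }).promise();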

    【Discussion】:

    • Thanks for the idea. I'll give it a try, since I'll have to learn SQS.