【问题标题】:Invoking multiple AWS Lambdas doesn't make paralel processes调用多个 AWS Lambda 不会产生并行进程
【发布时间】:2023-03-17 10:07:01
【问题描述】:

我正在尝试从另一个 lambda 函数调用多个 lambda 函数(一个 lambda 函数,它将运行单独的并行进程)。第一个作为 cron lambda 运行,它只从 db 查询文档,然后使用 doc 的参数调用另一个 lambda。此 cron lambda 每五分钟运行一次并正确查询文档。我正在用两个文档测试第二个 lambda。问题在于,每次调用第二个 lambda 时,它只处理一个文档 - 每次处理另一个文档时,它在上一次调用时没有处理:

例如:

  • 文档 1
  • 文档 2

首先,调用第二个 lambda -> process doc 1

其次,调用第二个 lambda -> 处理文档 2

第三,调用第二个 lambda -> process doc 1

第二个 lambda 的第四次调用 -> 处理文档 2

等等……

第一个 (cron) lambda 代码:

aws.config.update({
  region : env.lambdaRegion,
  accessKeyId: env.lambdaAccessKeyId,
  secretAccessKey: env.lambdaSecretAccessKey,
});

const lambda = new aws.Lambda({
  region: env.lambdaRegion,
});

exports.handler = async (event: any, context: any) => {
  context.callbackWaitsForEmptyEventLoop = false;

  return new Promise(async (resolve, reject) => {
    for (let i = 0; i < 100; i++) {
      const doc = await mongo.db.collection('docs').
        findOneAndUpdate(
          {
            status: 1,
            lambdaProcessing: null,
          },
          { $set: { lambdaProcessing: new Date() } },
          {
            sort: { processedAt: 1 },
            returnNewDocument: true,
          },
        );

      if (doc.value && doc.value._id) {
        const params = {
          FunctionName: env.lambdaName,
          InvocationType: 'Event',
          Payload: JSON.stringify({ docId: doc.value._id }),
        };

        lambda.invoke(params);
      } else {
        if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
          break;
        }
      }
    }
    resolve();
  });
};

第二个 lambda 函数:

exports.handler = async (event: any, ctx: any) => {
  ctx.callbackWaitsForEmptyEventLoop = false;

  if (event && event.docId) {
    const doc = await mongo.db.collection('docs').findById(event.docId);
    return await processDoc(doc);
  } else {
    throw new Error('doc ID is not present.');
  }
};

【问题讨论】:

    标签: node.js typescript amazon-web-services aws-lambda aws-sdk


    【解决方案1】:

    要在没有“丑陋”的 cronjob 解决方案的情况下并行运行多个 lambda,我建议使用类型为 Parallel 的 AWS 步进函数。您可以在serverless.yml 中设置逻辑,函数调用本身就是 lambda 函数。您可以通过callback 的第二个参数传递数据。如果数据大于 32kb,我建议使用 S3 存储桶/数据库。

    serverless.yml 示例

    stepFunctions:
      stateMachines:
        test:
          name: 'test'
          definition:
            Comment: "Testing tips-like state structure"
            StartAt: GatherData
            States:
              GatherData:
                Type: Parallel
                Branches:
                  -
                    StartAt: GatherDataA
                    States:
                      GatherDataA:
                        Type: Task
                        Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstA"
                        TimeoutSeconds: 15
                        End: true
                  -
                    StartAt: GatherDataB
                    States:
                      GatherDataB:
                        Type: Task
                        Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstB"
                        TimeoutSeconds: 15
                        End: true
                Next: ResolveData
              ResolveData:
                Type: Task
                Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-resolveAB"
                TimeoutSeconds: 15
                End: true
    

    示例处理程序

    module.exports.firstA = (event, context, callback) => {
      const data = {
        id: 3,
        somethingElse: ['Hello', 'World'],
      };
      callback(null, data);
    };
    module.exports.firstB = (event, context, callback) => {
      const data = {
        id: 12,
        somethingElse: ['olleH', 'dlroW'],
      };
      callback(null, data);
    };
    
    module.exports.resolveAB = (event, context, callback) => {
      console.log("resolving data from a and b: ", event);
      const [dataFromA, dataFromB] = event;
      callback(null, event);
    };
    
    

    更多信息见

    【讨论】:

      【解决方案2】:

      关键是为我们想要调用的每个 lambda 创建新的单独的 aws.Lambda() 实例,然后我们必须解析并等待我们调用的每个 lambda(promieses 数组)。如果调用的 lambda 不需要等待,这是可以的,因此我们不会在 AWS 上浪费处理时间 - 因此调用的 lambda 开始处理,然后在不等待其响应的情况下解析,因此主 (cron) lambda 可以解析。

      固定 (cron) lambda 处理程序:

      aws.config.update({
        region : env.lambdaRegion,
        accessKeyId: env.lambdaAccessKeyId,
        secretAccessKey: env.lambdaSecretAccessKey,
      });
      
      exports.handler = async (event: any, context: any) => {
        context.callbackWaitsForEmptyEventLoop = false;
      
        return new Promise(async (resolve, reject) => {
          const promises: any = [];
          for (let i = 0; i < 100; i++) {
            const doc = await global['mongo'].db.collection('docs').
              findOneAndUpdate(
                {
                  status: 1,
                  lambdaProcessing: null,
                },
                { $set: { lambdaProcessing: new Date() } },
                {
                  sort: { processedAt: 1 },
                  returnNewDocument: true,
                },
              );
      
            if (doc.value && doc.value._id) {
              const params = {
                FunctionName: env.lambdaName,
                InvocationType: 'Event',
                Payload: JSON.stringify({ docId: doc.value._id }),
              };
      
              const lambda = new aws.Lambda({
                region: env.lambdaRegion,
                maxRetries: 0,
              });
      
              promises.push(
                new Promise((invokeResolve, invokeReject) => {
                  lambda.invoke(params, (error, data) => {
                    if (error) { console.error('ERROR: ', error); }
                    if (data) { console.log('SUCCESS:', data); }
                    // Resolve invoke promise in any case.
                    invokeResolve();
                  });
                }),
              );
            } else {
              if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
                break;
              }
            }
          }
          await Promise.all(promises);
          resolve();
        });
      };
      

      第二个(处理)lambda:

      exports.handler = async (event: any, ctx: any) => {
        ctx.callbackWaitsForEmptyEventLoop = false;
      
        if (event && event.docId) {
          const doc = await mongo.db.collection('docs').findById(event.docId);
          processDoc(doc);
          return ctx.succeed('Completed.');
        } else {
          throw new Error('Doc ID is not present.');
        }
      };
      

      我不知道是否有更好的方法来使用严格的 lambda 函数来实现这一点,但这是可行的。

      【讨论】:

      • 为什么不将它与 SQS 解耦?使用 SQS Lambda 触发器,您将获得开箱即用的并行处理。
      • 我是 AWS 新手,所以这是在我的可用时间范围内解决我的问题的最直观的方法。我找不到任何使用 SQS 的可行解决方案。您能否提供一个示例,以便我将来重构我的功能?
      • 我还需要一个将记录添加到 SQS 的 cron 作业吗?
      猜你喜欢
      • 1970-01-01
      • 2019-07-26
      • 2022-01-22
      • 2020-10-18
      • 1970-01-01
      • 2018-09-30
      • 2015-12-05
      • 2013-11-22
      • 1970-01-01
      相关资源
      最近更新 更多