【问题标题】:Kinesis Analytics Application calls Lambda too oftenKinesis Analytics 应用程序调用 Lambda 过于频繁
【发布时间】:2019-12-28 14:10:23
【问题描述】:

我们正在尝试基于 Kinesis Analytics 创建账单报告。我们有一个 Kinesis Stream,它收集有关 Lambda 调用、DynamoDB 写入和读取等信息。这些数据在 Kinesis Analytics 应用程序中使用以下 SQL 语句进行处理:

CREATE OR REPLACE STREAM "DESTINATION_MINUTE_STEAM" ("RowTime" timestamp, "CustomerId" varchar(128), "MandantenId" integer,"TotalCalls" integer,"TotalCost" double, "TypeReport" varchar(32));
CREATE OR REPLACE  PUMP "STREAM_PUMP_MINUTE" AS INSERT INTO "DESTINATION_MINUTE_STEAM"
SELECT STREAM "ROWTIME" as "RowTime","CustomerId","MandantenId", COUNT(*) AS "TotalCalls", SUM("OverallCost") "TotalCost",  'Minute' as "TypeReport"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "CustomerId","MandantenId", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);

这将创建一个目标流,该目标流应每 60 秒写入一次数据(基于 tumbling windows 的文档)。对于目的地,我们使用 Lambda 函数将数据写入 DynamoDB(文档也建议这样做)。现在我们期待 Lambda 函数基于分钟,但我们的 CloudWatch 日志显示它每隔几秒调用一次。

我们没有任何其他流,没有任何其他调用此函数的 Lambda 函数,没有其他分析应用程序,没有触发器。

这里是template.yaml的一部分:

RTCKINESISINVOICECONSUMER:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: bin/ 
      Handler: billinghandler
      Runtime: go1.x
      Tracing: Active # https://docs.aws.amazon.com/lambda/latest/dg/lambda-x-ray.html
      Policies:
        - AmazonDynamoDBFullAccess
        - AmazonKinesisFullAccess
      Environment:
        Variables:
          DISABLE_SSL: !Ref LambdaDisableSSL 

  KinAnalyticsApp:
    Type: AWS::KinesisAnalytics::Application
    Properties:
      ApplicationName: "RealtimeBillingAnalytics"
      ApplicationDescription: "Sample Kin App"
      ApplicationCode: !Ref KinesisSqlCode
      Inputs:
        - NamePrefix: "SOURCE_SQL_STREAM"
          InputSchema:
            RecordColumns:
             - Name: "RecordId"
               SqlType: "VARCHAR(128)"
               Mapping: "$.RecordId"
             - Name: "MandantenId"
               SqlType: "Integer"
               Mapping: "$.MandantenId"
             - Name: "CustomerId"
               SqlType: "VARCHAR(128)"
               Mapping: "$.CustomerId"
             - Name: "OverallCost"
               SqlType: "Real"
               Mapping: "$.OverallCost"
            RecordFormat:
              RecordFormatType: "JSON"
              MappingParameters:
                JSONMappingParameters:
                  RecordRowPath: "$"
          KinesisStreamsInput:
            ResourceARN: !GetAtt KinInputStream.Arn
            RoleARN: !GetAtt KinesisAnalyticsRole.Arn
  KinInputStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  KinesisAnalyticsRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: kinesisanalytics.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: Open
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "*"
                Resource: "*"
  KinAnalyticsAppOutputs:
    Type: AWS::KinesisAnalytics::ApplicationOutput
    DependsOn: KinAnalyticsApp
    Properties:
      ApplicationName: !Ref KinAnalyticsApp
      Output:
        Name: "DESTINATION_MINUTE_STEAM"
        DestinationSchema:
          RecordFormatType: "JSON"
        LambdaOutput:
          ResourceARN: !GetAtt RTCKINESISINVOICECONSUMER.Arn
          RoleARN: !GetAtt KinesisAnalyticsRole.Arn

有人知道为什么 Kinesis Analytics 应用程序经常发出信号吗?

【问题讨论】:

    标签: amazon-web-services aws-lambda analytics amazon-kinesis


    【解决方案1】:

    我认为您的 lambda 可能不会将状态“OK”作为确认返回,因此 Kinesis 分析将其视为失败。然后它继续发送失败记录。 请查看此页面下的“记录响应模型”部分。 https://docs.aws.amazon.com/kinesisanalytics/latest/dev/how-it-works-output-lambda.html

    作为输出函数(带有记录 ID)发送到您的 Lambda 的每条记录都必须通过 Ok 或 DeliveryFailed 进行确认,并且它必须包含以下参数。否则,Kinesis Data Analytics 会将它们视为交付失败。

    记录的交付状态。以下是可能的值:

    Ok:记录已成功转换并发送到最终目的地。 Kinesis Data Analytics 提取记录以进行 SQL 处理。

    DeliveryFailed:记录未通过 Lambda 作为输出函数成功传送到最终目的地。 Kinesis Data Analytics 不断重试将交付失败的记录作为输出函数发送到 Lambda。

    顺便说一句,您可以在此处获取 lambda 示例。 https://docs.aws.amazon.com/kinesisanalytics/latest/dev/how-it-works-output-lambda-functions.html#how-it-works-lambda-dest-python

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-06-30
      • 2020-05-18
      • 1970-01-01
      • 2018-09-07
      • 2013-12-28
      • 2022-07-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多