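[Posted]: 2019-12-28 14:10:23
[Question]:
We are trying to build a billing report on top of Kinesis Analytics. We have a Kinesis stream that collects information about Lambda invocations, DynamoDB reads and writes, and so on. A Kinesis Analytics application processes this data with the following SQL statements: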
CREATE OR REPLACE STREAM "DESTINATION_MINUTE_STEAM" (
    "RowTime" timestamp,
    "CustomerId" varchar(128),
    "MandantenId" integer,
    "TotalCalls" integer,
    "TotalCost" double,
    "TypeReport" varchar(32));
CREATE OR REPLACE PUMP "STREAM_PUMP_MINUTE" AS INSERT INTO "DESTINATION_MINUTE_STEAM"
SELECT STREAM "ROWTIME" AS "RowTime", "CustomerId", "MandantenId", COUNT(*) AS "TotalCalls", SUM("OverallCost") AS "TotalCost", 'Minute' AS "TypeReport"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "CustomerId", "MandantenId", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
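This creates a destination stream that should receive data once every 60 seconds (per the documentation on tumbling windows). As the destination we use a Lambda function that writes the data to DynamoDB (which the documentation also recommends). We therefore expected the Lambda function to be invoked once per minute, but our CloudWatch logs show that it is invoked every few seconds.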
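For reference, the grouping that the SQL above is expected to perform can be sketched in plain Go. This is only an illustration of tumbling-window semantics, not how Kinesis Analytics is implemented: the `Event`, `Key`, `Total`, `Aggregate`, `sampleEvents` and `windowSize` names are hypothetical, and the sample values are made up. It floors each row time to the start of its 60-second window and sums per customer and tenant, which shows that a single window yields one row per distinct `("CustomerId", "MandantenId")` pair rather than one row in total.

```go
package main

import (
	"fmt"
	"time"
)

// windowSize matches INTERVAL '60' SECOND in the SQL.
const windowSize = time.Minute

// Event is a hypothetical stand-in for one row of SOURCE_SQL_STREAM_001.
type Event struct {
	RowTime     time.Time
	CustomerId  string
	MandantenId int
	OverallCost float64
}

// Key identifies one output row: one group per window, customer and tenant.
type Key struct {
	Window      time.Time
	CustomerId  string
	MandantenId int
}

// Total mirrors the TotalCalls and TotalCost columns.
type Total struct {
	Calls int
	Cost  float64
}

// Aggregate floors each RowTime to the start of its window and sums per group.
func Aggregate(events []Event, window time.Duration) map[Key]Total {
	out := make(map[Key]Total)
	for _, e := range events {
		// Normalize to UTC so equal instants compare equal as map keys.
		k := Key{e.RowTime.UTC().Truncate(window), e.CustomerId, e.MandantenId}
		t := out[k]
		t.Calls++
		t.Cost += e.OverallCost
		out[k] = t
	}
	return out
}

// sampleEvents returns made-up input: three rows in one window, one in the next.
func sampleEvents() []Event {
	base := time.Date(2019, 12, 28, 14, 10, 0, 0, time.UTC)
	return []Event{
		{base.Add(5 * time.Second), "a", 1, 0.5},
		{base.Add(40 * time.Second), "a", 1, 0.25},
		{base.Add(50 * time.Second), "b", 1, 1},
		{base.Add(70 * time.Second), "a", 1, 2},
	}
}

func main() {
	// Map iteration order is not defined, so the rows print in arbitrary order.
	for k, t := range Aggregate(sampleEvents(), windowSize) {
		fmt.Println(k.Window.Format("15:04:05"), k.CustomerId, k.MandantenId, t.Calls, t.Cost)
	}
}
```

With this sample input the first window produces two rows (customers `a` and `b`) and the second window one row, so the number of rows per minute depends on how many customer and tenant combinations were active.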
We have no other streams, no other Lambda functions that call this function, no other analytics applications, and no triggers.
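One way to narrow this down would be to log, for every invocation, how many rows arrived and which window they belong to. The sketch below uses only the standard library and rests on assumptions: it assumes the delivery payload carries `invocationId` and a `records` list with `recordId` and base64-encoded `data`, and that each decoded row is a JSON object keyed by the column names of "DESTINATION_MINUTE_STEAM". The `deliveryEvent`, `deliveryRecord`, `row`, `Summarize` and `samplePayload` names, and the timestamp format in the sample, are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// deliveryRecord is one delivered row; Data is base64 in the JSON payload
// and is decoded to raw bytes by encoding/json.
type deliveryRecord struct {
	RecordID string `json:"recordId"`
	Data     []byte `json:"data"`
}

// deliveryEvent holds the payload fields this sketch assumes.
type deliveryEvent struct {
	InvocationID string           `json:"invocationId"`
	Records      []deliveryRecord `json:"records"`
}

// row is the assumed shape of one decoded output row.
type row struct {
	RowTime    string `json:"RowTime"`
	CustomerId string `json:"CustomerId"`
}

// Summarize counts the rows per RowTime value within one invocation payload.
func Summarize(payload []byte) (map[string]int, error) {
	var ev deliveryEvent
	if err := json.Unmarshal(payload, &ev); err != nil {
		return nil, fmt.Errorf("decode event: %w", err)
	}
	counts := make(map[string]int)
	for _, rec := range ev.Records {
		var r row
		if err := json.Unmarshal(rec.Data, &r); err != nil {
			return nil, fmt.Errorf("decode record %s: %w", rec.RecordID, err)
		}
		counts[r.RowTime]++
	}
	return counts, nil
}

// samplePayload builds a made-up payload with three rows in two windows.
func samplePayload() []byte {
	ev := deliveryEvent{
		InvocationID: "example-invocation",
		Records: []deliveryRecord{
			{"1", []byte(`{"RowTime":"2019-12-28 14:10:00.000","CustomerId":"a"}`)},
			{"2", []byte(`{"RowTime":"2019-12-28 14:10:00.000","CustomerId":"b"}`)},
			{"3", []byte(`{"RowTime":"2019-12-28 14:11:00.000","CustomerId":"a"}`)},
		},
	}
	b, err := json.Marshal(ev)
	if err != nil {
		panic(err)
	}
	return b
}

func main() {
	counts, err := Summarize(samplePayload())
	if err != nil {
		panic(err)
	}
	fmt.Println(counts)
}
```

Logging this summary from the handler would show whether the frequent invocations each carry a few rows of the same minute or rows from different windows.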
Here is part of the template.yaml:
RTCKINESISINVOICECONSUMER:
  Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
  Properties:
    CodeUri: bin/
    Handler: billinghandler
    Runtime: go1.x
    Tracing: Active # https://docs.aws.amazon.com/lambda/latest/dg/lambda-x-ray.html
    Policies:
      - AmazonDynamoDBFullAccess
      - AmazonKinesisFullAccess
    Environment:
      Variables:
        DISABLE_SSL: !Ref LambdaDisableSSL
KinAnalyticsApp:
  Type: AWS::KinesisAnalytics::Application
  Properties:
    ApplicationName: "RealtimeBillingAnalytics"
    ApplicationDescription: "Sample Kin App"
    ApplicationCode: !Ref KinesisSqlCode
    Inputs:
      - NamePrefix: "SOURCE_SQL_STREAM"
        InputSchema:
          RecordColumns:
            - Name: "RecordId"
              SqlType: "VARCHAR(128)"
              Mapping: "$.RecordId"
            - Name: "MandantenId"
              SqlType: "Integer"
              Mapping: "$.MandantenId"
            - Name: "CustomerId"
              SqlType: "VARCHAR(128)"
              Mapping: "$.CustomerId"
            - Name: "OverallCost"
              SqlType: "Real"
              Mapping: "$.OverallCost"
          RecordFormat:
            RecordFormatType: "JSON"
            MappingParameters:
              JSONMappingParameters:
                RecordRowPath: "$"
        KinesisStreamsInput:
          ResourceARN: !GetAtt KinInputStream.Arn
          RoleARN: !GetAtt KinesisAnalyticsRole.Arn
KinInputStream:
  Type: AWS::Kinesis::Stream
  Properties:
    ShardCount: 1
KinesisAnalyticsRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: Allow
          Principal:
            Service: kinesisanalytics.amazonaws.com
          Action: "sts:AssumeRole"
    Path: "/"
    Policies:
      - PolicyName: Open
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Action: "*"
              Resource: "*"
KinAnalyticsAppOutputs:
  Type: AWS::KinesisAnalytics::ApplicationOutput
  DependsOn: KinAnalyticsApp
  Properties:
    ApplicationName: !Ref KinAnalyticsApp
    Output:
      Name: "DESTINATION_MINUTE_STEAM"
      DestinationSchema:
        RecordFormatType: "JSON"
      LambdaOutput:
        ResourceARN: !GetAtt RTCKINESISINVOICECONSUMER.Arn
        RoleARN: !GetAtt KinesisAnalyticsRole.Arn
Does anyone know why the Kinesis Analytics application emits output so frequently?
[Discussion]:
Tags: amazon-web-services aws-lambda analytics amazon-kinesis