【问题标题】:AWS Configure Kinesis Stream with DynamoDB LambdaAWS 使用 DynamoDB Lambda 配置 Kinesis Stream
【发布时间】:2018-11-20 08:14:30
【问题描述】:

从这个问题,AWS DynamoDB Stream into Redshift

DynamoDB --> DynamoDBStreams --> Lambda 函数 --> Kinesis Firehose --> Redshift。

如何配置我的 Kinesis 函数以获取 Lambda 函数源?

我创建了一个 DynamoDB 表(Purchase Sales),并添加了 DynamoDB 流。然后我配置了 Lambda 函数来获取 DynamoDB 流。我的问题是如何配置 Kinesis 以获取 Lambda 函数 Source?我知道如何配置 Lambda 转换,但想选择作为源。不知道如何配置下面的 Direct Put Source。

谢谢,

执行了以下步骤:

【问题讨论】:

  • 据我了解,您不知道数据如何从 Lambda 传输到 Firehose。在这种情况下,您需要使用 PutRecords API (docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/…) 将记录放入 lambda 中以自行处理。
  • 我应该在 Lambda 或 firehose 控制台中编写 Put 函数吗?您能否给出示例语法,阅读页面,firehose.putRecord("PurchaseSalesFirehose",),同步中的记录是什么?我希望亚马逊能够制作即插即用而不是编写脚本等

标签: amazon-web-services aws-lambda amazon-redshift amazon-kinesis-firehose


【解决方案1】:

在您的情况下,您会将 dynamodb 流式传输到 redshift

DynamoDB --> DynamoDBStreams --> Lambda Function --> Kinesis Firehose --> Redshift.

首先,您需要一个 lambda 函数来处理 DynamoDBStream。对于每个 DynamoDBStream 事件,使用 firehose PutRecord API 将数据发送到 firehose。来自example

var firehose = new AWS.Firehose();
firehose.putRecord({
  DeliveryStreamName: 'STRING_VALUE', /* required */
  Record: { /* required */
    Data: new Buffer('...') || 'STRING_VALUE' /* Strings will be Base-64 encoded on your behalf */ /* required */
  }
}, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});

接下来,我们必须知道数据是如何插入到 RedShift 中的。来自firehose document

对于向 Amazon Redshift 传输数据,Kinesis Firehose 首先交付 以前面描述的格式传入 S3 存储桶的数据。 Kinesis Firehose 然后发出 Amazon Redshift COPY 命令以加载 将数据从 S3 存储桶传输到 Amazon Redshift 集群。

所以,我们应该知道让COPY 命令将数据映射到 RedShift 模式的数据格式。我们必须关注data format requirement for redshift COPY command

默认情况下,COPY 命令期望源数据是 字符分隔的 UTF-8 文本。默认分隔符是管道 字符(|)。

因此,您可以对输入 dynamodb 流事件的 lambda 进行编程,将其转换为管道 (|) 分隔的行记录,然后将其写入 firehose。

var firehose = new AWS.Firehose();
firehose.putRecord({
  DeliveryStreamName: 'YOUR_FIREHOSE_NAME',
  Record: { /* required */
    Data: "RED_SHIFT_COLUMN_1_DATA|RED_SHIFT_COLUMN_2_DATA\n"
  }
}, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});

记得添加\n,因为firehose 不会为你添加新行。

【讨论】:

  • 我收到错误,firehose 未定义;请修复或添加 /*global firehose / firehose.putRecord({ DeliveryStreamName: 'PurchaseSalesKinesis', Record: { / required */ Data: "PurchaseSalesId\n" } }, function(err, data) { if (err) console.log(err, err.stack); // 发生错误 else console.log(data); // 成功响应 });
  • 通过阅读此处docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html的文档,您需要创建一个firehose API客户端var firehose = new AWS.Firehose();,并且在您的运行时环境中,您需要安装aws-sdk npm包npmjs.com/package/aws-sdk
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-10-31
  • 2015-11-08
  • 2017-02-24
  • 2018-09-07
  • 2019-11-05
  • 2019-07-27
相关资源
最近更新 更多