【问题标题】:Pushing AWS Lambda data to Kinesis Stream将 AWS Lambda 数据推送到 Kinesis Stream
【发布时间】:2016-10-31 21:44:19
【问题描述】:

有没有办法将数据从 Lambda 函数推送到 Kinesis 流?我搜索了互联网,但没有找到任何与之相关的示例。

谢谢。

【问题讨论】:

    标签: amazon-web-services aws-lambda amazon-kinesis


    【解决方案1】:

    是的,您可以将信息从 Lambda 发送到 Kinesis Stream,这非常简单。确保您以正确的权限运行 Lambda。

    1. 创建一个名为 kinesis.js 的文件,该文件将提供一个“保存”功能,用于接收有效负载并将其发送到 Kinesis Stream。我们希望能够在任何我们想向流中发送数据的地方包含这个“保存”功能。代码:

    const AWS = require('aws-sdk');
    const kinesisConstant = require('./kinesisConstants'); //Keep it consistent
    const kinesis = new AWS.Kinesis({
      apiVersion: kinesisConstant.API_VERSION, //optional
      //accessKeyId: '<you-can-use-this-to-run-it-locally>', //optional
      //secretAccessKey: '<you-can-use-this-to-run-it-locally>', //optional
      region: kinesisConstant.REGION
    });
    
    const savePayload = (payload) => {
    //We can only save strings into the streams
      if( typeof payload !== kinesisConstant.PAYLOAD_TYPE) {
        try {
          payload = JSON.stringify(payload);
        } catch (e) {
          console.log(e);
        }
      }
    
      let params = {
        Data: payload,
        PartitionKey: kinesisConstant.PARTITION_KEY,
        StreamName: kinesisConstant.STREAM_NAME
      };
    
      kinesis.putRecord(params, function(err, data) {
        if (err) console.log(err, err.stack);
        else     console.log('Record added:',data);
      });
    };
    
    exports.save = (payload) => {
      const params = {
        StreamName: kinesisConstant.STREAM_NAME,
      };
    
      kinesis.describeStream(params, function(err, data) {
        if (err) console.log(err, err.stack);
        else {
          //Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
          if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
            || data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING ) {
            savePayload(payload);
          } else {
            console.log(`Kinesis stream ${kinesisConstant.STREAM_NAME} is ${data.StreamDescription.StreamStatus}.`);
            console.log(`Record Lost`, JSON.parse(payload));
          }
        }
      });
    };
    1. 创建一个 kinesisConstant.js 文件以保持一致:)

    module.exports = {
      STATE: {
        ACTIVE: 'ACTIVE',
        UPDATING: 'UPDATING',
        CREATING: 'CREATING',
        DELETING: 'DELETING'
      },
      STREAM_NAME: '<your-stream-name>',
      PARTITION_KEY: '<string-value-if-one-shard-anything-will-do',
      PAYLOAD_TYPE: 'String',
      REGION: '<the-region-where-you-have-lambda-and-kinesis>',
      API_VERSION: '2013-12-02'
    }
    1. 您的处理程序文件:我们添加了“完成”函数来向想要将数据发送到流的任何人发送响应,但“kinesis.save(event)”完成了所有工作。

    const kinesis = require('./kinesis');
    
    exports.handler = (event, context, callback) => {
      console.log('LOADING handler');
      
      const done = (err, res) => callback(null, {
        statusCode: err ? '400' : '200',
        body: err || res,
        headers: {
          'Content-Type': 'application/json',
        },
      });
      
      kinesis.save(event); // here we send it to the stream
      done(null, event);
    }

    【讨论】:

    • 很好的例子,对我帮助很大。请纠正错字(小) - kinesisConstants.js - 你错过了文件名中的最后一个“s”
    • 这是一个很好的例子。但我不禁想知道,在这里使用 Kinesis 有什么好处。如果您将数据传递给 Lambda,为什么不直接推送到存储(例如:Elastic Search)?当大多数 AWS 存储工具都有自己的 Node 库时,为什么还要使用第三方工具?性能更高?
    【解决方案2】:

    这应该完全像在您的计算机上那样完成。

    这是nodejs中的一个例子:

    let aws = require('aws');
    let kinesis = new aws.Kinesis();
    
    // data that you'd like to send
    let data_object = { "some": "properties" };
    let data = JSON.stringify(data_object);
    
    // push data to kinesis
    const params = {
      Data: data,
      PartitionKey: "1",
      StreamName: "stream name"
    }
    
    kinesis.putRecord(params, (err, data) => {
      if (err) console.error(err);
      else console.log("data sent");
    }
    

    请注意,这段代码不会工作,因为Lambda 对您的信息流没有任何权限。 通过Lambda访问AWS资源时,最好使用IAM角色;

    1. 配置新Lambda时,可以选择现有/创建角色。
    2. 转到IAM,然后转到角色,然后选择您分配给Lambda 函数的角色名称。
    3. 添加相关权限(putRecordputRecords)。

    然后,测试Lambda

    【讨论】:

      【解决方案3】:

      是的,这可以做到,我试图完成同样的事情,并且能够在 Lambda 中使用 Node.js 4.3 运行时做到这一点,并且它也适用于 6.10 版。

      代码如下:

      在您的 Lambda 函数顶部声明以下内容:

      var AWS = require("aws-sdk");
      var kinesis = new AWS.Kinesis();
      function writeKinesis(rawdata){
          data = JSON.stringify(rawdata);
          params = {Data: data, PartitionKey: "<PARTITION_KEY>", StreamName: "<STREAM_NAME>"};
          kinesis.putRecord(params, (err, data) => {
          if (err) console.error(err);
          else console.log("data sent");
          });  
      }
      

      现在,在exports.handler 中调用函数:

      writeKinesis(<YOUR_DATA>);
      

      需要注意的几点... Kinesis 要摄取数据,必须对其进行编码。在下面的示例中,我具有从 CloudWatch 获取日志并将它们发送到 Kinesis 流的函数。

      请注意,我将 buffer.toString('utf8') 的内容插入到 writeKinesis 函数中:

      exports.handler = function(input, context) {
          ...
          var zippedInput = new Buffer(input.awslogs.data, 'base64');
          zlib.gunzip(zippedInput, function(error, buffer) {
              ...
              writeKinesis(buffer.toString('utf8'));
              ...
          }
          ...
      }
      

      最后,在 IAM 中配置适当的权限。您的 Lambda 函数必须在包含以下以下权限的 IAM 角色的上下文中运行。就我而言,我只是修改了默认的 lambda_elasticsearch_execution 角色,以包含一个名为“lambda_kinesis_execution”的策略,代码如下:

      "Effect": "Allow",
      "Action": [
          "kinesis:*"
      ],
      "Resource": [
          "<YOUR_STREAM_ARN>"
      ]
      

      【讨论】:

        猜你喜欢
        • 2016-11-13
        • 2016-09-22
        • 2018-11-20
        • 2018-09-07
        • 1970-01-01
        • 2018-04-13
        • 1970-01-01
        • 2015-11-08
        • 1970-01-01
        相关资源
        最近更新 更多