【问题标题】:AWS Kinesis - Empty record in lambdaAWS Kinesis - lambda 中的空记录
【发布时间】:2017-10-16 19:07:21
【问题描述】:

我有以下设置:

API Gateway > Kinesis > Lambda

正在调用 API 网关,但传入 Lambda 函数的数据为空。

exports.handler = (event, context, callback) => {
    event.Records.forEach((record) => {
        console.log('record', record);
    }
}

我得到...的输出

{ kinesisSchemaVersion: '1.0',
partitionKey: 'some-partition-key',
sequenceNumber: '49572842939259722444903461552052718872116185957331369986',
data: 'e30=',
approximateArrivalTimestamp: 1494948079.296 },
eventSource: 'aws:kinesis',
eventVersion: '1.0',
eventID: 'shardId-000000000000:49572842939259722444903461552052718872116185957331369986',
eventName: 'aws:kinesis:record',
invokeIdentityArn: 'arn:aws:iam::************:role/service-role/basic-lambda-execute-role',
awsRegion: 'eu-west-1',
eventSourceARN: 'arn:aws:kinesis:eu-west-1:************:stream/sensor-readings' }

那里的兴趣点是data: 'e30='

如果我从 APIG 方法测试表单调用端点,结果是相同的。它显示了以下响应,在我看来一切都很好。

Execution log for request test-request
Wed May 17 08:36:45 UTC 2017 : Starting execution for request: test-invoke-request
Wed May 17 08:36:45 UTC 2017 : HTTP Method: POST, Resource Path: /reading
Wed May 17 08:36:45 UTC 2017 : Method request path: {}
Wed May 17 08:36:45 UTC 2017 : Method request query string: {}
Wed May 17 08:36:45 UTC 2017 : Method request headers: {}
Wed May 17 08:36:45 UTC 2017 : Method request body before transformations: {
  "version" : "0.01",
  "concentrator": {
    "id": "con123",
    "rssi": -87,
    "snr": 10.1,
    "gwid": "Gateway1"
  },
  "client": {
    "name": "LukesTestDB",
    "site": "TheSite"
  },
  "data": [
    {
      "timestamp": "1486124533248000000",
      "ch0": "10",
      "ch1": "34",
      "ch2": "56",
      "ch3": "78"
    }
  ],
  "device": {
    "id": "DEV_789",
    "rssi": "100",
    "chx": "ch0",
    "asset": "MyAsset",
    "bType": "MyBType",
    "feed": "MyFeed",
    "location": "MyLocation",
    "other": "MyOther",
    "timestamp": "1486124533248000000"
 }
}
Wed May 17 08:36:45 UTC 2017 : Endpoint request URI: https://kinesis.eu-west-1.amazonaws.com/?Action=PutRecord
Wed May 17 08:36:45 UTC 2017 : Endpoint request headers: {Authorization=*******************************************************************************************************************************************************************************************************************************************************************************************************0c8175, X-Amz-Date=20170517T083645Z, x-amzn-apigateway-api-id=aawfu1iss2, Accept=application/json, User-Agent=AmazonAPIGateway_aawfu1iss2, X-Amz-Security-Token=AgoGb3JpZ2luEAQaCWV1LXdlc3QtMSKAAk80yYO4cLZv4RB1ThK+Fc4ZShZ33tUtXfk19XEBCknow94tmnaaTKXDhqd9TM/Fn6yGJC/vHQG3OEV9eWUVtGvx6GYAv98p0NV98jNxIniIncsQP6xcOL+Q4mN1MT7/pXd5WrjCPJntNJ5KKblfmuTlZXYZvp0f7hWK6+wOs2vD0BT3WJh2hMPxfuokfXO4M2rnHhPEcav6MLzxZtA8fnW+6uQoESGS1MnD9Vxua3JEeM7OwK0nWv2118RvIQIgKha+42Y9b3V6VnN6JqXkAOQDBI72YB5PcPXcJYUh1vNOUpGcUwGs/XDoqdHPcQM/yEHM3k7RxRSyug9a93vAhgUqgAIIKhAAGgw0MTk3NjIxMjExMTIiDDH8iXCSo6iQU7aIUCrdAZ1oQ7U6KYLyUONxLMg/YBv0ijO0rLlU4dzAXD5HsjtjKiyfc676EIsqMbP7WqLxdP1lHscMmZE4cnIcGxm94mwFypWyrTYW3ZAhWnuM [TRUNCATED]
Wed May 17 08:36:45 UTC 2017 : Endpoint request body after transformations: {
    "Data": "eyJ2ZXJzaW9uIjoiMC4wMSIsImNvbmNlbnRyYXRvciI6eyJpZCI6ImNvbjEyMyIsInJzc2kiOi04Nywic25yIjoxMC4xLCJnd2lkIjoiR2F0ZXdheTEifSwiY2xpZW50Ijp7Im5hbWUiOiJMdWtlc1Rlc3REQiIsInNpdGUiOiJUaGVTaXRlIn0sImRhdGEiOlt7InRpbWVzdGFtcCI6IjE0ODYxMjQ1MzMyNDgwMDAwMDAiLCJjaDAiOiIxMCIsImNoMSI6IjM0IiwiY2gyIjoiNTYiLCJjaDMiOiI3OCJ9XSwiZGV2aWNlIjp7ImlkIjoiREVWXzc4OSIsInJzc2kiOiIxMDAiLCJjaHgiOiJjaDAiLCJhc3NldCI6Ik15QXNzZXQiLCJiVHlwZSI6Ik15QlR5cGUiLCJmZWVkIjoiTXlGZWVkIiwibG9jYXRpb24iOiJNeUxvY2F0aW9uIiwib3RoZXIiOiJNeU90aGVyIiwidGltZXN0YW1wIjoiMTQ4NjEyNDUzMzI0ODAwMDAwMCJ9fQ==",
    "StreamName": "sensor-readings",
    "PartitionKey": "some-partition-key"
}
Wed May 17 08:36:45 UTC 2017 : Endpoint response body before transformations: {"SequenceNumber":"49572842939259722444903521585275507706013464929106919426","ShardId":"shardId-000000000000"}
Wed May 17 08:36:45 UTC 2017 : Endpoint response headers: {Server=Apache-Coyote/1.1, x-amzn-RequestId=c6c9329d-b519-d631-975f-3f401e1e34d2, x-amz-id-2=l5W9SQ3x4cnc3vF/oKlWoHK8qFsfRzZTK9byAK/Rpo477Of3JSlLPMorjH4/KV0s2BRKQtsVwbvYahkKKqfHtQZ5uHK6dKBs, Content-Length=110, Date=Wed, 17 May 2017 08:36:45 GMT, Content-Type=application/x-amz-json-1.1}
Wed May 17 08:36:45 UTC 2017 : Method response body after transformations: {"SequenceNumber":"49572842939259722444903521585275507706013464929106919426","ShardId":"shardId-000000000000"}
Wed May 17 08:36:45 UTC 2017 : Method response headers: {X-Amzn-Trace-Id=Root=1-591c0b9d-cf38868aada300b710e2ab3d, Content-Type=application/json}
Wed May 17 08:36:45 UTC 2017 : Successfully completed execution
Wed May 17 08:36:45 UTC 2017 : Method completed with status: 200

我们有....

"Data": "eyJ2ZXJzaW9uIjoiMC4wMSIsImNvbmNlbnRyYXRvciI6eyJpZCI6ImNvbjEyMyIsInJzc2kiOi04Nywic25yIjoxMC4xLCJnd2lkIjoiR2F0ZXdheTEifSwiY2xpZW50Ijp7Im5hbWUiOiJMdWtlc1Rlc3REQiIsInNpdGUiOiJUaGVTaXRlIn0sImRhdGEiOlt7InRpbWVzdGFtcCI6IjE0ODYxMjQ1MzMyNDgwMDAwMDAiLCJjaDAiOiIxMCIsImNoMSI6IjM0IiwiY2gyIjoiNTYiLCJjaDMiOiI3OCJ9XSwiZGV2aWNlIjp7ImlkIjoiREVWXzc4OSIsInJzc2kiOiIxMDAiLCJjaHgiOiJjaDAiLCJhc3NldCI6Ik15QXNzZXQiLCJiVHlwZSI6Ik15QlR5cGUiLCJmZWVkIjoiTXlGZWVkIiwibG9jYXRpb24iOiJNeUxvY2F0aW9uIiwib3RoZXIiOiJNeU90aGVyIiwidGltZXN0YW1wIjoiMTQ4NjEyNDUzMzI0ODAwMDAwMCJ9fQ=="

....这是发送到 Kinesis 的负载的 base64 编码版本。

谁能告诉我为什么我在 Lambda 中得到 data: 'e30=' 或者我应该从哪里开始寻找?

非常感谢,

卢克。

【问题讨论】:

  • 我不确定我是否了解您的拓扑。您已将 API Gateway 连接到 Kinesis?这对我来说很奇怪,你能解释一下为什么吗?关于您的问题 - 如果您将 Kinesis 设置为 lambda 的触发器,那么您可以确定 lambda 收到了 Kinesis 记录的内容,问题可能仅存在于 API Gateway 和 Kinesis 之间。
  • @johni - 感谢您的回复。我使用 API 作为 Kinesis (docs.aws.amazon.com/apigateway/latest/developerguide/…) 的代理。我同意认为问题出在从 API 到 Kinesis 的数据之间。但是,响应表明 Kinesis 一切正常。我对 AWS 很陌生,所以很想知道您为什么认为这很“奇怪”。
  • 关于“为什么” - 因为 Kinesis 流不是您希望对任何知道 API 端点的人公开可用的东西 - 我假设您通过将 API 网关连接到流来执行此操作。如果您仍然希望您的客户端能够将记录添加到您的 Kinesis 流,那么您应该将您的 API Gateway 连接到一个新的 lambda,这会插入到 Kinesis 流中(或者有一个服务可以做到这一点,即具有自动缩放功能的虚拟机所以,会便宜得多)。基本上 - 就成本而言 - 您不会希望将 API Gateway 和 Lambda 用于大容量拓扑。
  • @johni - 正确,我确实希望客户端通过 API 将记录添加到 Kinesis 流。我不明白为什么当我可以通过 API 的集成请求“直接”执行时,我会在 API 和 Kinesis 之间放置一个新的 lambda 以将数据推送到流中。 lambda 不是介于做同一件事的不同方式之间吗?我目前由流触发的 lambda(并且没有获取数据)最终会将数据推送到数据库中。
  • 试试这个:` (event.Records || []).forEach((record) => { const dataString = (new Buffer(record.kinesis.data,'base64').toString( 'utf8') || {}); `

标签: lambda aws-lambda amazon-kinesis


【解决方案1】:

好吧,这很尴尬……

记住新手 - FIFO(先进先出)

第一个输入确实是一个空的有效负载,在 lambda 可以处理它之前,不会处理后续数据。

我一直在流中推送新数据,希望它能够立即得到处理。

只见树木不见森林!

我希望这对其他新手有所帮助,或者让某人发笑:)

感谢@driedel - 当我重新阅读您的评论时,一切都变得有意义了。

【讨论】:

    猜你喜欢
    • 2021-02-13
    • 2021-06-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-19
    • 2023-03-13
    相关资源
    最近更新 更多