【问题标题】:Is it possible to perform PutRecord with Kinesis FireHose on AWS Lambda using .Net Core 2.2?是否可以使用 .Net Core 2.2 在 AWS Lambda 上使用 Kinesis FireHose 执行 PutRecord?
【发布时间】:2020-05-22 12:35:38
【问题描述】:

之所以问这个问题是因为我没有看到任何官方文件提到从 AWS Lambda 函数到 FireHose 执行 PutRecord。我想在 Kinesis FireHose 上从 AWS Lambda 执行 PutRecord。我还为我尝试从中进行 PutRecord 的 AWS Lambda 函数提供了适当的 PutRecord 策略。当使用 .Net 2.2 从 AWS Lambda 执行 PutRecord 操作时,我收到以下错误

用户:arn:aws:sts::accountnumber:assumed-role/listener-role/lambda 无权执行:kinesis:PutRecord on resource:arn:aws:kinesis:us-west-1:accountnumber:assumed :stream/firehose-stream

我的政策如下

{
  "permissionsBoundary": {},
  "roleName": "listener-role",
  "policies": [
    {
      "document": {
        "Version": "2012-10-17",
        "Statement": [
          {....},
          {
            "Effect": "Allow",
            "Action": [
              "firehose:PutRecord",
              "firehose:PutRecordBatch"
            ],
            "Resource": [
              "*"
            ]
          }
        ]
      },
      "name": "policy",
      "type": "inline"
    }
  ],
  "trustedEntities": [
    "lambda.amazonaws.com"
  ]
}

.Net 被截断以在 Kinesis FireHose 上记录

_kinesisClient 是 AmazonKinesisClient

        MemoryStream recordStream = new MemoryStream();
        IFormatter formatter = new BinaryFormatter();
        formatter.Serialize(recordStream, data);
        var request = new PutRecordRequest
        {
            PartitionKey = Guid.NewGuid().ToString(),
            Data = recordStream,
            StreamName = Environment.GetEnvironmentVariable("KinesisStream")
        };
        await _kinesisClient.PutRecordAsync(request);

【问题讨论】:

    标签: amazon-web-services aws-lambda amazon-iam amazon-kinesis amazon-kinesis-firehose


    【解决方案1】:

    您正试图将数据放入 Kinesis 数据流。您的策略允许您将数据放入 Kinesis Firehose。由于 Kinesis 的不同风格,这可能会有些混乱。如果您确实尝试将数据放入 Kinesis 数据流,则应将策略操作更改为 kinesis:Put*

    另一方面,如果您想将数据放入 Kinesis Firehose,请将您的 .NET 代码更改为类似这样(我不是 .NET 专家):

    var putRecordRequest = new PutRecordRequest();
    var deliveryStreamName = Environment.GetEnvironmentVariable("KinesisStream");
    
    putRecordRequest.setDeliveryStreamName(deliveryStreamName);
    
    var record = new Record().withData(ByteBuffer.wrap(data.getBytes()));
    
    putRecordRequest.setRecord(record);
    
    // Put record into the DeliveryStream
    firehoseClient.putRecord(putRecordRequest);
    

    【讨论】:

    • 你指引我的方向是正确的。我使用了错误的客户端来记录。谢谢!
    【解决方案2】:

    我使用错误的客户端在 Kinesis Firehose 上放置记录。 KinesisFireHose 客户端看起来像这样。

    Nuget 包:AWSSDK.KinesisFirehose" 版本="3.3.103.28"

    serviceCollection.AddScoped<IAmazonKinesisFirehose, AmazonKinesisFirehoseClient>();
    

    使用依赖注入的 IAmazonKinesisFirehose

    var data = "{\"casenumber\": \"" + 123 + "\"}";
    
    // convert string to stream
    var byteArray = Encoding.UTF8.GetBytes(data);
    
    var putRecordRequest = new PutRecordRequest {
     DeliveryStreamName = Environment.GetEnvironmentVariable("KinesisFirehose"), // AWS console -> Data FIrehose -> "Firehose delivery streams" 
      Record = new Record {
       Data = new MemoryStream(byteArray)
      }
    };
    
    // Put record into the DeliveryStream
    Console.WriteLine($ "PutRecordAsync: {data}");
    
    Console.WriteLine("Writing EmitScanDataToKinesisAsync");
    await _fireHoseClient.PutRecordAsync(putRecordRequest);
    Console.WriteLine("End EmitScanDataToKinesisAsync");
    

    【讨论】:

    • 很好,你解决了!您可能应该将环境变量“KinesisStream”重命名为“DeliveryStream”。这样可以更清楚地表明您在谈论 Firehose,而不是 Kinesis Data Stream。
    猜你喜欢
    • 2017-05-08
    • 2020-02-08
    • 2016-01-17
    • 2018-07-02
    • 2021-02-17
    • 2018-11-23
    • 2017-09-06
    • 2021-09-14
    • 2021-06-15
    相关资源
    最近更新 更多