【问题标题】:Which class in AWS CDK have option to configure Dynamic partitioning for Kinesis delivery streamAWS CDK 中的哪个类可以选择为 Kinesis 传输流配置动态分区
【发布时间】:2021-11-01 10:41:33
【问题描述】:

我正在使用 kinesis 传输流将流从事件桥发送到 s3 存储桶。但我似乎找不到哪个类可以配置动态分区?

这是我的传输流代码:

new CfnDeliveryStream(this, `Export-delivery-stream`, {
        s3DestinationConfiguration: {
            bucketArn: bucket.bucketArn,
            roleArn: kinesisFirehoseRole.roleArn,
            prefix: `test/!{timestamp:yyyy/MM/dd}/`

        }
    });

【问题讨论】:

  • 我在 aws-cdk 中还没有看到动态分区配置选项。但它适用于 aws-cli 和 cloudformation

标签: amazon-s3 aws-cdk amazon-kinesis amazon-kinesis-firehose


【解决方案1】:

我在同一个问题上工作了几天,终于找到了工作。这是一个如何在 CDK 中实现的示例。简而言之,必须像您所做的那样启用分区,但是您需要在所谓的 processingConfiguration 中设置 key 和 .jq 表达式。

我们传入的 json 数据如下所示:

{
    "data": 
        {
        "timestamp":1633521266990,
        "defaultTopic":"Topic",
        "data":
        {
            "OUT1":"Inactive",
            "Current_mA":3.92
        }
    }
}

CDK 代码如下:

const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
  deliveryStreamName: 'deliverystream',
  extendedS3DestinationConfiguration: {
    cloudWatchLoggingOptions: {
      enabled: true,
    },
    bucketArn: Bucket.bucketArn,
    roleArn: deliveryStreamRole.roleArn,
    prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
    errorOutputPrefix: 'error/!{firehose:error-output-type}/',
    bufferingHints: {
      intervalInSeconds: 60,
    },
    dynamicPartitioningConfiguration: {
      enabled: true,
    },
    processingConfiguration: {
      enabled: true,
      processors: [
        {
          type: 'MetadataExtraction',
          parameters: [
            {
              parameterName: 'MetadataExtractionQuery',
              parameterValue: '{Topic: .data.defaultTopic}',
            },
            {
              parameterName: 'JsonParsingEngine',
              parameterValue: 'JQ-1.6',
            },
          ],
        },
        {
          type: 'AppendDelimiterToRecord',
          parameters: [
            {
              parameterName: 'Delimiter',
              parameterValue: '\\n',
            },
          ],
        },
      ],
    },
  },
})

【讨论】: