【问题标题】:Processing DynamoDB streams using the AWS Java DynamoDB streams Kinesis adapter使用 AWS Java DynamoDB 流 Kinesis 适配器处理 DynamoDB 流
【发布时间】:2017-02-24 22:17:06
【问题描述】:

我正在尝试使用 DynamoDB 流和 AWS 提供的 Java DynamoDB 流 Kinesis 适配器来捕获 DynamoDB 表更改。我正在使用 Scala 应用程序中的 AWS Java 开发工具包。

我首先关注 AWS guide,然后浏览 AWS 发布的 code example。但是,我在让亚马逊自己发布的代码在我的环境中工作时遇到问题。我的问题在于 KinesisClientLibConfiguration 对象。

在示例代码中,KinesisClientLibConfiguration 配置了 DynamoDB 提供的流 ARN。

new KinesisClientLibConfiguration("streams-adapter-demo",
    streamArn, 
    streamsCredentials, 
    "streams-demo-worker")

我在我的 Scala 应用程序中遵循了类似的模式,首先从我的 Dynamo 表中找到当前 ARN:

lazy val streamArn = dynamoClient.describeTable(config.tableName)
.getTable.getLatestStreamArn

然后使用提供的 ARN 创建KinesisClientLibConfiguration

lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
  "testProcess",
  streamArn,
  defaultProviderChain,
  "testWorker"
).withMaxRecords(1000)
   .withRegionName("eu-west-1")
   .withMetricsLevel(MetricsLevel.NONE)
  .withIdleTimeBetweenReadsInMillis(500)
  .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

我已经验证了提供的流 ARN,并且所有内容都与我在 AWS 控制台中看到的相符。

在运行时我最终得到一个异常,指出提供的 ARN 不是有效的流名称:

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation     
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at 
'streamName'    failed to satisfy constraint: Member must satisfy regular 
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code: 
400; Error Code: ValidationException; Request ID: )

查看KinesisClientLibConfiguration 上提供的文档确实有道理,因为第二个参数被列为 streamName 而没有提及 ARN。

我似乎在KinesisClientLibConfiguration 上找不到与 ARN 相关的任何内容。由于我使用的是 DynamoDB 流而不是 Kinesis 流,因此我也不确定如何找到我的流名称。

此时我不确定我在已发布的 AWS 示例中遗漏了什么,似乎他们可能使用的是更旧版本的 KCL。我正在使用 1.7.0 版的 amazon-kinesis-client。

【问题讨论】:

    标签: java scala amazon-web-services amazon-dynamodb amazon-kinesis


    【解决方案1】:

    这个问题实际上超出了我的KinesisClientLibConfiguration

    我能够通过使用相同的配置并提供 DynamoDB 流适配器库中包含的流适配器以及 DynamoDB 和 CloudWatch 的客户端来解决这个问题。

    我的工作解决方案现在看起来像这样。

    定义 Kinesis 客户端配置。

    //Kinesis config for DynamoDB streams
    lazy val kinesisConfig :KinesisClientLibConfiguration =
        new KinesisClientLibConfiguration(
            getClass.getName, //DynamoDB shard lease table name
            streamArn, //pulled from the dynamo table at runtime
            dynamoCredentials, //DefaultAWSCredentialsProviderChain 
            KeywordTrackingActor.NAME //Lease owner name
        ).withMaxRecords(1000) //using AWS recommended value
         .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
    

    定义流适配器和 CloudWatch 客户端

    val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
    streamAdapterClient.setRegion(region)
    
    val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
    cloudWatchClient.setRegion(region)
    

    创建一个RecordProcessorFactory 的实例,由您来定义一个实现KCL 提供的IRecordProcessorFactory 和返回的IRecordProcessor 的类。

    val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)
    

    而我缺少的部分,所有这些都需要提供给你的工人。

    val worker :Worker =
      new Worker.Builder()
        .recordProcessorFactory(recordProcessorFactory)
        .config(kinesisConfig)
        .kinesisClient(streamAdapterClient)
        .dynamoDBClient(dynamoClient)
        .cloudWatchClient(cloudWatchClient)
        .build()
    
    //this will start record processing
    streamExecutorService.submit(worker)
    

    【讨论】:

      【解决方案2】:

      或者,您可以使用com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker 代替com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker 内部使用AmazonDynamoDBStreamsAdapterClient

      lazy val kinesisConfig :KinesisClientLibConfiguration =
      new KinesisClientLibConfiguration(
          getClass.getName, //DynamoDB shard lease table name
          streamArn, //pulled from the dynamo table at runtime
          dynamoCredentials, //DefaultAWSCredentialsProviderChain 
          KeywordTrackingActor.NAME //Lease owner name
      ).withMaxRecords(1000) //using AWS recommended value
       .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
      .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
      
      val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig)
      

      【讨论】:

        【解决方案3】:

        只是为了回答问题所在 - 当它只需要流名称时,您提供了 ARN。

        【讨论】:

          【解决方案4】:

          我最近对这个项目进行了 PR gfc-aws-kinesis,现在您只需传递适配器并编写 KinesisRecordAdapter 实现即可使用它。

          在示例中,我使用 Scanamo 来解析 hashmap

          创建客户端

          val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
              new AmazonDynamoDBStreamsAdapterClient()
          

          在配置中传递:

          val streamConfig = KinesisStreamConsumerConfig[Option[A]](
            applicationName,
            config.stream, //the full dynamodb stream arn
            regionName = Some(config.region),
            checkPointInterval = config.checkpointInterval,
            initialPositionInStream = config.streamPosition,
            dynamoDBKinesisAdapterClient = Some(streamAdapterClient)
          )
          KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
          

          创建一个适合读取 dynamodb 事件的隐式记录读取器:

          implicit val kinesisRecordReader
            : KinesisRecordReader[Option[A]] =
            new KinesisRecordReader[Option[A]] {
              override def apply(record: Record): Option[A] = {
                record match {
                  case recordAdapter: RecordAdapter =>
                    val dynamoRecord: DynamoRecord =
                      recordAdapter.getInternalObject
                    dynamoRecord.getEventName match {
                      case "INSERT" =>
                        ScanamoFree
                          .read[A](
                            dynamoRecord.getDynamodb.getNewImage)
                          .toOption
                      case _ => None
                    }
                  case _ => None
                }
              }
            }
          

          【讨论】:

          • 您应该通过在此处添加示例和简短说明来改进您的答案。也许阅读this 可以帮助您改进答案。
          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2020-12-14
          • 1970-01-01
          • 1970-01-01
          • 2018-11-20
          • 1970-01-01
          • 1970-01-01
          • 2022-12-01
          相关资源
          最近更新 更多