【问题标题】:Data corruption in Kinesis ProducerKinesis Producer 中的数据损坏
【发布时间】:2020-06-27 00:52:43
【问题描述】:

我正在使用 kinesis producer 在 AWS Cloudwatch 中插入一些分析数据。 这是整个数据流: 1. Kinesis生产者(使用java SDK)-> Kinesis Stream-> AWS lambda函数-> AWS Cloudwatch

Kinesis 中使用的生产者版本:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>

使用以下代码将我的分析数据推送给生产者后,

kinesisProducer.addUserRecord(stream, "101", ByteBuffer.wrap(requestString.getBytes()));

正在发送给生产者的数据:

{
    "version": null,
    "namespace": "namespace",
    "metricdata": [
        {
            "name": "system",
            "unit": "Megabytes",
            "value": 879,
            "timestamp": 1590233414481,
            "dimensions": [
                {
                    "name": "systemName",
                    "value": "ramTotal"
                }
            ],
            "type": "gauge"
        }]
}

我知道,数据是由kinesis producer转换成base64格式的。因此 像这样解码后在 lambda 函数中接收到的数据 -

const payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');

payload 是这样来的

s   B
101fGaG
{
    "version": null,
    "namespace": "`$\u0017`$>`$0o?=o?=`$!`$\u0010`$*",
    "metricdata": [
        {
            "name": "system",
            "unit": "Megabytes",
            "value": 879,
            "timestamp": 1590233414481,
            "dimensions": [
                {
                    "name": "systemName",
                    "value": "ramTotal"
                }
            ],
            "type": "gauge"
        }]
}

谁能帮我解决这个问题。

【问题讨论】:

    标签: encoding aws-lambda amazon-kinesis


    【解决方案1】:

    Kinesis Producer 将小记录聚合成最大 1MB 的较大记录。 Kinesis 消费者必须在处理之前对记录进行分解。

    reference link

    de-aggregation library

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-11-21
      • 1970-01-01
      • 1970-01-01
      • 2021-10-30
      • 2016-12-22
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多