【Question title】: Kafka Connect JDBC sink: error flattening JSON records
【Posted】: 2020-01-12 04:05:38
【Question】:

I am using the Kafka Connect JDBC Sink Connector to store data from a topic in a SQL Server table. The data needs to be flattened. Following the example provided by Confluent, I created a SQL Server table and a JSON record.

So my record looks like this:

{
    "payload":{ 
        "id": 42,
        "name": {
          "first": "David"
        }
    },
    "schema": {
        "fields": [
            {
                "field": "id",
                "optional": true,
                "type": "int32"
            },
            {
                "name": "name",
                "optional": "false",
                "type": "struct",
                "fields": [
                    {
                        "field": "first",
                        "optional": true,
                        "type": "string"
                    }
                ]
            }
        ],
        "name": "Test",
        "optional": false,
        "type": "struct"
    }   
}

As you can see, I want to flatten the fields using the delimiter "_". So my Sink Connector configuration is as follows:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=MyTable
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
topics=myTopic
tasks.max=1
transforms=flatten
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=jdbc:sqlserver:[url]
transforms.flatten.delimiter=_
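For context, a minimal sketch (in Python, illustrative only, not the actual Connect implementation) of what the `Flatten$Value` transform does with delimiter "_": each nested struct field is promoted to a top-level field whose name joins the path segments with the delimiter, so `name.first` becomes `name_first`.

```python
# Illustrative sketch of Kafka Connect's Flatten$Value SMT behavior:
# nested dicts are collapsed into a flat dict whose keys join the
# original path with the configured delimiter.
def flatten(record, delimiter="_", prefix=""):
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, delimiter, name))
        else:
            flat[name] = value
    return flat

payload = {"id": 42, "name": {"first": "David"}}
print(flatten(payload))  # {'id': 42, 'name_first': 'David'}
```

This is why the target SQL Server table is expected to have columns `id` and `name_first` rather than a nested structure.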

When I write that record to the topic, I get the following exception:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly
    at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:512)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:360)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more

The sink connector works fine for records that do not need flattening. Is there something wrong with the configuration? Is it even possible to flatten a JSON record that carries a schema?

P.S. Kafka Connect version: 5.3.0-css

Any help would be greatly appreciated.

【Discussion】:

    Tags: jdbc apache-kafka apache-kafka-connect confluent-platform


    【Solution 1】:

    OK, the problem was how the nested struct was named in the schema. The correct key is "field", not "name":

    {
        "payload":{ 
            "id": 42,
            "name": {
              "first": "David"
            }
        },
        "schema": {
            "fields": [
                {
                    "field": "id",
                    "optional": true,
                    "type": "int32"
                },
                {
                    "field": "name",
                    "optional": "false",
                    "type": "struct",
                    "fields": [
                        {
                            "field": "first",
                            "optional": true,
                            "type": "string"
                        }
                    ]
                }
            ],
            "name": "Test",
            "optional": false,
            "type": "struct"
        }   
    }
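    Since the "Struct schema's field name not specified properly" error points exactly at this mistake, a small sanity check can catch it before producing to the topic. The helper below is a hedged sketch (not part of Kafka Connect) of the rule the JsonConverter enforces: every entry in a struct's "fields" array must name itself via the "field" key.

```python
# Sanity-check a JsonConverter schema envelope: every entry in a struct's
# "fields" list must carry a "field" key (the rule behind the
# "Struct schema's field name not specified properly" error).
# Illustrative helper, not part of Kafka Connect.
def check_struct_fields(schema, path="schema"):
    errors = []
    if schema.get("type") == "struct":
        for i, f in enumerate(schema.get("fields", [])):
            where = f"{path}.fields[{i}]"
            if "field" not in f:
                errors.append(f"{where}: missing 'field' key")
            errors.extend(check_struct_fields(f, where))
    return errors

bad = {"type": "struct", "fields": [{"name": "name", "type": "struct",
                                     "fields": [{"field": "first", "type": "string"}]}]}
good = {"type": "struct", "fields": [{"field": "name", "type": "struct",
                                      "fields": [{"field": "first", "type": "string"}]}]}
print(check_struct_fields(bad))   # ["schema.fields[0]: missing 'field' key"]
print(check_struct_fields(good))  # []
```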
    

    【Discussion】:
