Posted: 2020-01-12 04:05:38
Question:
I'm using the Kafka Connect JDBC Sink Connector to store data from a topic in a SQL Server table. The data needs to be flattened. Following the example provided by Confluent, I created a SQL Server table and a JSON record.
So my record looks like this:
{
  "payload": {
    "id": 42,
    "name": {
      "first": "David"
    }
  },
  "schema": {
    "fields": [
      {
        "field": "id",
        "optional": true,
        "type": "int32"
      },
      {
        "name": "name",
        "optional": "false",
        "type": "struct",
        "fields": [
          {
            "field": "first",
            "optional": true,
            "type": "string"
          }
        ]
      }
    ],
    "name": "Test",
    "optional": false,
    "type": "struct"
  }
}
As you can see, I want to flatten the nested fields, joining their names with the delimiter "_". So my Sink Connector configuration is as follows:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=MyTable
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
topics=myTopic
tasks.max=1
transforms=flatten
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=jdbc:sqlserver:[url]
transforms.flatten.delimiter=_
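To illustrate the intent of the `Flatten$Value` transform with delimiter `_` (a rough stand-in sketch in Python, not the actual SMT implementation): nested structures in the record value should collapse into flat fields whose names join the path with `_`.

```python
def flatten(obj, delimiter="_", prefix=""):
    """Recursively flatten nested dicts, joining key paths with the
    delimiter -- a rough stand-in for what the Flatten SMT is meant
    to do to the record value before it reaches the JDBC sink."""
    out = {}
    for key, value in obj.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(value, dict):
            out.update(flatten(value, delimiter, name))
        else:
            out[name] = value
    return out

payload = {"id": 42, "name": {"first": "David"}}
print(flatten(payload))  # {'id': 42, 'name_first': 'David'}
```

So the table is expected to receive columns `id` and `name_first`.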
When I write that record to the topic, I get the following exception:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly
at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:512)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:360)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
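Given the `DataException` above ("Struct schema's field name not specified properly"), one hypothesis is that `JsonConverter` expects every entry in a struct schema's `fields` array to name its field via a `"field"` key. A minimal check (hypothetical helper, assuming that requirement) that walks the schema envelope of the record above:

```python
import json

# The record from the question, abridged to the schema envelope.
record = json.loads("""
{
  "schema": {
    "name": "Test", "type": "struct", "optional": false,
    "fields": [
      {"field": "id", "optional": true, "type": "int32"},
      {"name": "name", "optional": "false", "type": "struct",
       "fields": [{"field": "first", "optional": true, "type": "string"}]}
    ]
  }
}
""")

def missing_field_keys(schema, path="schema"):
    """Yield the path of every struct-field entry that lacks a 'field'
    key, which (assumption) is what JsonConverter requires."""
    if schema.get("type") == "struct":
        for i, entry in enumerate(schema.get("fields", [])):
            where = f"{path}.fields[{i}]"
            if "field" not in entry:
                yield where
            yield from missing_field_keys(entry, where)

print(list(missing_field_keys(record["schema"])))  # ['schema.fields[1]']
```

Under that assumption, the check flags the nested struct entry, which uses `"name": "name"` rather than `"field": "name"`.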
The sink connector works fine for records that don't need flattening. Is there something wrong with my configuration? Is it even possible to flatten a JSON record that carries a schema?
P.S. Kafka Connect version: 5.3.0-css
Any help would be appreciated.
Tags: jdbc apache-kafka apache-kafka-connect confluent-platform