【Posted at】: 2018-08-07 21:27:00
【Problem description】:
I am using the latest Kafka and the Confluent JDBC sink connector, and sending a very simple JSON message:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "msg"
      }
    ],
    "optional": false,
    "name": "msgschema"
  },
  "payload": {
    "id": 222,
    "msg": "hi"
  }
}
But I get this error:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
JsonLint says the JSON is valid. I have kept schemas.enable=true in my Kafka Connect configuration. Any pointers?
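For reference, the error text describes a purely structural check: with schemas.enable=true the converter expects every record value to be a JSON object whose top-level keys are exactly "schema" and "payload". A minimal sketch of that check (a re-creation for illustration, not Kafka's actual JsonConverter code):

```python
import json

def looks_like_envelope(raw_message: str) -> bool:
    """Rough re-creation of the structural check the error message describes:
    the value must be a JSON object with exactly the keys "schema" and
    "payload" -- no extras, none missing."""
    try:
        obj = json.loads(raw_message)
    except json.JSONDecodeError:
        return False
    return isinstance(obj, dict) and set(obj) == {"schema", "payload"}

# The envelope from the question.
envelope = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int", "optional": False, "field": "id"},
            {"type": "string", "optional": True, "field": "msg"},
        ],
        "optional": False,
        "name": "msgschema",
    },
    "payload": {"id": 222, "msg": "hi"},
}

print(looks_like_envelope(json.dumps(envelope)))        # True
print(looks_like_envelope('{"id": 222, "msg": "hi"}'))  # False: bare payload, no envelope
```

Note the check applies per Kafka record: if the envelope gets split across several records, each fragment fails it even though the combined text is valid JSON.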
【Question discussion】:
-
Could you please post your connector and Connect worker configurations?
-
I am starting Kafka Connect with the command:
./bin/connect-standalone.sh config/connect-standalone.properties config/sink-mysql.properties
-
connect-standalone.properties:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
-
sink-mysql.properties:
name=test-sink-mysql-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
connection.url=jdbc:mysql://localhost:3306/testdb
connection.user=testuser
connection.password=testuser
pk.mode=record_value
pk.fields=id
insert.mode=upsert
auto.create=true
auto.evolve=false
topics=mynewtopic
offset.storage.file.filename=/tmp/mysql-sink.offsets
-
I am not using Schema Registry or Avro. I have added the Confluent JDBC connector jar and the MySQL JDBC jar to the classpath.
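One thing worth checking (an assumption on my part, not confirmed in this thread): kafka-console-producer treats each input line as a separate record, so a pretty-printed envelope spread over many lines arrives as many fragments, each of which fails the schema/payload check. A sketch of serializing the envelope onto a single line before producing it:

```python
import json

# The envelope from the question; json.dumps emits a single line with no
# embedded newlines, which is what one record on the topic should contain.
envelope = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int", "optional": False, "field": "id"},
            {"type": "string", "optional": True, "field": "msg"},
        ],
        "optional": False,
        "name": "msgschema",
    },
    "payload": {"id": 222, "msg": "hi"},
}

line = json.dumps(envelope)
print(line)
# This one-line string can then be pasted into the console producer, e.g.:
#   ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mynewtopic
```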
Tags: json jdbc apache-kafka apache-kafka-connect