【问题标题】:Kafka jdbc sink connector with json schema not working带有json模式的Kafka jdbc接收器连接器不起作用
【发布时间】:2018-08-07 21:27:00
【问题描述】:

使用最新的 kafka 和 confluent jdbc sink 连接器。发送一个非常简单的 Json 消息:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "msg"
            }
        ],
        "optional": false,
        "name": "msgschema"
    },
    "payload": {
        "id": 222,
        "msg": "hi"
    }
}

但出现错误:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Jsonlint 说 Json 是有效的。我在 kafka 配置中保留了 json schemas.enable=true。有什么指点吗?

【问题讨论】:

  • 请您发布您的连接器和连接工作人员配置
  • 我正在使用命令启动 kafka 连接:./bin/connect-standalone.sh config/connect-standalone.properties config/sink-mysql.properties
  • connect-standalone.properties: bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000
  • sink-mysql.properties: name=test-sink-mysql-jdbc connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=2 connection.url=jdbc:mysql://localhost:3306/testdb connection.user=testuser connection.password=testuser pk.mode=record_value pk.fields=id insert.mode=upsert auto.create=true auto.evolve=false topics=mynewtopic offset.storage.file.filename=/tmp/mysql-sink.offsets
  • 我没有使用架构注册表或 avro。我在类路径中添加了 Confluent jdbc 连接器 jar 和 mysql jdbc jar

标签: json jdbc apache-kafka apache-kafka-connect


【解决方案1】:

您需要告诉 Connect 您的架构嵌入在您使用的 JSON 中。

你有:

value.converter=org.apache.kafka.connect.json.JsonConverter 

但也需要:

value.converter.schemas.enable=true

【讨论】:

  • 对接收器属性文件进行了上述更改 (value.converter.schemas.enable=false),但同样的错误。然后,改为对独立属性文件进行相同的更改 - 仍然是相同的错误。然后在独立属性文件中还设置 key.converter.schemas.enable=false。现在看不到上面的错误,但是相反,我看到了以下错误:org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: mynewtopic(我仍在传递相同的 JSON字符串作为消息,发送给 kafka 控制台生产者)
  • 最后这行得通:我必须设置 key.converter.schemas.enable=false AND value.converter.schemas.enable=true 。我必须在standalone.properties 中进行这些更改——我仍然无法覆盖sink.properties 中的这些设置,但这是一个单独的问题。感谢大家指出正确的方向。
  • 如果您的密钥没有架构,那么是的,key.converter.schemas.enable=false 是有意义的。请记住将答案标记为正确,以帮助其他人在将来找到它。
【解决方案2】:

为了使用 JDBC 接收器,您的流式消息必须具有架构。这可以通过将 Avro 与 Schema Registry 结合使用,或通过将 JSON 与模式结合使用来实现。如果在初始运行源属性文件后配置了schemas.enable=true,您可能需要删除主题,重新运行接收器,然后再次启动源端。

示例:

sink.properties文件

name=sink-mysql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test-mysql-jdbc-foobar
connection.url=jdbc:mysql://127.0.0.1:3306/demo?user=user1&password=user1pass
auto.create=true

还有一个示例工作者配置文件connect-avro-standalone.properties

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

plugin.path=share/java

并执行

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink.properties

【讨论】:

  • 我使用的配置与您在此处建议的配置几乎相同(在上面的 cmets 中添加),但仍然出现异常。如前所述,我暂时没有使用模式注册表或 avro。
  • @prabhas 尝试将--property print.key=true 包含在您的控制台使用者命令中。如果它打印出null 键,那么这就是问题,因为 JSON 无法解码空键。
  • 好的,谢谢,这可能是个问题。我正在向控制台生产者发送一个 json 字符串,并期望 kafka jdbc connect 将其加载到数据库表中。但是现在我看到 Json 本身并没有指定哪个是关键 - 我有两个字段 id 和 msg。显然,我应该在内联 json 模式中指定键,以便它有机会工作。如何指定内联架构中的键?
  • /bin/kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic my-topic \ --property "parse.key=true" \ --property "key.separator=:" "thsisIsKey1":JSON 请注意,在此示例中,密钥不是 JSON 格式。如果是这种情况,那么您需要在工作配置文件中设置 key.converter=org.apache.kafka.connect.storage.StringConverter
猜你喜欢
  • 2018-02-06
  • 1970-01-01
  • 2019-11-17
  • 1970-01-01
  • 2019-06-17
  • 2020-01-11
  • 2020-08-08
  • 2019-11-15
  • 1970-01-01
相关资源
最近更新 更多