【问题标题】:JSON to AVRO deserialization in KSQL error : Skipping record due to deserialization errorKSQL 错误中的 JSON 到 AVRO 反序列化:由于反序列化错误而跳过记录
【发布时间】:2019-11-19 16:39:48
【问题描述】:

我已经在 AWS 上建立了一个融合平台。我的来源是 MySql,我使用 debezium 连接器将它连接到 Kafka 连接。来自源的数据格式是 JSON。现在在 KSQL 中,我创建了一个派生主题并将 JSON 主题转换为 AVRO,以使数据可以使用 JDBC 连接器下沉到 MYSQL。我使用了以下查询:

CREATE STREAM json_stream (userId int, auth_id varchar, email varchar) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON');

派生主题:

create TABLE avro_stream WITH (VALUE_FORMAT='AVRO') AS select * from json_stream;

我曾尝试使用 JSON 消息直接接收到 mysql,但由于连接器需要架构而失败,因此带有架构的 JSON 或 Avro 消息将帮助我接收数据。

从主题 avro_stream 消费时:

 [2019-07-09 13:27:30,239] WARN task [0_3] Skipping record due to
 deserialization error. topic=[avro_stream] partition=[3] offset=[144]
 (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
 org.apache.kafka.connect.errors.DataException: avro_stream     at
 io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
    at
 io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:44)
    at
 io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
    at
 org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at
 org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at
 org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at
 org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at
 org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at
 org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at
 org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
 Caused by: org.apache.kafka.common.errors.SerializationException:
 Error deserializing Avro message for id -1 Caused by:
 org.apache.kafka.common.errors.SerializationException: Unknown magic
 byte!

我的 debezium 连接器配置:

{
"name": "debezium-connector",
"config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "XXXXX",
    "auto.create.topics.enable": "true",
    "database.server.id": "1",
    "tasks.max": "1",
    "database.history.kafka.bootstrap.servers": "X.X.X.X:9092",,
    "database.history.kafka.topic": "XXXXXXX",
    "transforms": "unwrap",
    "database.server.name": "XX-server",
    "database.port": "3306",
    "include.schema.changes": "true",
    "table.whitelist": "XXXX.XXXX",
    "key.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "database.hostname": "X.X.X.X",
    "database.password": "xxxxxxx",
    "value.converter.schemas.enable": "false",
    "name": "debezium-connector",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.whitelist": "XXXXX",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter"
},
"tasks": [
    {
        "connector": "debezium-connector",
        "task": 0
    }
],
"type": "source"

}

【问题讨论】:

  • (1) 能否在问题中包含源连接器 (Debezium) 配置 (2) 为什么不直接使用源中的 Avro,那么就不需要重新序列化?跨度>
  • 我从源代码尝试过 AVRO,但在 KSQL 中执行查询时遇到了序列化问题。由于我是 Kafka 新手,所以我将源数据格式更改为 JSON 并让 KSQL 查询开始工作。我提到了源连接器(Debezium)配置。
  • 如果您是新手,请相信我:从一开始就使用 Avro。混合和匹配 JSON/Avro 是灾难的根源。如果您无法从源代码中运行 Avro,请在此处发布问题,我们将帮助您解决问题。
  • 好的,我会从源代码中再次尝试使用 Avro。但是,仍然想知道上述设置有什么问题。

标签: avro apache-kafka-connect ksqldb


【解决方案1】:

KSQL 将键写入STRING,因此当您使用 Avro 进行值序列化时,键不是。因此,您的 Sink worker 需要这样配置:

"key.converter": "org.apache.kafka.connect.storage.StringConverter"
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<url to schema registry>",

如果您已将工作器本身配置为使用 Avro,那么您可以仅覆盖连接器配置的 key.converter

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-07-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-26
    相关资源
    最近更新 更多