【Question title】: Serializing a Kafka Avro object fails with "org.apache.kafka.common.errors.SerializationException: Unknown magic byte!"
【Posted】: 2021-07-27 21:55:00
【Question description】:

I have a Java Spring application that uses Kafka to consume Avro messages.

In my test, I serialize an Avro object to a byte[] and send it to the topic with a KafkaTemplate<String, byte[]>.

This is how I serialize the object:

private static byte[] serialize(Product product) {
    var specificDatumWriter = new SpecificDatumWriter<>(Product.class);
    try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
        var binaryEncoder = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream, null);
        specificDatumWriter.write(product, binaryEncoder);
        binaryEncoder.flush();
        return byteArrayOutputStream.toByteArray();
    } catch (IOException e) {
        throw new IllegalStateException(e);
    }
}

kafkaTemplate.send("products", uuid.toString(), serialize(product));

I followed this example to serialize the object: https://www.baeldung.com/java-apache-avro#1-serialization

However, when my @KafkaListener tries to deserialize the message, I get this error:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2060)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2045)
    ...
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserializationException(ErrorHandlingDeserializer.java:216)
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:191)
    ...
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1232)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1127)
    ... 4 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

What am I missing? What am I doing wrong in the serialization?

【Comments】:

    Tags: java serialization apache-kafka avro spring-kafka


    【Answer 1】:

    I have it working now. I had to use KafkaAvroSerializer to serialize the object.

    private byte[] serialize(Product product) {
        try (var kafkaAvroSerializer = new KafkaAvroSerializer()) {
            Map<String, Object> props = new HashMap<>();
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
            props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
            props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class);
            kafkaAvroSerializer.configure(props, false);
            return kafkaAvroSerializer.serialize("products", product);
        }
    }
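Some background on why the plain Avro encoder fails here: the Confluent deserializer expects each message in the Schema Registry wire format, which is one magic byte (0), a 4-byte big-endian schema ID, and then the Avro binary payload. Bytes written directly by SpecificDatumWriter carry no such header, so the first-byte check fails before any schema ID is read, which would explain the "id -1" in the error. The sketch below illustrates that framing using only the JDK; the class and method names (WireFormatSketch, frame, readSchemaId) are made up for illustration and are not part of the Confluent API.

```java
import java.nio.ByteBuffer;

// Hypothetical helper that only illustrates the Confluent wire format;
// it is not part of any Kafka or Confluent library.
final class WireFormatSketch {
    // The wire format starts with a single magic byte whose value is 0.
    static final byte MAGIC_BYTE = 0x0;
    // Header size: 1 magic byte + 4-byte schema ID.
    static final int HEADER_SIZE = 5;

    // Prepends the magic byte and the schema ID to an Avro binary payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(avroPayload)
                .array();
    }

    // Reads the schema ID, rejecting messages that lack the expected header.
    static int readSchemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.remaining() < HEADER_SIZE || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buffer.getInt();
    }
}
```

In practice there is no need to build this header by hand: KafkaAvroSerializer writes it, using the ID that the Schema Registry assigned to the schema. The sketch is only meant to show what the consumer side looks for in the first five bytes.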
    
