【问题标题】:Kafka Streams: POJO serialization/deserializationKafka Streams:POJO 序列化/反序列化
【发布时间】:2018-05-16 17:58:39
【问题描述】:

我们可以使用 Kafka Streams 中的哪些类/方法将 Java 对象序列化/反序列化为字节数组,反之亦然?以下链接提出了 ByteArrayOutputStream 和 ObjectOutputStream 的用法,但它们不是线程安全的。

Send Custom Java Objects to Kafka Topic

还有另一个使用 ObjectMapper 的选项,ObjectReader(用于线程安全),但这是从 POJO -> JSON -> bytearray 转换的。似乎这个选项是一个广泛的选项。想检查是否有直接的方法将对象转换为字节数组,反之亦然,这是线程安全的。请推荐

import org.apache.kafka.common.serialization.Serializer;
public class HouseSerializer<T> implements Serializer<T>{
    private Class<T> tClass;
    public HouseSerializer(){

    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map configs, boolean isKey) {
        tClass = (Class<T>) configs.get("POJOClass");       
    }

    @Override
    public void close() {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        //Object serialization to be performed here
        return null;
    }
}


注意:Kafka 版本 - 0.10.1

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    想检查是否有直接的方法将对象转换为字节数组

    如果可能,我建议您将Avro serialization 与 Confluent Schema Registry 一起使用,但不是必需的。 JSON 是一个很好的后备方案,但会占用更多“在线”空间,因此MsgPack 将是那里的替代方案。

    See Avro code example here

    以上示例使用avro-maven-pluginsrc/main/resources/avro 架构文件生成一个LogLine 类。


    否则how to serialize your object into a byte array由你自己决定,比如一个String一般打包成

    [(length of string) (UTF8 encoded bytes)]
    

    虽然布尔值是单个 0 或 1 位

    线程安全

    我理解您的担忧,但您通常不会在线程之间共享反序列化数据。您为每个独立的消息发送/读取/处理一条消息。

    【讨论】:

    • 感谢以上回复。我知道每条消息本身都是 Kafka 对象的一个​​单独实例,但是为了将其转换为 Java 对象然后序列化/反序列化,我们可能会遇到竞争条件。我希望是否有办法在上述类方法“序列化”中将数据值转换为字节数组
    • Avro/JSON 可以为您做到这一点,并且可以跨多种不同的语言使用,而不仅仅是“默认 Java 对象序列化”。你不需要自己去一个字节数组。如果使用 JSON,只需使用 StringSerializer
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-11
    • 2019-08-11
    • 2020-08-17
    • 2018-07-11
    • 1970-01-01
    • 2021-10-21
    相关资源
    最近更新 更多