【问题标题】:How to create a generic deserializer for bytes in Kafka?如何在 Kafka 中为字节创建通用反序列化器?
【发布时间】:2020-06-09 15:25:47
【问题描述】:

我有一个包含多种消息类型的 proto 文件。我想为这些消息创建一个通用的反序列化器。

我是否需要使用 Kafka 消息头发送这些消息类型,以便消费者可以使用此类型信息反序列化这些消息?这是最佳做法还是有其他解决方案?

反序列化方法示例;

public Object deserialize(String topic, Headers headers, byte[] data) {
    if(headers[0].equals("Person")){
        return Person.parseFrom(data);
    } else if....
}

我的原型文件;

message Person {
    uint64 number = 1;
    string name = 2;
}

message Event {
    string msg = 1;
    code = 2;
}

message Data {
    string inf = 1;
    string desc = 2;
}

....

【问题讨论】:

    标签: apache-kafka protocol-buffers


    【解决方案1】:

    我需要发送这些带有 Kafka 消息头的消息类型吗? 消费者可以用这种类型信息反序列化这些消息吗?

    如果您的 KafkaConsumer 仅使用来自特定主题的消息以及特定类型(类)的消息,那么您可以在反序列化器配置中配置该类,例如 value.classkey.class等在您的配置中,您可以使用 configs.get("value.class")configs.get("key.class") 在 Deserializer 中使用 configure() 获得,然后将它们存储在成员变量中。

    void configure(java.util.Map<java.lang.String,?> configs,
                   boolean isKey)
    

    如果你的主题包含不同类型的消息,或者你的消费者订阅了不同的主题,每个主题都有不同类型的消息,那么将类存储在 Headers 中应该是合适的。

    另一种选择是编写一个包装类。

    class MessageWrapper {
       private Class messageClass;
       private byte[] messageData;
       ProtobufSchema schema;
    }
    

    然后在数据中你可以反序列化MessageWrapper。这里messageData 类型可以是PersonDataEventmessageClass 应该可以帮助您进行解析。 例如,

    mapper.readerFor(messageWrapper.getMessageClass())
       .with(messageWrapper.getSchema())
       .readValue(messageWrapper.getMessageData());
    

    一旦你拿到对象,你可以检查它是instanceof Person or Event or Data

    您也可以查看Generating Protobuf schema from POJO definition 并省略MessageWrapper 中的schema 字段

    片段

    ProtobufMapper mapper = new ProtobufMapper()
    ProtobufSchema schemaWrapper = mapper.generateSchemaFor(messageWrapper.getMessageClass())
    NativeProtobufSchema nativeProtobufSchema = schemaWrapper.getSource();
    
    String asProtofile = nativeProtobufSchema.toString();
    

    【讨论】:

      猜你喜欢
      • 2018-07-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-08
      • 2020-02-20
      • 2019-09-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多