我需要发送这些带有 Kafka 消息头的消息类型吗?
消费者可以用这种类型信息反序列化这些消息吗?
如果您的 KafkaConsumer 仅使用来自特定主题的消息以及特定类型(类)的消息,那么您可以在反序列化器配置中配置该类,例如 value.class、key.class等在您的配置中,您可以使用 configs.get("value.class") 或 configs.get("key.class") 在 Deserializer 中使用 configure() 获得,然后将它们存储在成员变量中。
void configure(java.util.Map<java.lang.String,?> configs,
boolean isKey)
如果你的主题包含不同类型的消息,或者你的消费者订阅了不同的主题,每个主题都有不同类型的消息,那么将类存储在 Headers 中应该是合适的。
另一种选择是编写一个包装类。
class MessageWrapper {
private Class messageClass;
private byte[] messageData;
ProtobufSchema schema;
}
然后在数据中你可以反序列化MessageWrapper。这里messageData 类型可以是Person、Data 或Event,messageClass 应该可以帮助您进行解析。
例如,
mapper.readerFor(messageWrapper.getMessageClass())
.with(messageWrapper.getSchema())
.readValue(messageWrapper.getMessageData());
一旦你拿到对象,你可以检查它是instanceof Person or Event or Data
您也可以查看Generating Protobuf schema from POJO definition 并省略MessageWrapper 中的schema 字段
片段
ProtobufMapper mapper = new ProtobufMapper()
ProtobufSchema schemaWrapper = mapper.generateSchemaFor(messageWrapper.getMessageClass())
NativeProtobufSchema nativeProtobufSchema = schemaWrapper.getSource();
String asProtofile = nativeProtobufSchema.toString();