【发布时间】:2020-05-07 08:06:50
【问题描述】:
我正在使用“自动”反序列化器使用来自 Kafka 的 Avro 序列化消息,例如:
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer"
);
props.put("schema.registry.url", "https://example.com");
这非常有效,并且位于 https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer 的文档中。
我面临的问题是我实际上只是想转发这些消息,但要进行路由,我需要一些内部元数据。一些技术限制意味着我无法编译生成的类文件以使用KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG => true,因此我使用常规解码器而不绑定到 Kafka,特别是将字节读取为Array[Byte] 并将它们传递给手动构造的反序列化器:
var maxSchemasToCache = 1000;
var schemaRegistryURL = "https://example.com/"
var specificDeserializerProps = Map(
"schema.registry.url"
-> schemaRegistryURL,
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
-> "false"
);
var client = new CachedSchemaRegistryClient(
schemaRegistryURL,
maxSchemasToCache
);
var deserializer = new KafkaAvroDeserializer(
client,
specificDeserializerProps.asJava
);
消息是“容器”类型,真正有趣的部分是 union { A, B, C } msg 记录字段中大约 25 种类型:
record Event {
timestamp_ms created_at;
union {
Online,
Offline,
Available,
Unavailable,
...
...Failed,
...Updated
} msg;
}
所以我成功地将Array[Byte] 读入record 并将其输入到解串器中,如下所示:
var genericRecord = deserializer.deserialize(topic, consumerRecord.value())
.asInstanceOf[GenericRecord];
var schema = genericRecord.getSchema();
var msgSchema = schema.getField("msg").schema();
但问题是我无法通过联合来辨别、区分或“解决”msg 字段的“类型”:
System.out.printf(
"msg.schema = %s msg.schema.getType = %s\n",
msgSchema.getFullName(),
msgSchema.getType().name());
=> msg.schema = union msg.schema.getType = union
在这种情况下如何区分类型?融合注册表知道,这些东西有名称,它们有“类型”,即使我将它们视为GenericRecords,
我的目标是知道record.msg 是“类型”Online | Offline | Available,而不是仅仅知道它是union。
【问题讨论】:
-
所以为了清楚起见,您对收到的特定类型的消息感兴趣,是吗?你想要
type = Online而不是type = union? -
不清楚您所说的“自动反序列化器”是什么意思...您可以定义自己的接受
Array[Byte]并反序列化为 GenericRecord,就像您展示的那样。或者你可以默认使用 KafkaAvroDeserializer 来获取 GenericRecord,因为它已经知道如何处理它 -
是的,确切地说,将更新问题以澄清。
-
即使您确实编译为 Java,我也很好奇您将如何获得该类型。在我看来,无论如何,你想要一个 ENUM,而不是一个联合
-
我的假设是(未经测试)我会为
Event获得一个代码生成的类,它有一个getMsg()访问器,这会给我一个类型化的类(可能经过一些强制) - 但确实,我不确定。 FWIW 我不认为 Avro 支持enums用于复杂类型,仅用于原语。
标签: scala apache-kafka avro confluent-schema-registry