【问题标题】:Differentiating an AVRO union type区分 AVRO 联合类型
【发布时间】:2020-05-07 08:06:50
【问题描述】:

我正在使用“自动”反序列化器使用来自 Kafka 的 Avro 序列化消息,例如:

props.put(
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroDeserializer"
);
props.put("schema.registry.url", "https://example.com");

这非常有效,并且位于 https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer 的文档中。

我面临的问题是我实际上只是想转发这些消息,但要进行路由,我需要一些内部元数据。一些技术限制意味着我无法编译生成的类文件以使用KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG => true,因此我使用常规解码器而不绑定到 Kafka,特别是将字节读取为Array[Byte] 并将它们传递给手动构造的反序列化器:

var maxSchemasToCache = 1000;
var schemaRegistryURL = "https://example.com/"
var specificDeserializerProps = Map(
  "schema.registry.url" 
      -> schemaRegistryURL,
  KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG 
      -> "false"
);
var client = new CachedSchemaRegistryClient(
                     schemaRegistryURL, 
                     maxSchemasToCache
                 );
var deserializer = new KafkaAvroDeserializer(
                         client,
                         specificDeserializerProps.asJava
                   );

消息是“容器”类型,真正有趣的部分是 union { A, B, C } msg 记录字段中大约 25 种类型:

record Event {
    timestamp_ms created_at;
    union {
        Online,
        Offline,
        Available,
        Unavailable,
        ...
        ...Failed,
        ...Updated
    } msg;
}

所以我成功地将Array[Byte] 读入record 并将其输入到解串器中,如下所示:

var genericRecord = deserializer.deserialize(topic, consumerRecord.value())
                       .asInstanceOf[GenericRecord];
var schema = genericRecord.getSchema();
var msgSchema = schema.getField("msg").schema();

但问题是我无法通过联合来辨别、区分或“解决”msg 字段的“类型”:

System.out.printf(
    "msg.schema = %s msg.schema.getType = %s\n", 
    msgSchema.getFullName(),  
    msgSchema.getType().name());
=> msg.schema = union msg.schema.getType = union

在这种情况下如何区分类型?融合注册表知道,这些东西有名称,它们有“类型”,即使我将它们视为GenericRecords

我的目标是知道record.msg 是“类型”Online | Offline | Available,而不是仅仅知道它是union

【问题讨论】:

  • 所以为了清楚起见,您对收到的特定类型的消息感兴趣,是吗?你想要type = Online 而不是type = union
  • 不清楚您所说的“自动反序列化器”是什么意思...您可以定义自己的接受Array[Byte] 并反序列化为 GenericRecord,就像您展示的那样。或者你可以默认使用 KafkaAvroDeserializer 来获取 GenericRecord,因为它已经知道如何处理它
  • 是的,确切地说,将更新问题以澄清。
  • 即使您确实编译为 Java,我也很好奇您将如何获得该类型。在我看来,无论如何,你想要一个 ENUM,而不是一个联合
  • 我的假设是(未经测试)我会为 Event 获得一个代码生成的类,它有一个 getMsg() 访问器,这会给我一个类型化的类(可能经过一些强制) - 但确实,我不确定。 FWIW 我不认为 Avro 支持 enums 用于复杂类型,仅用于原语。

标签: scala apache-kafka avro confluent-schema-registry


【解决方案1】:

在研究了 AVRO Java 库的实现之后,它认为可以肯定地说,鉴于当前的 API,这是不可能的。我发现了以下在解析时提取类型的方法,使用自定义的GenericDatumReader 子类,但在我在生产代码中使用类似的东西之前需要大量的打磨:D

所以这是子类:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.ResolvingDecoder;

import java.io.IOException;
import java.util.List;

public class CustomReader<D> extends GenericDatumReader<D> {
    private final GenericData data;
    private Schema actual;
    private Schema expected;

    private ResolvingDecoder creatorResolver = null;
    private final Thread creator;
    private List<Schema> unionTypes;

    // vvv This is the constructor I've modified, added a list of types
    public CustomReader(Schema schema, List<Schema> unionTypes) {
        this(schema, schema, GenericData.get());
        this.unionTypes = unionTypes;
    }

    public CustomReader(Schema writer, Schema reader, GenericData data) {
        this(data);
        this.actual = writer;
        this.expected = reader;
    }

    protected CustomReader(GenericData data) {
        this.data = data;
        this.creator = Thread.currentThread();
    }

    protected Object readWithoutConversion(Object old, Schema expected, ResolvingDecoder in) throws IOException {
        switch (expected.getType()) {
            case RECORD:
                return super.readRecord(old, expected, in);
            case ENUM:
                return super.readEnum(expected, in);
            case ARRAY:
                return super.readArray(old, expected, in);
            case MAP:
                return super.readMap(old, expected, in);
            case UNION:
                // vvv The magic happens here
                Schema type = expected.getTypes().get(in.readIndex());
                unionTypes.add(type);
                return super.read(old, type, in);
            case FIXED:
                return super.readFixed(old, expected, in);
            case STRING:
                return super.readString(old, expected, in);
            case BYTES:
                return super.readBytes(old, expected, in);
            case INT:
                return super.readInt(old, expected, in);
            case LONG:
                return in.readLong();
            case FLOAT:
                return in.readFloat();
            case DOUBLE:
                return in.readDouble();
            case BOOLEAN:
                return in.readBoolean();
            case NULL:
                in.readNull();
                return null;
            default:
                return super.readWithoutConversion(old, expected, in);
        }
    }
}

我已将 cmets 添加到有趣部分的代码中,因为它主要是样板文件。

然后你可以像这样使用这个自定义阅读器:

        List<Schema> unionTypes = new ArrayList<>();
        DatumReader<GenericRecord> datumReader = new CustomReader<GenericRecord>(schema, unionTypes);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(eventFile, datumReader);
        GenericRecord event = null;

        while (dataFileReader.hasNext()) {
            event = dataFileReader.next(event);
        }

        System.out.println(unionTypes);

这将为每个解析的union 打印该union 的类型。请注意,您必须根据记录中的联合数等来确定该列表中的哪个元素是您感兴趣的。

不是很漂亮:D

【讨论】:

    【解决方案2】:

    经过大量挖掘,我想出了一个一次性解决方案:

    val records: ConsumerRecords[String, Array[Byte]] = consumer.poll(100);
    for (consumerRecord <- asScalaIterator(records.iterator)) {
      var genericRecord = deserializer.deserialize(topic, consumerRecord.value()).asInstanceOf[GenericRecord];
      var msgSchema = genericRecord.get("msg").asInstanceOf[GenericRecord].getSchema();
      System.out.printf("%s \n", msgSchema.getFullName());
    

    打印 com.myorg.SomeSchemaFromTheEnum 并在我的用例中完美运行。

    令人困惑的是,由于使用了GenericRecord.get("msg") 返回Object,通常我无法安全地进行类型转换。在这种有限的情况下,我知道演员阵容是安全的。

    在我有限的用例中,上面 5 行中的解决方案是合适的,但对于更通用的解决方案,https://stackoverflow.com/users/124257/fresskoma 发布的答案 https://stackoverflow.com/a/59844401/119669 似乎更合适。

    是否使用DatumReaderGenericRecord 可能是一个偏好问题,以及是否考虑到Kafka 生态系统,单独使用Avro 我可能更喜欢DatumReader 解决方案,但在这种情况下,我可以忍受我的代码中的 Kafak 式命名法。

    【讨论】:

      【解决方案3】:

      要检索字段值的架构,您可以使用

      new GenericData().induce(genericRecord.get("msg"))
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-05-17
        • 1970-01-01
        • 2020-02-16
        • 1970-01-01
        • 1970-01-01
        • 2020-08-22
        • 2021-09-03
        • 1970-01-01
        相关资源
        最近更新 更多