【Title】: KafkaAvroDeserializer failing with Kryo exception
【Posted】: 2020-05-09 05:58:17
【Question】:

I wrote a consumer that reads Avro GenericRecords using the Schema Registry.

FlinkKafkaConsumer010 kafkaConsumer010 = new FlinkKafkaConsumer010(KAFKA_TOPICS,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                properties);

The deserialization class looks like this:

public class KafkaGenericAvroDeserializationSchema implements KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public GenericRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }
}

It works locally on my machine, but when I deploy it on a YARN cluster I get the following exception:

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)

Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)

Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"

Please help me resolve this issue.

【Comments】:

Tags: java apache-kafka apache-flink avro flink-streaming


【Answer 1】:

As written on the mailing list:

The problem is that you are not providing any meaningful type information, so Flink has to fall back to Kryo. You need to extract the schema during query compilation (in your main program) and pass it to your deserialization schema:

public TypeInformation<T> getProducedType() {
    return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.schema);
}
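A fuller sketch of how this suggestion could fit into the question's class, assuming the schema JSON is loaded in the main program and passed in as a plain `String` (the class name `SchemaAwareAvroDeserializationSchema` and the two-argument constructor are illustrative, not from the original; exact import paths for `KeyedDeserializationSchema` and `GenericRecordAvroTypeInfo` vary by Flink version):

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Sketch only: the schema JSON is fetched/loaded in the driver program and
// handed over as a String (trivially serializable), so getProducedType()
// can give Flink an Avro-aware TypeInformation instead of forcing the
// Kryo fallback that produced the LockableArrayList exception.
public class SchemaAwareAvroDeserializationSchema
        implements KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    private final String schemaString;   // schema JSON supplied by the driver
    private transient KafkaAvroDeserializer inner;

    public SchemaAwareAvroDeserializationSchema(String registryUrl, String schemaString) {
        this.registryUrl = registryUrl;
        this.schemaString = schemaString;
    }

    @Override
    public GenericRecord deserialize(byte[] messageKey, byte[] message,
                                     String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Evaluated in the driver during job compilation; Flink then uses
        // its Avro serializer for GenericRecord rather than Kryo.
        return new GenericRecordAvroTypeInfo(new Schema.Parser().parse(schemaString));
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            inner = new KafkaAvroDeserializer(
                    new CachedSchemaRegistryClient(registryUrl,
                            AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT),
                    props);
        }
    }
}
```

Passing the schema as a JSON string sidesteps any question of whether `org.apache.avro.Schema` itself is serializable in the Avro version on the classpath.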

If you don't want to extract it statically, you need to tell Flink how to handle arbitrary GenericRecords: you can implement your own serializer that writes GenericRecords to byte[] and vice versa.
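The byte[] round trip such a custom serializer would perform can be sketched with plain Avro tooling (the class `GenericRecordBytes` is illustrative; a real Flink `TypeSerializer` has several more methods to implement):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Sketch of the GenericRecord <-> byte[] round trip using Avro's binary
// encoding. Note the asymmetry: writing only needs the record (which
// carries its schema), but reading the bytes back requires the schema.
public final class GenericRecordBytes {

    public static byte[] toBytes(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static GenericRecord fromBytes(byte[] bytes, Schema schema) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}
```

The asymmetry in `fromBytes` is exactly why the schema has to be available on the deserializing side, which supports the recommendation below to bundle it with the application.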

Note that I would still recommend bundling the schema with your Flink application rather than reinventing the wheel.

【Discussion】:
