【Question Title】: Flink throwing com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
【Posted】: 2021-04-26 16:13:55
【Problem Description】:

I am trying to deserialize Kafka events in my Flink streaming job. Here is my code:

...
case class URLResponse (status: Int, domain: String, url: String, queue: String, html: String)
...
val schema: Schema = AvroSchema[URLResponse]
...
val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties))

At runtime the job throws this exception:

...
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 26 more

Process finished with exit code 1

I have read that I should not be using Kryo for this, but I don't know how to avoid it. I tried:

executionConfig.enableForceAvro()
executionConfig.disableForceKryo()

but it didn't help.

【Question Discussion】:

    Tags: scala apache-kafka apache-flink avro4s


    【Solution 1】:

    If you cannot add the source via the Java environment (perhaps because you are using the StreamExecutionEnvironment.readFile method), another solution is shared here: https://stackoverflow.com/a/32453031/899937. In essence:

    // UnmodifiableCollectionsSerializer comes from the kryo-serializers library
    import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer
    val unmodifiableCollectionClass = Class.forName("java.util.Collections$UnmodifiableCollection")
    env.getConfig.addDefaultKryoSerializer(unmodifiableCollectionClass, classOf[UnmodifiableCollectionsSerializer])
    

    kryo-serializers is no longer bundled with Flink, so you have to add it as a dependency yourself.
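
    If you build with sbt, a minimal sketch of the equivalent dependency could look like this (the coordinates match the Maven artifact shown in Solution 2 below; version 0.45 is taken from there, so adjust it to your setup):

    // build.sbt: pulls in de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer
    libraryDependencies += "de.javakaffee" % "kryo-serializers" % "0.45"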

    【Discussion】:

      【Solution 2】:

      I ran into the same problem in Java, and the code snippet below helped me solve it:

      // Requires: import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
      StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

      Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
      environment.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
      

      You also need to add the Maven dependency that provides UnmodifiableCollectionsSerializer:

          <dependency>
              <groupId>de.javakaffee</groupId>
              <artifactId>kryo-serializers</artifactId>
              <version>0.45</version>
          </dependency>
      

      【Discussion】:

        【Solution 3】:

        The exception mentioned is related to an issue with the Scala implementation of Avro deserialization. If I use the Java implementation (https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro), it works fine. My solution:

        val javaStream = env.getJavaEnv.addSource(new FlinkKafkaConsumer[GenericRecord](
            kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties),
            new GenericRecordAvroTypeInfo(schema))
        val stream = new DataStream[GenericRecord](javaStream)
        

        【Discussion】:

          【Solution 4】:

          I had the same problem using Avro GenericRecord with a Kinesis data stream, on Scala 2.12 and Flink 1.11.4.

          My solution was to add an implicit TypeInformation:

          implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
          

          Below is a complete code example focused on the serialization issue:

          @Test def `test avro generic record serializer`(): Unit = {
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            val schema: String =
              """
                |{
                | "namespace": "com.mberchon.monitor.dto.avro",
                | "type": "record",
                | "name": "TestAvro",
                | "fields": [
                |  {"name": "strVal", "type": ["null", "string"]},
                |  {"name": "longVal",  "type": ["null", "long"]}
                |  ]
                |}
          """.stripMargin
          
            val avroSchema = new Schema.Parser().parse(schema)
            val rec:GenericRecord = new GenericRecordBuilder(avroSchema)
              .set("strVal","foo")
              .set("longVal",1234L)
              .build()
          
            implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
            val _ = env.fromElements(rec,rec).addSink(new PrintSinkFunction[GenericRecord]())
          
            env.execute("Test serializer")
          }
          

          Coming back to your context, the following code should work:

          ...
          case class URLResponse (status: Int, domain: String, url: String, queue: String, html: String)
          ...
          val schema: Schema = AvroSchema[URLResponse]
          ...
          implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)
          val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties))
          
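          For reference, here is a sketch of the imports this snippet relies on; this assumes the flink-avro, flink-avro-confluent-registry, flink-connector-kafka and avro4s artifacts are on the classpath, and package locations may vary slightly between Flink versions:

          // Assumed imports for the snippet above (verify against your Flink version)
          import com.sksamuel.avro4s.AvroSchema
          import org.apache.avro.Schema
          import org.apache.avro.generic.GenericRecord
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
          import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
          import org.apache.flink.streaming.api.scala._
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
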

          【Discussion】:
