【问题标题】:Avro java.io.EOFException when using transform in a KStream DSL在 KStream DSL 中使用转换时出现 Avro java.io.EOFException
【发布时间】:2019-08-02 11:40:32
【问题描述】:

我需要在 KStream 中使用 transform() 操作,但在未设置必要的 serdes 时会出现常见的 ClassNotFoundException:

Caused by: java.lang.ClassCastException: xxx.SomeKey cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
val someKeySerde = SpecificAvroSerde<SomeKeySerde>()
someKeySerde.configure(serdeConfig, false)
val someValueSerde = SpecificAvroSerde<SomeValueSerde>()
someValueSerde.configure(serdeConfig, false)
val someExtendedValueSerde = SpecificAvroSerde<SomeExtendedValueSerde>()
someExtendedValueSerde.configure(serdeConfig, false)

myKStream
    .transform(TransformerSupplier {
        object : Transformer<SomeKey, SomeValue, KeyValue<SomeKey, SomeValue>> {

            private lateinit var context: ProcessorContext

            override fun close() {
            }

            override fun transform(key: SomeKey, value: SomeValue): KeyValue<SomeKey, SomeValue> {
                println("@@@@@@@@@@@@ timestamp ${context.timestamp()}")
                ...   
                return KeyValue(key, enrichedValue)
            }

            override fun init(context: ProcessorContext) {
                this.context = context
            }

        }
    }).groupByKey()
      .aggregate(getSomeValueAggregationInitializer(),
                        getAggregator("absolute"),
                        materializedAbsoluteSomeValueFrequency)

其他 KStream 操作允许我们指定要使用的 serdes,但不能在转换的情况下。我该如何设置它们? (如您在上面看到的,SpecificAvroSerde)?

更新: 正如 Matthias 所指出的,问题在于转换后的 groupByKey 操作中缺少 Serdes。我已经用新问题更新了问题标题。

1) 为什么没有Grouped.with(clientProjectIdSerde, deploymentFinishedSerde) 时没有transform() 调用但我需要在transform() 之后添加它?

如果我更新 groupById 以包含 GroupedWith -> .groupByKey(Grouped.with(clientProjectIdSerde, deploymentFinishedSerde)) 现在我可以看到通话记录 ("@@@@@@@@@@@@ ... 但出现了一个新问题:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
    at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:160)
    at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:184)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)

如果我在没有.transform() 的情况下留下完全相同的代码,它可以工作:

myKStream
    .groupByKey(Grouped.with(clientProjectIdSerde, deploymentFinishedSerde))
      .aggregate(getSomeValueAggregationInitializer(),
                        getAggregator("absolute"),
                        materializedAbsoluteSomeValueFrequency)

2) 为什么我在使用 transform() 时会收到此 Avro 异常,但如果没有它,我该如何解决?

从 SomeValue Avro 对象中读取 dateTime long 字段时发生错误:

{
  "namespace" : "xxx",
  "type" : "record",
  "name" : "SomeValue",
  "fields" : [
    {"name": "name", "type": "string"},
    {"name":"dateTime", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

【问题讨论】:

  • 您确定 ClassCastException 是由您复制/粘贴的转换器抛出的,而不是在转换之后(或之前)的代码中吗?我问这个,因为例外是关于 SomeKey 的问题,但是在您的代码中您没有触及消息的关键部分:您接受 SomeKey 并发送 SomeKey ....
  • 如果我删除了 transform() 调用,其余的过程工作正常,所以它似乎来自它。
  • 从代码中看不清楚,但是只有在某些主题中写入了一些数据才会出现异常。在transform() 之后是否有连续的to() 操作,或者您是否在transform() 中使用了可能写入更改日志主题的状态存储?
  • @MatthiasJ.Sax 我已经用新信息更新了这个问题,感谢您查看它
  • 如果您在上游设置 Serdes,Kafka Stream 会尝试在下游使用它们,键和值类型不会改变。但是,如果您使用transform(),键和值类型都可能会更改,因此使用上游 Serdes 是不安全的。因此,如果您不再设置 Serdes,Kafka Streams 会从配置中回退到默认的 Serdes。

标签: apache-kafka avro apache-kafka-streams


【解决方案1】:

第一个问题的答案由 cmets 中的 Matthias 提供。

关于第二个,问题出现的实际场景是在测试期间。与普通的 Kafka Broker + Schema Registry 相比,它运行良好。

问题出在我在测试期间使用的io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient 类中。

这是模式注册的流程:

1) 首先它从输入主题中读取一条记录,并为 SomeKey 和 id 2 注册了一个模式。

2) 在transform() 之后,流程逻辑使用重新分区主题。它尝试序列化密钥。为此,MockSchemaRegistryClient.getIdFromRegistry() 方法为键和值的模式生成了错误的 id (-1)。然后,当它尝试序列化密钥时,它使用了 -1 id,该 id 首先分配给了密钥,但后来覆盖了值模式。因此,它试图用值的模式序列化键。这就是 Avro 异常的原因。

我使用的是 io.confluent:kafka-schema-registry-client:5.2.1。升级到 5.2.2 问题就消失了。这是修复的提交:https://github.com/confluentinc/schema-registry/commit/6ef5d4a523a5eedff0fa32bea1e1405be42efc13#diff-e5caaf947bc9ff275003783d5d50eee6R90

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-08-09
    • 1970-01-01
    • 2022-10-24
    • 2019-09-24
    • 1970-01-01
    • 1970-01-01
    • 2014-11-09
    • 2014-08-04
    相关资源
    最近更新 更多