【问题标题】:ClassCastException in spring-kafka-test using `merger()`使用`merger()`的spring-kafka-test中的ClassCastException
【发布时间】:2020-04-21 11:01:32
【问题描述】:

我想通过使用 kafka-streams-test-utils 的单元测试来测试我的 Kafka Streams 拓扑。我已经使用这个库很长时间了,并且我已经使用 TestNG 围绕我的测试构建了一些抽象层。 但是由于我在我的 Stream 中添加了 merge(...),所以我得到了以下异常:

 org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=my-topic-2, partition=0, offset=0
 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:393)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: com.MyKey / value type: com.MyValue). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
... 3 more
Caused by: java.lang.ClassCastException: class com.MyKey cannot be cast to class [B (com.MyValue is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 15 more

这是我如何使用 TopologyTestDriver 的 StreamBuilder 构建 Stream 的部分:

// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
    "my-topic-2",
    consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
    (key, value) -> {
        List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
        // Do stuff an fill out the list
        return list;
    })
 .through("tmp-topic");

// Block 2
KStream<MyKey, MyValue>[] branches = stream1
    .merge(stream2)
    ... business stuff

为了在源主题上生成消息,我使用了 TopologyTestDriver.pipeInput(...),并使用 JsonSerDes 进行了初始化。 通过转换 ByteArray 会发生异常,但我不知道为什么 ByteArraySerializer 的预期参数是同一个类,但来自另一个模块,而不是加载的消费类。它们也可能由另一个类加载器加载。但是后台没有 Spring 堆栈,一切都应该同步运行。

我真的对这种行为感到困惑。

Apache Kafka Dependecies 的版本为:2.0.1,我使用的是 openjdk-11。是否可以对齐序列化程序的类加载? 仅当我在 my-topic-2 上生成某些内容时,才会出现该错误,合并的另一个主题可以正常工作。

【问题讨论】:

    标签: java testing apache-kafka apache-kafka-streams


    【解决方案1】:

    没有看到你所有的代码,我不能肯定地说,但这是我认为可能发生的事情。

    提供带有Consumed 的Serdes 仅在使用来自输入主题的记录时提供反序列化; Kafka Streams 不会通过拓扑的其余部分传播它们。在任何时候,如果再次需要 Serde,Kafka Streams 会使用 StreamsConfig 中提供的那些。 Serdes.ByteArraySerde 是默认值。

    我建议尝试两件事:

    1. 在您的汇节点中使用Produced.with(keySerde, valueSerde)
    2. 通过StreamsConfig 为您的类型提供 Serde。

    HTH,让我知道事情进展如何。

    -比尔

    【讨论】:

    • 感谢您的提示,由于@mazaneicha 的具体解决方案,我将其他答案标记为正确。
    【解决方案2】:

    正如@bbejeck 所述,您需要使用different version of .through(),它允许您覆盖应用于K, V 的默认(ByteArraySerde) serdes。

    KStream<K,V> through​(java.lang.String topic,
                         Produced<K,V> produced) 
    

    将此流具体化为一个主题,并使用 Produced 实例从该主题创建一个新的 KStream,以配置 key serdevalue serde 和 StreamPartitioner。 ...这相当于调用 to(someTopic, Produced.with(keySerde, valueSerde) 和 StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde))。

    【讨论】:

    • 这很好用,我不知道,为什么我这么盲目地看到这个参数:-)
    • 该错误消息更令人困惑而不是有用,其中包含所有加载程序和模块信息:)
    猜你喜欢
    • 1970-01-01
    • 2021-11-09
    • 1970-01-01
    • 2020-10-12
    • 1970-01-01
    • 2022-06-16
    • 2020-10-09
    • 2021-02-16
    • 2019-06-17
    相关资源
    最近更新 更多