【问题标题】:Serializer not compatible to org.apache.avro.util.Utf8序列化程序与 org.apache.avro.util.Utf8 不兼容
【发布时间】:2020-06-26 23:48:55
【问题描述】:

我尝试使用 kafkastreams 进行聚合,但出现如下错误

这是我正在做的:

KGroupedStream<String, Long> countrywiseAmount = ......;
KTable<String, CountSum> countrywiseAverageSum = countrywiseAmount
                .aggregate(new Initializer<CountSum>() {
                            @Override
                            public CountSum apply() {
                                return new CountSum();
                            }
                }, new Aggregator<String,Long,CountSum>() {
                    @Override
                    public CountSum apply(String country, Long amount, CountSum sumByCountry) {
                        sumByCountry.setCountry(country.toString());
                        sumByCountry.setCount(sumByCountry.getCount()+1);
                        sumByCountry.setSum(sumByCountry.getSum()+amount);
                        return sumByCountry;
                    }
                }, Materialized.with(stringSerde, countSumSerde));

我得到的错误如下

引起:org.apache.kafka.streams.errors.StreamsException: A 序列化程序(键: org.apache.kafka.common.serialization.StringSerializer / 值: org.apache.kafka.common.serialization.LongSerializer) 不是 兼容实际的键或值类型(键类型: org.apache.avro.util.Utf8 / 值类型:java.lang.Long)。更改 StreamConfig 中的默认 Serdes 或通过方法提供正确的 Serdes 参数。 在 org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) 在 org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43) 在 org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) 在 org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) 在 org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) 在 org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) 在 org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363) ... 5 更多 原因:java.lang.ClassCastException:org.apache.avro.util.Utf8 不能转换为 java.lang.String 在 org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) 在 org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) 在 org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) 在 org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103) 在 org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)

有什么线索吗?

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    KTable 聚合逻辑没有问题,但是KGroupedStream&lt;String, Long&gt; countrywiseChkAmount 的问题,从KStream&lt;String, AvroModel&gt; 创建它时,新的KeyValue 创建,键应该从CharSequence 转换为String。谢谢

    【讨论】:

    • 您应该将自己的答案标记为已接受。
    猜你喜欢
    • 2023-04-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-06-04
    • 2013-02-05
    • 1970-01-01
    • 2018-02-16
    相关资源
    最近更新 更多