【Question title】: Loop through a KStream object and print out its values
【Posted】: 2019-10-02 09:07:22
【Question】:

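I am trying to follow one of the Kafka Streams sample applications. To make sure my consumer is reading the topic correctly, I want to iterate over the events/values it reads and print them out. Following the documentation here, I tried using foreach; below is the code I ended up with.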

    @Component
    public static class PageCountSink {

        private final Log log = LogFactory.getLog(getClass());

        // Bound to the "pcin" input declared in AnalyticsBinding.
        @StreamListener
        public void process(@Input(AnalyticsBinding.CONSUMER_IN) KStream<String, PageViewEvent> event) {
            // Terminal operation: runs once for every record read from the topic.
            event.foreach((key, value) -> log.info("Test"));
        }
    }


I get this error:

2019-10-02 16:59:17.866 ERROR 30553 --- [-StreamThread-2] o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread [consumer-2-453a13f7-6551-4d6a-b692-6d35c9a57195-StreamThread-2] Failed to process stream task 0_0 due to the following error:

java.lang.ClassCastException: class java.lang.String cannot be cast to class com.example.analytics.PageViewEvent (java.lang.String is in module java.base of loader 'bootstrap'; com.example.analytics.PageViewEvent is in unnamed module of loader 'app')
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) ~[kafka-streams-2.0.1.jar:na]

2019-10-02 16:59:17.879 ERROR 30553 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread         : stream-thread [consumer-2-453a13f7-6551-4d6a-b692-6d35c9a57195-StreamThread-2] Encountered the following error during processing:

java.lang.ClassCastException: class java.lang.String cannot be cast to class com.example.analytics.PageViewEvent (java.lang.String is in module java.base of loader 'bootstrap'; com.example.analytics.PageViewEvent is in unnamed module of loader 'app')
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) ~[kafka-streams-2.0.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) ~[kafka-streams-2.0.1.jar:na]

Below are the relevant snippets for this consumer:


    interface AnalyticsBinding {

        String CONSUMER_IN = "pcin" ;

        @Input(CONSUMER_IN)
        KStream<String, PageViewEvent> consumerIn();

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class PageViewEvent {
        private String userId, page;
        private int randNum;
        private long duration;
    }

I would appreciate it if someone could point out what I am missing. Thanks.

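【Comments】:

  • Which version of Spring Cloud Stream are you using? If you can upgrade to the latest 3.0 milestone, Serdes are inferred automatically on both inbound and outbound.
  • Thanks @sobychacko. As you suggested, it works once I change the Spring Cloud dependency version to Greenwich.SR3.

Tags: apache-kafka-streams spring-cloud-stream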


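【Solution 1】:

It looks like it doesn't know how to deserialize the value into a PageViewEvent. You have probably configured it to use StringSerializer by default. In that case, you need to define a Serde for the PageViewEvent type.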
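
To see why the exception appears inside foreach even though the lambda only logs "Test", here is a minimal sketch in plain Java with no Kafka dependency. The `ErasureDemo` class, its `feed` helper, and the stand-in `PageViewEvent` are hypothetical names made up for this illustration. It assumes the situation described above: the values arrive as String while the stream is declared as carrying PageViewEvent. Because generics are erased at runtime, nothing checks the value type until the lambda is invoked, and the cast fails at that point.

```java
import java.util.function.BiConsumer;

public class ErasureDemo {

    // Stand-in for the PageViewEvent class from the question (hypothetical, trimmed down).
    static class PageViewEvent {
        String userId = "u1";
    }

    // Simulates a pipeline that hands a value of unknown runtime type to an
    // action declared for PageViewEvent values, much like a stream whose
    // records were deserialized with the wrong Serde.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String feed(Object rawValue, BiConsumer<String, PageViewEvent> action) {
        BiConsumer raw = action; // raw type: the compiler no longer checks the value type
        try {
            raw.accept("key", rawValue);
            return "ok";
        } catch (ClassCastException e) {
            // Thrown when the lambda is invoked with a non-PageViewEvent value,
            // even though the lambda body never touches the value.
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Same shape as the foreach action in the question.
        BiConsumer<String, PageViewEvent> action = (key, value) -> System.out.println("Test");

        // A real PageViewEvent passes through fine.
        System.out.println(feed(new PageViewEvent(), action));

        // A String (what a String Serde produces) fails at the lambda boundary.
        System.out.println(feed("{\"userId\":\"u1\"}", action));
    }
}
```

Running `main` prints `Test` and `ok` for the first call and `ClassCastException` for the second. In the real application the equivalent fix is to make sure the value is deserialized into a PageViewEvent before it reaches foreach, for example by configuring a JSON-based value Serde for that binding (spring-kafka ships a `JsonSerde` class that is commonly used for this; treat that as a suggestion rather than something the original answer specifies).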
