【问题标题】:Discard message in Kstream Spring kafka application在 Kstream Spring kafka 应用程序中丢弃消息
【发布时间】:2021-10-23 08:19:33
【问题描述】:

我创建了一个 kafka 流 Spring Boot 应用程序。 输入 : json格式

输出: AVRO 格式

当我解析 Json 时,如果发现它已损坏或无效,我想跳过它。

但是当我尝试返回空的 AVRO 类时,Stream API(自动发布到输出通道)返回

Exception in thread "AutonomousStreamListener-process-applicationId-31990c75-5965-42b6-906a-92e13855e40c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=FAULT_MANAGEMENT, partition=0, offset=4, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of string of com.fujitsu.fnc.fums.faultMgmt.avro.model.Fault
    at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:183)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:177)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:110)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:59)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)


如何处理这种情况,它涉及在流中跳过消息而不总是返回消息。

伪代码:

@StreamListener("Processor-input-channel")
    @SendTo("Processor-output-channel")
    public KStream<String, AVROClass> process(KStream<String, String> input){
        //parse input , map to fault and change received key to time stamp and send 
        KStream<String, AVROClass> kStream = input
                .mapValues(v -> service.getAVROResponse(v))
                .map((k,v)->KeyValue.pair((Long.toString((System.currentTimeMillis()))), v));

        kStream.foreach((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));
        
        
        return kStream;
        
    }

[编辑]:

此外,应用程序似乎停在那里并且流线程关闭。应用程序不会接收新消息。

感谢任何有关 AVRO 流异常处理和伪代码的帮助。

【问题讨论】:

    标签: java spring-boot apache-kafka-streams avro spring-kafka


    【解决方案1】:

    您可以使用过滤操作跳过消息。

    @StreamListener("Processor-input-channel")
    @SendTo("Processor-output-channel")
    public KStream<String, AVROClass> process(KStream<String, String> input){
        //parse input, map to fault and change received key to timestamp and send 
        KStream<String, AVROClass> kStream = input
            .mapValues(v -> service.getAVROResponse(v))
            .map((k,v)->KeyValue.pair((Long.toString((System.currentTimeMillis()))), v))
            .filter((k, v) -> v != null) <<<< ADD YOUR CONDITION >>>>
            .peek((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));
        return kStream;    
    }
    

    您也不需要forEach 来打印每条消息。这可以通过peek 轻松完成。

    此外,应用程序似乎停在那里并且流线程关闭。应用程序不接收新消息。

    您可以通过将deserialization-exception-handler 属性值设置为logandcontinue 来忽略反序列化异常。但是如果服务端的消息处理出现错误,可以用try..catch块包裹起来处理。

    【讨论】:

    • 它不能作为日志工作,继续是一种反序列化异常方法而不是序列化方法。例外是当我们尝试生成记录并遇到空值时。
    • 在生产消息时,防止空值被发送到主题
    • 实际上不是问题,我有一个数据可能很脏的情况,我必须跳过它。但是当我尝试不将此消息返回到流时,我无法返回一个空白对象或 null,并且我在流中得到了生产者的序列化异常。我无法控制我收到的数据。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-05-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-21
    • 1970-01-01
    相关资源
    最近更新 更多