【Question title】: Null value in KStream.map causing IllegalArgumentException: Payload must not be null
【Posted】: 2019-10-21 15:45:50
【Question description】:

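I am building a Spring Cloud Stream application that uses Kafka Streams. It has one input topic and one output topic, and I am trying to apply a key-value transformation to the input topic with the KStream.map function.
If the transformed value is null, the function throws an IllegalArgumentException.
My questions are:
1: What causes the exception? The documentation says: "Input records with a null key or a null value are ignored".
2: What is the best practice for handling exceptions in stateless and stateful operations? Is a single try/catch around the entire processing plan enough, or should every transformation function (e.g. filter, map, join, reduce) have its own try/catch?

Any ideas are appreciated.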

Application configuration:

spring:
  application:
    name: kafka-streams-test
  cloud.stream:
    kafka.streams:
      binder:
        brokers: localhost:9093
        configuration:
          commit.interval.ms: 1000
          security.protocol: SASL_PLAINTEXT
          sasl.mechanism: GSSAPI
          sasl.kerberos.service.name: kafka
        serdeError: logAndContinue
      bindings:
        streams-words-input:
          consumer:
            application-id: Input-Words
        streams-words-output:
          consumer:
            application-id: Output-Words
    bindings:
      streams-words-input:
        destination: streams-words-input
      streams-words-output:
        destination: streams-words-output

Sample code:

@StreamListener()
@SendTo("streams-words-output")
public KStream<String, Long> createWords(
    @Input("streams-words-input") final KStream<String, String> wordsInput){
    return wordsInput
            .map((key,value) -> KeyValue.pair(key, null));
}

Exception stack trace:

java.lang.IllegalArgumentException: Payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:198)
at org.springframework.messaging.support.MessageBuilder.<init>(MessageBuilder.java:57)
at org.springframework.messaging.support.MessageBuilder.withPayload(MessageBuilder.java:179)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound$0(KafkaStreamsMessageConversionDelegate.java:86)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate$$Lambda$743/1725151361.apply(Unknown Source)
at org.apache.kafka.streams.kstream.internals.AbstractStream$2.apply(AbstractStream.java:87)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:33)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:48)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

【Question comments】:

    Tags: spring-integration apache-kafka-streams spring-kafka spring-cloud-stream spring-messaging


    【Answer 1】:

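    The exception you are seeing comes from a long-standing rule in Spring Messaging: "no messages with a null payload". In other words, if there is nothing to communicate, there is no message to send.

    That said, there is clearly a problem with KStream and how it handles this case, so I suggest raising an issue against the Kafka binder. In the meantime, you can easily add a filter operation to your pipeline to filter out null values.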
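The filter workaround can be sketched in plain Java, without Kafka or Spring on the classpath. This is only a model under stated assumptions: `NullFilterSketch`, `toPayload`, and `convertNonNull` are hypothetical names, and `toPayload` merely imitates the null check that `MessageBuilder.withPayload` performs in the stack trace. In the real pipeline the equivalent step would be a `KStream.filter((key, value) -> value != null)` call placed after `map`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NullFilterSketch {

    // Hypothetical stand-in for the outbound conversion step: like the
    // payload check in the stack trace, it rejects null values.
    public static String toPayload(Long value) {
        if (value == null) {
            throw new IllegalArgumentException("Payload must not be null");
        }
        return "payload:" + value;
    }

    // Drop records whose value is null before they reach the conversion step,
    // so the null check above is never triggered.
    public static List<String> convertNonNull(List<Map.Entry<String, Long>> records) {
        return records.stream()
                .filter(record -> record.getValue() != null)
                .map(record -> record.getKey() + "=" + toPayload(record.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = Arrays.asList(
                new SimpleEntry<String, Long>("a", 1L),
                new SimpleEntry<String, Long>("b", null),
                new SimpleEntry<String, Long>("c", 3L));
        // prints [a=payload:1, c=payload:3]
        System.out.println(convertNonNull(records));
    }
}
```

The ordering is what matters: the filter has to sit between the transformation that can produce null and the step that rejects it. Note that filtering also means no tombstone is ever written to the output topic, which may or may not be what the application wants.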

    【Comments】:

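    • A Kafka record with a null value() is treated as a tombstone record (used with compacted topics). When we want to send a tombstone to Kafka, we use a "special" payload, KafkaNull.INSTANCE. KafkaStreamsMessageConversionDelegate should perform that translation when it encounters a null value().
    • Throwing this exception is indeed a bug that needs to be fixed in the binder. That said, if you can upgrade your application to the 3.0.0 line (the Horsham release train of Spring Cloud Stream), you will not hit this exception, because by default conversion is done by the Kafka Streams Serdes. If you want Spring to do the message conversion, you have to opt in. I recommend upgrading first if possible, but we will fix it in the binder. Thanks for pointing this out.
    • Thanks for the comments. I am using cloud-stream-binder-kafka version 2.1.2 because it depends on Spring Boot 2.1.5, so I may not be able to upgrade to 3.0.0. I did point out the bug in the Kafka binder, and I have applied a filter as a temporary workaround.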
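The sentinel translation described in the first comment can be illustrated with a small stand-alone sketch. It is not the binder's code: `TombstoneSketch`, `NullPayload`, `toPayload`, and `fromPayload` are hypothetical names, and `NullPayload.INSTANCE` only plays the role the comment attributes to `KafkaNull.INSTANCE`.

```java
public class TombstoneSketch {

    // Hypothetical sentinel standing in for a "special" null payload.
    public enum NullPayload { INSTANCE }

    // Outbound direction: a null record value becomes the sentinel, so a
    // message layer that forbids null payloads can still carry a tombstone.
    public static Object toPayload(Object value) {
        return value == null ? NullPayload.INSTANCE : value;
    }

    // Inbound direction: the sentinel is turned back into a null record value.
    public static Object fromPayload(Object payload) {
        return payload == NullPayload.INSTANCE ? null : payload;
    }

    public static void main(String[] args) {
        Object payload = toPayload(null);
        System.out.println(payload);              // prints INSTANCE
        System.out.println(fromPayload(payload)); // prints null
        System.out.println(toPayload(42L));       // prints 42
    }
}
```

Unlike the filter workaround, this approach keeps the tombstone, which matters when the output topic is compacted and a null value is meant to delete a key.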