【问题标题】:kafka streams - NullPointerException when joining KStream with KTable leftjoin()kafka 流 - 使用 KTable leftjoin() 加入 KStream 时出现 NullPointerException
【发布时间】:2018-04-21 17:53:34
【问题描述】:

我使用leftJoin 使用表中的值转换流消息(事务)。我连接多个连接。每次加入后,我都会选择一个新密钥。我在上次加入时收到了NullPointerException。过滤 null 键没有帮助。

Exception in thread "account-transactions-e0a66751-f65e-4ba0-947c-024b5d32f7c2-StreamThread-1" java.lang.NullPointerException at org.namematching.kafkastreams.transactiontransformation.TransactionTransformationStream.lambda$launchStream$14(TransactionTransformationStream.java:124)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:58)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

这是我的加入逻辑:

KStream<String,DoiTAccountTransaction> transactionsWithAccounts =
            transactionStream.leftJoin(accountTable,(v1,v2) -> { v1.setBeneficiaryBankAccount(v2.getOriginId()); return v1; }, Joined.with(Serdes.String(), transactionSerde, accountSerde))
            .selectKey((k,v) -> v.getTransactionCode());

KStream<String,DoiTAccountTransaction> transactionsWithTransactionCodes =
            transactionsWithAccounts.leftJoin(transactionCodesTable, (v1,v2) -> {v1.setTransactionCode(v2.getUid()); return v1; }, Joined.with(Serdes.String(), transactionSerde, transactionCodesSerde))
            .selectKey((k,v) -> v.getCurrency());

KStream<String,DoiTAccountTransaction> transactionsWithCurrencies =
            transactionsWithTransactionCodes.leftJoin(currenciesTable, (v1,v2) -> {v1.setCurrency(v2.getUid()); return v1;}, Joined.with(Serdes.String(), transactionSerde, currenciesSerde))
            .selectKey((k,v) -> v.getOriginalBeneficiary1());

KStream<String, DoiTAccountTransaction> transactionsWithPersons =
            transactionsWithCurrencies.leftJoin(personsTable,(v1,v2) -> {v1.setOriginalBeneficiary1(v2.getUid()); return v1;}, Joined.with(Serdes.String(), transactionSerde, personsSerde));


transactionsWithPersons.to("account-transactions-processed", Produced.with(Serdes.String(),transactionSerde));

上次加入失败。过滤空键没有帮助。

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    是否可能(当时)personsTable 中不存在该键的一行,并且 NPE 来自 v2.getUid() 表达式?

    在访问它们的字段(所有 v2 的)之前,对连接中的所有右侧对象进行非空检查会更安全。

    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin

    【讨论】:

      猜你喜欢
      • 2020-08-11
      • 2022-07-06
      • 2021-12-11
      • 2020-11-04
      • 1970-01-01
      • 2022-12-02
      • 1970-01-01
      • 2019-09-24
      • 1970-01-01
      相关资源
      最近更新 更多