【问题标题】:Exception when processing data during Kafka stream processKafka流处理过程中处理数据的异常
【发布时间】:2018-05-11 16:08:06
【问题描述】:

我正在使用以下代码处理 Kafka 流。如果"UserID":"1",我检查来自JSON obj 的过滤条件。请参考下面的代码

builder.<String,String>stream(Serdes.String(), Serdes.String(), topic)
                   .filter(new Predicate <String, String>() {

               String userIDCheck = null;

               @Override
            public boolean test(String key, String value) {

                   try {
                       JSONObject jsonObj = new JSONObject(value);

                       userIDCheck = jsonObj.get("UserID").toString();
                       System.out.println("userIDCheck: " + userIDCheck);                          
                   } catch (JSONException e) {
                       // TODO Auto-generated catch block
                       e.printStackTrace();
                   }

                   return userIDCheck.equals("1");
               }
            })
           .to(streamouttopic);

值:{"UserID":"1","Address":"XXX","AccountNo":"989","UserName":"Stella","AccountType":"YYY"}

我收到以下错误:

    Exception in thread "SampleStreamProducer-4eecc3ab-858c-44a4-9b8c-5ece2b4ab21a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=testtopic1, partition=0, offset=270
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:89)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)

上面的流代码中的值和条件都很好,我不明白为什么在执行流代码时会给出这个异常。

【问题讨论】:

  • 你能分享整个堆栈跟踪吗?如果它在日志中被截断,请尝试注册一个未捕获的异常处理程序以获取完整的堆栈跟踪和根本原因。
  • 看起来,Class Cast Exception 的问题。 kafka.common.serialization.ByteArraySerializer) 与实际的键或值类型不兼容

标签: apache-kafka apache-kafka-streams


【解决方案1】:

报告的问题应仅适用于 Kafka 2.0 及更早版本。从 2.1.0 版本开始,Kafka Streams 支持“serde push down”,to() 操作符应该从上游继承正确的 serdes(参见 https://issues.apache.org/jira/browse/KAFKA-7456)。

对于 Kafka 2.0 及更早版本,您必须为 to() 操作明确指定正确的 Serdes。否则,它使用来自 StreamsConfig 的默认 Serdes,即 ByteArraySerde(因为语义或 serde 覆盖是每个运算符“插入覆盖”)——并且 String 不能转换为 byte[]

你需要做的:

.to(streamoutputtopic, Produced.with(Serdes.String(), Serdes.String()));

对于尚未使用Produced 参数的旧版本(1.0 之前),代码为:

.to(Serdes.String(), Serdes.String(), streamoutputtopic);

【讨论】:

  • 观察到产生无法解析。我想,我的版本不一样
  • 旧版本应该是to(Serdes.String(), Serdes.String(), streamoutputtopic); 或类似的
  • 有效!伟大的!我有最后一个疑问。如果我需要流式传输多个主题,我可以这样做吗? builder.stream(topic1).to(streamouttopic); builder.stream(topic2).to(streamouttopic); builder.stream(topic3).to(streamouttopic);
  • 是的。这是可能的——我想知道的是:如果它们有效,你为什么不试试这些东西:)
  • 我已经尝试过了,发现它正在工作。只是想和你这样的专家确认!非常感谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-09-18
  • 1970-01-01
  • 2020-05-27
  • 1970-01-01
  • 2020-12-19
  • 1970-01-01
相关资源
最近更新 更多