【发布时间】:2018-06-07 11:41:32
【问题描述】:
你能解决我的疑问吗?通过使用 Kafka Streams,我能够在我的 Kafka 主题中从 jsonObject (a-->{"host":"mou","type":"kat"}) 中获取一个字段,并使用 Java8 中的以下代码。
KStream<String, String> sources = builder
.stream(Serdes.String(), jsonSerde, "example")
.map((k,v) -> new KeyValue<>(k, (String) v.path("a").toString()));
sources.foreach((w, c) -> System.out.println("word: ->" + c));
sources.print("FETCHING FIELD VALUE");
但如果我的输入是{"type":"abc","name":"abc","a"[{"host":"mouni"}]},我会收到序列化错误。 (请就我的问题发送 POC。)
【问题讨论】:
-
输入值是JSON吗?我不是 JSON 专家。
-
嘿,输入是 json,正如我在上面发布的问题中提到的那样。
标签: stream apache-kafka apache-kafka-streams confluent-platform