【发布时间】:2016-08-13 06:03:48
【问题描述】:
我在 Kafka 上有这样的 JSON 消息:
{"id_post":"p1", "message":"blablabla"}
我想解析消息,并打印(或用于进一步计算)message 元素。
使用以下代码打印 json
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, inputGroup, topicMap)
val postStream = kafkaStream.map(_._2)
postStream.foreachRDD((rdd, time) => {
val count = rdd.count()
if (count > 0){
rdd.foreach(record => {
println(record)
}
}
但我无法获得单个元素。 我尝试了一些 JSON 解析器,但没有运气。 有什么想法吗?
更新: 不同 JSON 解析器的一些错误 这是circe解析器的代码和输出:
val parsed_record = parse(record)
和输出:
14:45:00,676 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at io.circe.jawn.CirceSupportParser$$anon$1$$anon$4.add(CirceSupportParser.scala:36)
at jawn.CharBasedParser$class.parseString(CharBasedParser.scala:90)
at jawn.StringParser.parseString(StringParser.scala:15)
at jawn.Parser.rparse(Parser.scala:397)
at jawn.Parser.parse(Parser.scala:338)
at jawn.SyncParser.parse(SyncParser.scala:24)
at jawn.SupportParser$$anonfun$parseFromString$1.apply(SupportParser.scala:15)
等等..在我使用parse(record)的那一行
看起来它无法访问和/或解析字符串 record。
如果我使用 lift-json 也一样
在parse(record) 处,错误输出或多或少相同:
16:58:20,425 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:144)
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:141)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:80)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:45)
at net.liftweb.json.package$.parse(package.scala:40)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:98)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
【问题讨论】:
-
您在使用 JSON 解析器时遇到了哪些问题?
-
如果JSON有多个换行符
\n怎么办?
标签: json scala apache-spark apache-kafka spark-streaming