【发布时间】:2017-11-17 18:52:40
【问题描述】:
我正在使用带有 Scala 的 Spark Streaming,并且我正在从 kafka 获取 json 记录。我想解析它,以便获取值(日期时间和质量)和过程。
这是我的代码:
stream.foreachRDD(rdd => {
rdd.collect().foreach(i =>
println(msgParse(i.value()).quality)
)
})
我有这个案例类和我的解析函数:
case class diskQuality(datetime: String , quality : Double) extends Serializable
def msgParse(value: String): diskQuality = {
import org.json4s._
import org.json4s.native.JsonMethods._
implicit val formats = DefaultFormats
val res = parse(value).extract[diskQuality]
return res
}
我已经添加了这个依赖:
libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4"
我收到的记录格式如下:
"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
但是我得到了这个错误:
Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}"
编辑:
当我尝试使用相同的函数解析以下内容时,它可以工作。但即使 kafka 消息采用相同的格式,它仍然会给出相同的错误:
val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
我正在使用 scalaVersion := "2.10.6" 和 json4s-native_2.10"
任何帮助将不胜感激。谢谢你的时间
【问题讨论】:
-
第一种格式是正确的 - "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"。您的代码可以使用它。您能否检查一下 build.sbt 中的 Scala 版本是什么。它是 2.10 作为您的 org.json4s 依赖项吗?此外,您可以记录 msgParse 函数的 value 参数,以检查它的实际值。
-
感谢您的回复,我编辑了我的问题,这是我打印时 msgParse 中的值:"{\"datetime\":\"24-04-2017 07:53: 30\",\"质量\":100.0}"
-
@AsmaaM 如果这是你的控制台输出 - 你有引号转义的问题,你能检查你的生产者发送给 kafka 的内容吗?
-
@MonteCristo 当我检查 kafka 主题时,我有这样的事情(我猜每行都有一条消息。它实际上是从包含这些记录的文件中加载的):“[{\”datetime\” :\"24-04-2017 07:53:30\",\"质量\":100.0},""{\"日期时间\":\"24-04-2017 08:14:30\",\ "quality\":100.0},""{\"datetime\":\"24-04-2017 08:21:30\",\"quality\":100.0}]" 在我的代码中,我做了一些更改,因此现在每一行都可以采用正确的格式:record = i.value().replace("[" , "").replace("]" , "").dropRight(2)+ '"' 当我打印“记录”时,它给出了我写的输出
-
@AsmaaM 您的目标是在打印时使用
{"datetime":"14-05-2017 14:18:30","quality":92.6}格式。您发布的内容是带有转义引号的字符串的表示形式。
标签: json scala apache-kafka spark-streaming json4s