【发布时间】:2023-03-29 08:11:01
【问题描述】:
基本上,我必须使用 Spark 在 HDFS 上分析一些复杂的 JSON。
我使用“用于理解”来(预)过滤 JSON 和“提取”方法 json4s 将其包装到一个案例类中
这个很好用!
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized
}
到目前为止一切顺利!
当我尝试将(预)过滤的 JSON 提取到我的 CaseClass 我明白了:
线程“主”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:任务不可序列化:java.io.NotSerializableException:org.json4s.DefaultFormats$
这里是提取代码:
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized.extract[View]
}
我已经在 scala ws 上尝试过我的代码,并且它的工作!我对 hdfs 和 spark 的东西真的很陌生,所以我将不胜感激。
【问题讨论】:
标签: json scala hdfs apache-spark json4s