【问题标题】:NotSerializableException with json4s on Spark在 Spark 上带有 json4s 的 NotSerializableException
【发布时间】:2023-03-29 08:11:01
【问题描述】:

基本上,我必须使用 Spark 在 HDFS 上分析一些复杂的 JSON。

我使用“用于理解”来(预)过滤 JSON 和“提取”方法 json4s 将其包装到一个案例类中

这个很好用!

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized
}

到目前为止一切顺利!

当我尝试将(预)过滤的 JSON 提取到我的 CaseClass 我明白了:

线程“主”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:任务不可序列化:java.io.NotSerializableException:org.json4s.DefaultFormats$

这里是提取代码:

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized.extract[View]
}

我已经在 scala ws 上尝试过我的代码,并且它的工作!我对 hdfs 和 spark 的东西真的很陌生,所以我将不胜感激。

【问题讨论】:

标签: json scala hdfs apache-spark json4s


【解决方案1】:

Spark 序列化 RDD 转换的闭包并将其“运送”给工作人员以进行分布式执行。 这要求闭包中的所有代码(通常也在包含对象中)应该是可序列化的。

查看 org.json4s.DefaultFormat$ 的 impl(该 trait 的伴生对象):

object DefaultFormats extends DefaultFormats {
    val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    val UTC = TimeZone.getTimeZone("UTC")

}

很明显,这个对象是不可序列化的,也不能这样。 (ThreadLocal 本身就是不可序列化的)

您似乎没有在代码中使用 Date 类型,所以您可以摆脱 implicit val formats = DefaultFormats 或用可序列化的东西替换 DefaultFormats?

【讨论】:

    【解决方案2】:

    这实际上已经得到修复; JSON4S 从 3.3.0 版本开始可序列化:https://github.com/json4s/json4s/issues/137

    【讨论】:

    • 但是 JSON4S 3.3 与任何 Spark 版本都不兼容,除非您隐藏依赖关系,因为二进制/运行时与 Spark 包含的 JSON4S 3.2 不兼容。
    【解决方案3】:

    解决我的问题的是,我在rdd.foreach{} 循环中使用了implicit val formats = DefaultFormats。它解决了我的可序列化异常。

    这是我解决问题的代码 sn-p:

    case class rfId(rfId: String) {}
    
    // ... some code here ...
    
     rdd.foreach { record =>
        val value = record.value()
    
        // Bring in default date formats etc and makes json4s serializable
        implicit val formats = DefaultFormats
        val json = parse(value)
        println(json.camelizeKeys.extract[rfId])  // Prints `rfId(ABC12345678)`
     }
    

    【讨论】:

      最近更新 更多