【问题标题】:spark scala:Task not serializablespark scala:任务不可序列化
【发布时间】:2016-06-01 19:12:38
【问题描述】:

这是一个工作代码示例:

case class log(log_version: String,log_ip: String,log_from: String,SDK: String,action_time: String,action: String,sn: String,fdn: String,typ: String,vid: String,version: String,device_id: String,ip: String,timestamp: String) extends serializable



val RDD = input.map{ line => 
    val p = line.split("\\|")
    val log_version = p(0)
    val log_ip = p(1)
    val log_from = p(2)
    val SDK = p(3)
    val action_time = p(4)
    val action = p(5)
    val sn = p(6)
    val JsonMap = if(p.length==8){
    val jsontest = parse(p(7), useBigDecimalForDouble = true)
    jsontest.extract[Map[String,String]]
    } else(Map("error" -> "empty"))
    val fdn:String = JsonMap.get("fdn").getOrElse("null")
    val typ:String = JsonMap.get("type").getOrElse("null")
    val vid:String = JsonMap.get("vid").getOrElse("null")
    val version:String = JsonMap.get("version").getOrElse("null")
    val device_id:String = JsonMap.get("device_id").getOrElse("null")
    val ip:String = JsonMap.get("ip").getOrElse("null")
    val timestamp:String = JsonMap.get("timestamp").getOrElse("null")
    log(log_version,log_ip,log_from,SDK,action_time,action,sn,fdn,typ,vid,version,device_id,ip,timestamp)}.toDF()

每当我尝试访问 sc 时都会收到以下错误。我在这里做错了什么?

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

我像这样更改我的代码:

case class JsonLong(fdn:String,typ:String,vid:String,version:String,device_id:String,ip:String,timestamp:String)


case class log(log_version: String,log_ip: String,log_from: String,SDK: String,action_time: String,action: String,sn: String,JsonClass:JsonLong) extends serializable


val RDD = input.map{ line => 
    val p = line.split("\\|")
    val log_version = p(0)
    val log_ip = p(1)
    val log_from = p(2)
    val SDK = p(3)
    val action_time = p(4)
    val action = p(5)
    val sn = p(6)
    val JsonMap:JsonLong = if(p.length==8){
    val jsontest = parse(p(7), useBigDecimalForDouble = true)
    val x = jsontest.extract[Map[String,String]]
    JsonLong(x.get("fdn").getOrElse("NULL"),x.get("typ").getOrElse("NULL"),x.get("vid").getOrElse("NULL"),x.get("version").getOrElse("NULL"),x.get("fdn").getOrElse("NULL"),x.get("ip").getOrElse("NULL"),x.get("timestamp").getOrElse("NULL"))
    } else(null)
    log(log_version,log_ip,log_from,SDK,action_time,action,sn,JsonMap)}.toDF()

但我还是错了?为什么?没看懂~~~谁能告诉我?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    Spark 需要能够序列化闭包以将其发送给每个执行器。作为对代码中无法序列化的内容的猜测,您似乎正在使用 json4s,它需要 implicit Formats 来提取 Map[String, String]。尝试在 map 函数中声明隐式。

    【讨论】:

    • 尝试使用Serializable 扩展JsonLonglog。在 Scala 中,约定是用大写字母命名类,因此请考虑将 log 重命名为 Log
    • 还有你的implicit val formats = DefaultFormats(或者你用于案例类序列化的任何东西)在哪里。这应该在 map 函数内,这就是我在回答中的意思。
    • 我的代码中有'implicit val formats = DefaultFormats'。我还尝试扩展JsonLong并使用Serializable记录,但仍然得到同样的错误。我确定是JsonLong类引起的问题。但是不知道为什么~~
    • 将你的implicit val formats = DefaultFormats移动到val RDD = input.map{ line =>下面的行
    • 我发现了问题。那是由 'val jsontest = parse(p(7), useBigDecimalForDouble = true) ' 当我更改为使用 JSON.parseFull(p(7) 都变成正确的。但我仍然不知道为什么~~~
    猜你喜欢
    • 2015-12-16
    • 2017-09-21
    • 2021-08-12
    • 2020-02-04
    • 2016-09-14
    • 2023-04-02
    • 2015-09-21
    • 1970-01-01
    • 2018-04-06
    相关资源
    最近更新 更多