【问题标题】:Serialization error while writing JSON to file将 JSON 写入文件时出现序列化错误
【发布时间】:2015-08-11 02:02:39
【问题描述】:

我正在阅读文本文件并在每次迭代中创建 Json 对象JsValues。我想在每次迭代时将它们保存到文件中。我正在使用 Play Framework 创建 JSON 对象。

class Cleaner {
  def getDocumentData() = {
     for (i <- no_of_files) {
     .... do something ...
         some_json = Json.obj("text" -> LARGE_TEXT)
         final_json = Json.stringify(some_json)
         //save final_json here to a file
     }
  }
}

我尝试使用PrintWriter 来保存该json,但我得到Exception in thread "main" org.apache.spark.SparkException: Task not serializable 作为错误。

我应该如何纠正这个问题?还是有其他方法可以保存 JsValue?

更新:

我读到在这种情况下必须使用特征serializable。我有以下功能:

class Cleaner() extends Serializable {
  def readDocumentData() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")

    val sc = new SparkContext(conf)

    val temp = sc.wholeTextFiles("text_doc.dat)
    val docStartRegex = """<DOC>""".r
    val docEndRegex = """</DOC>""".r
    val docTextStartRegex = """<TEXT>""".r
    val docTextEndRegex = """</TEXT>""".r
    val docnoRegex = """<DOCNO>(.*?)</DOCNO>""".r
    val writer = new PrintWriter(new File("test.json"))

    for (fileData <- temp) {
      val filename = fileData._1
      val content: String = fileData._2
      println(s"For $filename, the data is:")
      var startDoc = false // This is for the
      var endDoc = false // whole file
      var startText = false //
      var endText = false //
      var textChunk = new ListBuffer[String]()
      var docID: String = ""
      var es_json: JsValue = Json.obj()

      for (current_line <- content.lines) {
        current_line match {
          case docStartRegex(_*) => {
            startDoc = true
            endText = false
            endDoc = false
          }
          case docnoRegex(group) => {
            docID = group.trim
          }
          case docTextStartRegex(_*) => {
            startText = true
          }
          case docTextEndRegex(_*) => {
            endText = true
            startText = false
          }
          case docEndRegex(_*) => {
            endDoc = true
            startDoc = false
            es_json = Json.obj(
              "_id" -> docID,
              "_source" -> Json.obj(
                "text" -> textChunk.mkString(" ")
              )
            )
            writer.write(es_json.toString())
            println(es_json.toString())
            textChunk.clear()
          }
          case _ => {
            if (startDoc && !endDoc && startText) {
              textChunk += current_line.trim
            }
          }
        }
      }
    }
    writer.close()
  }
}

这是我添加特征的函数,但我仍然遇到相同的异常。 我重写了一个较小的版本:

def foo() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")
    val sc = new SparkContext(conf)

    var es_json: JsValue = Json.obj()
    val writer = new PrintWriter(new File("test.json"))
    for (i <- 1 to 10) {
      es_json = Json.obj(
        "_id" -> i,
        "_source" -> Json.obj(
          "text" -> "Eureka!"
        )
      )
      println(es_json)
      writer.write(es_json.toString() + "\n")
    }
    writer.close()
  }

这个函数在有和没有serializable 的情况下都能正常工作。我不明白发生了什么?

【问题讨论】:

    标签: json scala playframework apache-spark


    【解决方案1】:

    编辑:电话上的第一个答案。

    不是你的主类需要可序列化,而是你在 rdd 处理循环中使用的类,在这种情况下在for (fileData &lt;- temp) 中 它需要可序列化,因为 spark 数据位于可能位于多台计算机上的多个分区上。因此,您应用于此数据的函数需要可序列化,以便您可以将它们发送到将并行执行它们的另一台计算机。 PrintWriter 不能序列化,因为它指的是只能从原始计算机获得的文件。因此出现序列化错误。

    在初始化 spark 过程的计算机上写入数据。您需要将整个集群中的数据带到您的机器上然后写入。

    为此,您可以收集结果。 rdd.collect() 这将从集群中获取所有数据并将其放入驱动程序线程内存中。然后您可以使用PrintWriter 将其写入文件。

    像这样:

    temp.flatMap { fileData =>
      val filename = fileData._1
      val content: String = fileData._2
      println(s"For $filename, the data is:")
      var startDoc = false // This is for the
      var endDoc = false // whole file
      var startText = false //
      var endText = false //
      var textChunk = new ListBuffer[String]()
      var docID: String = ""
      var es_json: JsValue = Json.obj()
    
      var results = ArrayBuffer[String]()
    
      for (current_line <- content.lines) {
        current_line match {
          case docStartRegex(_*) => {
            startDoc = true
            endText = false
            endDoc = false
          }
          case docnoRegex(group) => {
            docID = group.trim
          }
          case docTextStartRegex(_*) => {
            startText = true
          }
          case docTextEndRegex(_*) => {
            endText = true
            startText = false
          }
          case docEndRegex(_*) => {
            endDoc = true
            startDoc = false
            es_json = Json.obj(
              "_id" -> docID,
              "_source" -> Json.obj(
                "text" -> textChunk.mkString(" ")
              )
            )
            results.append(es_json.toString())
            println(es_json.toString())
            textChunk.clear()
          }
          case _ => {
            if (startDoc && !endDoc && startText) {
              textChunk += current_line.trim
            }
          }
        }
      }
    
      results
    }
    .collect()
    .foreach(es_json => writer.write(es_json))
    

    如果结果对于驱动程序线程内存来说太大,您可以使用 saveAsTextFile 函数将每个分区流式传输到您的驱动器。在第二种情况下,您作为参数提供的路径将被放入一个文件夹,并且您的 rdd 的每个分区都将写入其中的一个编号文件。

    像这样:

    temp.flatMap { fileData =>
      val filename = fileData._1
      val content: String = fileData._2
      println(s"For $filename, the data is:")
      var startDoc = false // This is for the
      var endDoc = false // whole file
      var startText = false //
      var endText = false //
      var textChunk = new ListBuffer[String]()
      var docID: String = ""
      var es_json: JsValue = Json.obj()
    
      var results = ArrayBuffer[String]()
    
      for (current_line <- content.lines) {
        current_line match {
          case docStartRegex(_*) => {
            startDoc = true
            endText = false
            endDoc = false
          }
          case docnoRegex(group) => {
            docID = group.trim
          }
          case docTextStartRegex(_*) => {
            startText = true
          }
          case docTextEndRegex(_*) => {
            endText = true
            startText = false
          }
          case docEndRegex(_*) => {
            endDoc = true
            startDoc = false
            es_json = Json.obj(
              "_id" -> docID,
              "_source" -> Json.obj(
                "text" -> textChunk.mkString(" ")
              )
            )
            results.append(es_json.toString())
            println(es_json.toString())
            textChunk.clear()
          }
          case _ => {
            if (startDoc && !endDoc && startText) {
              textChunk += current_line.trim
            }
          }
        }
      }
    
      results
    }
    .saveAsTextFile("test.json")
    

    【讨论】:

    • 我找不到你。我试图处理es_json 的类是serializable
    • 是的,但PrinterWriter 不可序列化,无法序列化。所以错误来自在for (data &lt;- temp) 循环中使用writer val。
    猜你喜欢
    • 2019-08-13
    • 2018-01-15
    • 2016-08-26
    • 1970-01-01
    • 2021-08-21
    • 1970-01-01
    • 1970-01-01
    • 2018-04-18
    • 2012-07-02
    相关资源
    最近更新 更多