【发布时间】:2015-08-11 02:02:39
【问题描述】:
我正在读取文本文件,并在每次迭代中创建一个 JSON 对象(JsValue)。我想在每次迭代时把它保存到文件中。我使用 Play Framework 来创建 JSON 对象。
// Pseudocode sketch of the intended flow: one JSON string is produced per loop iteration
// and is meant to be written to a file at the marked spot.
class Cleaner {
def getDocumentData() = {
for (i <- no_of_files) {
.... do something ...
// Wrap the text in a JSON object under the "text" key, then serialize it to a string.
some_json = Json.obj("text" -> LARGE_TEXT)
final_json = Json.stringify(some_json)
//save final_json here to a file
}
}
}
我尝试使用 PrintWriter 来保存该 JSON,但出现了如下错误:Exception in thread "main" org.apache.spark.SparkException: Task not serializable。
我应该如何纠正这个问题?还是有其他方法可以保存 JsValue?
更新:
我读到在这种情况下必须混入 Serializable 特质(trait)。我有以下函数:
/**
 * Reads documents delimited by <DOC>...</DOC> markers and emits one JSON object per document.
 *
 * Each emitted object has the shape {"_id": <DOCNO>, "_source": {"text": <joined TEXT lines>}}
 * and is both written through `writer` and printed to stdout.
 */
class Cleaner() extends Serializable {
// Builds a local Spark context, loads the input file(s) and scans every file line by line.
def readDocumentData() {
val conf = new SparkConf()
.setAppName("linkin_spark")
.setMaster("local[2]")
.set("spark.executor.memory", "1g")
.set("spark.rdd.compress", "true")
.set("spark.storage.memoryFraction", "1")
val sc = new SparkContext(conf)
// NOTE(review): the string literal on the next line is missing its closing quote.
val temp = sc.wholeTextFiles("text_doc.dat)
// Patterns for the structural markers; each must match an entire line to fire.
val docStartRegex = """<DOC>""".r
val docEndRegex = """</DOC>""".r
val docTextStartRegex = """<TEXT>""".r
val docTextEndRegex = """</TEXT>""".r
val docnoRegex = """<DOCNO>(.*?)</DOCNO>""".r
// NOTE(review): `writer` is created here, outside the loop over `temp`, but is used inside
// that loop body (see writer.write below). If `temp` is a Spark RDD, the loop body is a
// closure that Spark must serialize, and java.io.PrintWriter is not Serializable --
// presumably the cause of the reported "Task not serializable" error; confirm.
val writer = new PrintWriter(new File("test.json"))
// Each element is a pair; _1 is used as the file name and _2 as the file content.
for (fileData <- temp) {
val filename = fileData._1
val content: String = fileData._2
println(s"For $filename, the data is:")
var startDoc = false // This is for the
var endDoc = false // whole file
var startText = false //
var endText = false //
// Trimmed text lines collected for the document currently being scanned.
var textChunk = new ListBuffer[String]()
var docID: String = ""
var es_json: JsValue = Json.obj()
for (current_line <- content.lines) {
current_line match {
// <DOC>: a new document begins; reset the end-of-text and end-of-doc flags.
case docStartRegex(_*) => {
startDoc = true
endText = false
endDoc = false
}
// <DOCNO>id</DOCNO>: remember the trimmed document id.
case docnoRegex(group) => {
docID = group.trim
}
// <TEXT>: subsequent unmatched lines are collected as document text.
case docTextStartRegex(_*) => {
startText = true
}
// </TEXT>: stop collecting text lines.
case docTextEndRegex(_*) => {
endText = true
startText = false
}
// </DOC>: emit the finished document and clear the buffer for the next one.
case docEndRegex(_*) => {
endDoc = true
startDoc = false
es_json = Json.obj(
"_id" -> docID,
"_source" -> Json.obj(
"text" -> textChunk.mkString(" ")
)
)
// NOTE(review): no separator is written between consecutive JSON objects.
writer.write(es_json.toString())
println(es_json.toString())
textChunk.clear()
}
// Any other line: keep it only while inside an open <DOC> and an open <TEXT> section.
case _ => {
if (startDoc && !endDoc && startText) {
textChunk += current_line.trim
}
}
}
}
}
writer.close()
}
}
这是我混入 Serializable 特质之后的函数,但我仍然遇到相同的异常。于是我又重写了一个较小的版本:
/**
 * Minimal variant: writes ten small JSON objects, one per line, to "test.json"
 * and echoes each of them to stdout.
 *
 * A SparkContext is constructed with the same settings as the full version, but no
 * operation is run on it in this method; the loop below is a plain local range.
 */
def foo(): Unit = {
  val conf = new SparkConf()
    .setAppName("linkin_spark")
    .setMaster("local[2]")
    .set("spark.executor.memory", "1g")
    .set("spark.rdd.compress", "true")
    .set("spark.storage.memoryFraction", "1")
  // Kept so that the setup matches the full version; it is not referenced afterwards.
  val sc = new SparkContext(conf)
  val writer = new PrintWriter(new File("test.json"))
  // try/finally guarantees the file handle is released even if building or
  // writing one of the JSON objects throws.
  try {
    for (i <- 1 to 10) {
      // A fresh immutable value per iteration replaces the reassigned outer `var`.
      val esJson: JsValue = Json.obj(
        "_id" -> i,
        "_source" -> Json.obj(
          "text" -> "Eureka!"
        )
      )
      println(esJson)
      writer.write(esJson.toString() + "\n")
    }
  } finally {
    writer.close()
  }
}
无论是否混入 Serializable,这个函数都能正常工作。我不明白这是怎么回事?
【问题讨论】:
标签: json scala playframework apache-spark