【问题标题】:Task not serializable in scalascala中不可序列化的任务
【发布时间】:2017-05-12 15:09:14
【问题描述】:

在我的应用程序中,我使用并行化方法将数组保存到文件中。

代码如下:

 val sourceRDD = sc.textFile(inputPath + "/source")

 val destinationRDD = sc.textFile(inputPath + "/destination")

val source_primary_key = sourceRDD.map(rec => (rec.split(",")(0).toInt, rec))
val destination_primary_key = destinationRDD.map(rec => (rec.split(",")(0).toInt, rec))

val extra_in_source = source_primary_key.subtractByKey(destination_primary_key)
val extra_in_destination = destination_primary_key.subtractByKey(source_primary_key)

val source_subtract = source_primary_key.subtract(destination_primary_key)

val Destination_subtract = destination_primary_key.subtract(source_primary_key)

val exact_bestmatch_src = source_subtract.subtractByKey(extra_in_source).sortByKey(true).map(rec => (rec._2))
val exact_bestmatch_Dest = Destination_subtract.subtractByKey(extra_in_destination).sortByKey(true).map(rec => (rec._2))

val exact_bestmatch_src_p = exact_bestmatch_src.map(rec => (rec.split(",")(0).toInt))

val primary_key_distinct = exact_bestmatch_src_p.distinct.toArray()

for (i <- primary_key_distinct) {

  var dummyVar: String = ""
  val src = exact_bestmatch_src.filter(line => line.split(",")(0).toInt.equals(i))
  var dest = exact_bestmatch_Dest.filter(line => line.split(",")(0).toInt.equals(i)).toArray

  for (print1 <- src) {

    var sourceArr: Array[String] = print1.split(",")
    var exactbestMatchCounter: Int = 0
    var index: Array[Int] = new Array[Int](1)

    println(print1 + "source")

    for (print2 <- dest) {

      var bestMatchCounter = 0
      var i: Int = 0

      println(print1 + "source + destination" + print2)

      for (i <- 0 until sourceArr.length) {
        if (print1.split(",")(i).equals(print2.split(",")(i))) {
          bestMatchCounter += 1
        }
      }
      if (exactbestMatchCounter < bestMatchCounter) {
        exactbestMatchCounter = bestMatchCounter
        dummyVar = print2
        index +:= exactbestMatchCounter //9,8,9      
      }
    }

    var z = index.zipWithIndex.maxBy(_._1)._2

    if (exactbestMatchCounter >= 0) {
      var samparr: Array[String] = new Array[String](4)
      samparr +:= print1 + "  BEST_MATCH  " + dummyVar     
      var deletedest: Array[String] = new Array[String](1)
      deletedest = dest.take(z) ++ dest.drop(1)
      dest = deletedest
val myFile = sc.parallelize((samparr)).saveAsTextFile(outputPath)

我使用了并行化方法,我什至尝试使用以下方法将其保存为文件

val myFile = sc.textFile(samparr.toString())
val finalRdd = myFile
finalRdd.coalesce(1).saveAsTextFile(outputPath)

但它不断抛出错误:

线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化

【问题讨论】:

  • 我们需要知道samparr的类型是哪个si。你能用这些信息完成你的问题吗?
  • samparr 是一个数组,我想将该数组保存到一个文件中,所以我正在尝试使用并行化方法
  • 是的,我知道。但是一系列什么? transformationsactions 中使用的代码是可序列化的,这一点很重要。
  • 它是一个字符串数组 var samparr:Array[String] = new Array[String](4)
  • 好的,那么请务必将 Spark 作业中的所有代码提供给我们。我不认为您发布的线路给您带来了问题。

标签: scala apache-spark


【解决方案1】:

您不能将 RDD 视为本地集合。针对它的所有操作都发生在分布式集群上。要工作,您在该 rdd 中运行的所有函数都必须是可序列化的。

线

for (print1 <- src) {

在这里,您正在迭代 RDD src,循环内的所有内容都必须序列化,因为它将在执行程序上运行。

但是,在内部,您尝试运行sc.parallelize(,同时仍在该循环中。 SparkContext 不可序列化。使用 rdds 和 sparkcontext 是您在驱动程序上执行的操作,不能在 RDD 操作中执行。

我完全确定您要完成什么,但它看起来像是某种手工编码的源和目标连接操作。您不能像使用本地集合一样使用 rdds 中的循环。利用 apis map、join、groupby 等来创建最终的 rdd,然后保存。

如果你绝对觉得你必须像这样在 rdd 上使用 foreach 循环,那么你不能使用 sc.parallelize().saveAsTextFile() 而是使用 hadoop 文件 api 打开一个输出流并手动将你的数组写入文件。

【讨论】:

    【解决方案2】:

    最后这段代码帮助我将数组保存到文件中。

     new PrintWriter(outputPath) { write(array.mkString(" ")); close }
    

    【讨论】:

      猜你喜欢
      • 2018-09-19
      • 2015-12-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-14
      • 2021-08-12
      • 2023-04-02
      • 2020-08-30
      相关资源
      最近更新 更多