【问题标题】:org.apache.spark.SparkException: Task not serializable, whorg.apache.spark.SparkException:任务不可序列化,wh
【发布时间】:2016-07-27 16:32:27
【问题描述】:

当我实现自己的partioner并尝试对原始rdd进行洗牌时,我遇到了一个问题。我知道这是由不可序列化的引用函数引起的,但是在添加

扩展可序列化

对于每个相关的类,这个问题仍然存在。我该怎么办?

线程“main”org.apache.spark.SparkException 中的异常:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:1622)

object STRPartitioner extends Serializable{
  def apply(expectedParNum: Int,
        sampleRate: Double,
        originRdd: RDD[Vertex]): Unit= {
    val bound = computeBound(originRdd)
    val rdd = originRdd.mapPartitions(
      iter => iter.map(row => {
        val cp = row
        (cp.coordinate, cp.copy())
      }
      )
    )
    val partitioner = new STRPartitioner(expectedParNum, sampleRate, bound, rdd)
    val shuffled = new ShuffledRDD[Coordinate, Vertex, Vertex](rdd,  partitioner)
    shuffled.setSerializer(new KryoSerializer(new SparkConf(false)))
    val result = shuffled.collect()
  }

class STRPartitioner(expectedParNum: Int,
                     sampleRate: Double,
                     bound: MBR,
                     rdd: RDD[_ <: Product2[Coordinate, Vertex]])
  extends Partitioner with  Serializable {
    ... 
}

【问题讨论】:

    标签: multithreading scala serialization apache-spark serializable


    【解决方案1】:

    我只是解决问题!将 -Dsun.io.serialization.extendedDebugInfo=true 添加到您的 VM 配置中,您将针对不可序列化的类!

    【讨论】:

      猜你喜欢
      • 2015-05-31
      • 2015-08-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-02-07
      • 1970-01-01
      相关资源
      最近更新 更多