【Posted】: 2016-07-27 16:32:27
【Problem Description】:
I run into a problem when I implement my own partitioner and try to shuffle the original RDD. I know the exception is caused by a non-serializable reference captured in a closure, but even after adding
extends Serializable
to every class involved, the problem persists. What should I do?
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
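For context, a minimal sketch (hypothetical Driver/Helper classes, not the code from this question) of the usual pattern behind "Task not serializable": a closure passed to an RDD operation references a field of its enclosing class, so Spark must serialize the whole enclosing instance, and marking only the inner class Serializable does not remove that outer reference.

import org.apache.spark.SparkContext

class Driver(sc: SparkContext) {        // Driver is NOT Serializable
  class Helper extends Serializable {   // inner class still holds an $outer reference to Driver
    def double(x: Int): Int = x * 2
  }
  val helper = new Helper

  def run(): Array[Int] = {
    val rdd = sc.parallelize(1 to 10)
    // `helper` is a field, so the lambda captures `this` (the Driver instance);
    // serializing the task then fails with "Task not serializable".
    rdd.map(x => helper.double(x)).collect()
  }
}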
import org.apache.spark.{Partitioner, SparkConf}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.serializer.KryoSerializer

object STRPartitioner extends Serializable {
  def apply(expectedParNum: Int,
            sampleRate: Double,
            originRdd: RDD[Vertex]): Unit = {
    val bound = computeBound(originRdd)
    // Key each vertex by its coordinate so it can be shuffled by the custom partitioner.
    val rdd = originRdd.mapPartitions(
      iter => iter.map(row => {
        val cp = row
        (cp.coordinate, cp.copy())
      })
    )
    val partitioner = new STRPartitioner(expectedParNum, sampleRate, bound, rdd)
    val shuffled = new ShuffledRDD[Coordinate, Vertex, Vertex](rdd, partitioner)
    shuffled.setSerializer(new KryoSerializer(new SparkConf(false)))
    val result = shuffled.collect()
  }
}
class STRPartitioner(expectedParNum: Int,
                     sampleRate: Double,
                     bound: MBR,
                     rdd: RDD[_ <: Product2[Coordinate, Vertex]])
  extends Partitioner with Serializable {
  ...
}
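One possible culprit, sketched below under the assumption that the trouble comes from the rdd field stored inside the partitioner: serializing the partitioner for the shuffle can drag in the RDD's lineage and ultimately the SparkContext. The sketch keeps only plain serializable values (a sampled array of coordinates and the bound) in the partitioner; SamplingSTRPartitioner, samplePoints, and the placeholder getPartition rule are hypothetical names, not the actual implementation.

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

class SamplingSTRPartitioner(expectedParNum: Int,
                             bound: MBR,
                             samplePoints: Array[Coordinate])
  extends Partitioner {

  override def numPartitions: Int = expectedParNum

  // Placeholder partitioning rule; the real STR grid logic would go here.
  override def getPartition(key: Any): Int = key match {
    case c: Coordinate =>
      val h = c.hashCode % numPartitions
      if (h < 0) h + numPartitions else h
    case _ => 0
  }
}

object SamplingSTRPartitioner {
  def apply(expectedParNum: Int,
            sampleRate: Double,
            bound: MBR,
            rdd: RDD[(Coordinate, Vertex)]): SamplingSTRPartitioner = {
    // Sample on the driver: only the small array of coordinates is stored in
    // the partitioner, so no RDD (and no SparkContext) gets serialized with it.
    val sample = rdd.map(_._1).sample(withReplacement = false, fraction = sampleRate).collect()
    new SamplingSTRPartitioner(expectedParNum, bound, sample)
  }
}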
【Discussion】:
Tags: multithreading scala serialization apache-spark serializable