【Posted】: 2017-12-23 08:07:35
【Question】:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:32)
    at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:16)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:22)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:20)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.exadatum.ddq.core.Runner$.run(Runner.scala:20)
    at org.exadatum.ddq.core.RunCheck.<init>(RunCheck.scala:104)
    at org.exadatum.ddq.core.DQJobTrigger$.main(DQJobTrigger.scala:39)
    at org.exadatum.ddq.core.DQJobTrigger.main(DQJobTrigger.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1d9bd4d6)
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint, name: sc, type: class org.apache.spark.SparkContext)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint, DateFormatConstraint(startdate,java.text.SimpleDateFormat@4f76f1a0,org.apache.spark.SparkContext@1d9bd4d6,xdqdemo.customer_details))
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, <function1>)
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(startdate#2))
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(UDF(startdate#2)))
    - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, name: predicates, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, InMemoryColumnarTableScan [phone_number#0,name#1,startdate#2], [UDF(startdate#2)], InMemoryRelation [phone_number#0,name#1,startdate#2], true, 10000, StorageLevel(false, true, false, true, 1), ConvertToUnsafe, None)
    - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, name: $outer, type: class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan)
    - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, <function1>)
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, <function3>)
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[8] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@316975be)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@316975be))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[9] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@526fbb80)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@526fbb80))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[10] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, <function0>)
    - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 39 more
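Reading the serialization stack bottom-up: the UDF closure holds a reference to its enclosing DateFormatConstraint instance ($outer), and that instance has a SparkContext field (sc), which is never serializable. A minimal sketch of the same capture pattern (class and column names here are hypothetical, not the actual project code):

import java.text.SimpleDateFormat
import scala.util.Try
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

class DateConstraintLike(format: SimpleDateFormat, sc: SparkContext) {
  def check(df: DataFrame): Long = {
    // Because this lambda reads the `format` field, it captures `this`
    // ($outer) -- and `this` drags the non-serializable SparkContext along
    // when Spark tries to ship the UDF to executors.
    val cannotBeDate = udf((s: String) => s != null && Try(format.parse(s)).isFailure)
    df.filter(cannotBeDate(df("startdate"))).count() // throws Task not serializable
  }
}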
Code snippet:
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer
import scala.util.Try
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.hive.HiveContext

val fun = (df: DataFrame) => {
  format.setLenient(false)
  // Flags every non-null value that SimpleDateFormat cannot parse.
  val cannotBeDate = udf((column: String) => column != null && Try(format.parse(column)).isFailure)
  val maybeCannotBeDateCount = Try(df.filter(cannotBeDate(new Column(columnName))).count)

  /** Utility to persist all of the bad records. */
  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._

  // Write all bad records to Hive.
  val writeToHiveDf = df.filter(cannotBeDate(new Column(columnName)))
  val recordLists = new ListBuffer[List[(String, String, String)]]()
  writeToHiveDf.rdd.collect().foreach { row =>
    val item = row.mkString("-")
    val recordList: List[(String, String, String)] =
      List(List(tableName, "ALWAYS_NULL_CONSTRAINT", item))
        .map { case List(a, b, c) => (a, b, c) }
    recordLists += recordList
  }
  val listRDD = sc.parallelize(recordLists.flatten)
  val dataFrameToHive: DataFrame = listRDD.toDF("table_name", "constraint_applied", "data")
  dataFrameToHive.write.mode("append").saveAsTable("xdqdemo.bad_records")

  DateFormatConstraintResult(
    this,
    data = maybeCannotBeDateCount.toOption.map(DateFormatConstraintResultData),
    status = ConstraintUtil.tryToStatus[Long](maybeCannotBeDateCount, _ == 0)
  )
}
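For reference, the usual way out of this error is to copy everything the UDF needs into local vals, so the task closure serializes only those values and never `this` with its SparkContext field. A sketch, assuming `format` and `columnName` are fields of the enclosing DateFormatConstraint class as the serialization stack suggests:

val fun = (df: DataFrame) => {
  // Copy fields into local vals: the UDF closure then captures only these
  // serializable values instead of `this`, whose `sc` field is a
  // non-serializable SparkContext.
  val localFormat = format     // java.text.SimpleDateFormat is Serializable
  localFormat.setLenient(false)
  val localColumn = columnName // a plain String

  val cannotBeDate =
    udf((column: String) => column != null && Try(localFormat.parse(column)).isFailure)
  val maybeCannotBeDateCount = Try(df.filter(cannotBeDate(new Column(localColumn))).count)

  // The rest of the body (HiveContext, collect, saveAsTable) runs on the
  // driver and may keep using sc directly; it is never shipped to executors.
  maybeCannotBeDateCount
}

Alternatively, if DateFormatConstraint is a serializable case class, marking the sc field @transient (and keeping it out of every closure) also stops it from being pulled into the task.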
【Discussion】:

- I would guess that format.parse is causing the problem. You haven't included the initialization of that format variable in the snippet, but either the format class itself is not serializable, or the class in which you initialize format is not serializable (most likely the latter). (See the sketch after this list.)
- format is a parameter; it is initialized as format = SimpleDateFormat("some-date-format");
- Thanks for the reply.
- Has your problem been solved? If not, please include more code. In particular, in which object/class is the fun function declared, and where is the format variable instantiated? Do those classes contain any class variables or other inner classes?
- Sample input data would also help.
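Picking up the first commenter's point about format: one pattern that sidesteps the question entirely is to ship only the plain pattern string and rebuild the SimpleDateFormat inside the UDF. This is a sketch, not the poster's code, and the pattern string below is an assumption since the real one is never shown; it also avoids sharing a single SimpleDateFormat instance across threads, which is unsafe:

val pattern = "yyyy-MM-dd" // hypothetical: whatever pattern `format` was created with

val cannotBeDate = udf { (column: String) =>
  // Rebuilt per call: slightly wasteful, but nothing non-serializable is
  // captured, and SimpleDateFormat (which is not thread-safe) is never shared.
  val fmt = new java.text.SimpleDateFormat(pattern)
  fmt.setLenient(false)
  column != null && scala.util.Try(fmt.parse(column)).isFailure
}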
Tags: scala apache-spark serialization