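[Posted]: 2019-10-27 09:31:13
[Question]:
I have two Scala projects, MyMain.scala and MyFunction.scala, which are built separately. The jar built from MyFunction is meant to serve as a UDF in MyMain.
MyFunction.scala essentially contains a Java class with one public method, public String myFunc(String val0, String val1). The project is built with SBT, and the build_jar compile output is stored as an artifact (only the required class, MyFunction.class, without its dependencies).
The MyMain project imports that artifact jar into its lib folder, and build.sbt adds it to the classpath with unmanagedBase := baseDirectory.value / "lib".
The MyMain project is therefore laid out as follows: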
MyMain
|
-lib/MyFunction.jar
    |
    - META-INF/MANIFEST.MF
    - MyFunction.class
-project
-src/main/scala/MyMain.scala
-build.sbt
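/What I need to do/
I want to define a UDF in MyMain.scala that is backed by MyFunction.class from MyFunction.jar, which is already on the classpath via lib. I have defined the UDF, but when I apply it to a Spark DataFrame in MyMain.scala it throws "Task not serializable", caused by a java.io.NotSerializableException, as shown below: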
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
at MyMain$.main(<pastie>:253)
... 58 elided
Caused by: java.io.NotSerializableException: MyMain$
Serialization stack:
- object not serializable (class: MyMain$, value: MyMain$@11f25cf)
- field (class: $iw, name: MyMain$module, type: class MyMain$)
- object (class $iw, $iw@540705e8)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7e6e1038)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7587f2a0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e00f263)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3fbfe419)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5172e87b)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5ec96f75)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@26f6de78)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@18c3bc83)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@35d674ee)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5712092f)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6980c2e6)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6ce299e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@406b8acb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@73d71e61)
- field (class: $line47.$read, name: $iw, type: class $iw)
- object (class $line47.$read, $line47.$read@72ee2f87)
- field (class: $iw, name: $line47$read, type: class $line47.$read)
- object (class $iw, $iw@22c4de5a)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@3daea539)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function2>)
- element of array (index: 9)
- array (class [Ljava.lang.Object;, size 15)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 92 more
/What could be the cause/
MyMain.scala references a non-serializable class instance inside one of the transformations on the Spark DataFrame.
/What I have tried/
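The serialization stack above shows the UDF's function object ($anonfun$1) holding an $outer reference to a REPL wrapper ($iw), which in turn reaches the non-serializable MyMain$. A minimal sketch of that mechanism, without Spark and using plain Java serialization, might look like the following. Holder, ClosureCheck and trySerialize are hypothetical names introduced only for illustration; they are not part of the original project.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable enclosing scope
// (plays the role of MyMain$ / the REPL wrapper in the trace).
class Holder { // deliberately NOT Serializable
  val prefix = "id-"
  // The lambda reads `prefix`, so it captures `this` (the whole Holder).
  val capturing: String => String = s => prefix + s
}

object ClosureCheck {
  // Returns true if Java serialization accepts the object, false if it
  // is rejected with NotSerializableException.
  def trySerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    // Captures nothing from an enclosing instance.
    val standalone: String => String = s => "id-" + s

    // The Holder is dragged along with the closure, so this one is
    // expected to be rejected.
    println(s"capturing closure serializable:  ${trySerialize(new Holder().capturing)}")
    // Nothing non-serializable is reachable from this one.
    println(s"standalone closure serializable: ${trySerialize(standalone)}")
  }
}
```

The point of the sketch is that the function itself can be serializable while an object it silently captures is not, which matches the "object not serializable (class: MyMain$ ...)" line in the trace.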
import org.apache.spark.sql.functions.{col, lit, udf}

// Scala wrapper that holds a single instance of the Java class
object MyFunction extends Serializable {
  val myFuncSingleton = new MyFunction()
  def getMyFunc(var0: String, var1: String): String = {
    myFuncSingleton.myFunc(var0, var1)
  }
}

// UDF that delegates to the wrapper
val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })

object MyMain {
  val spark = ...
  val hadoopfs = ...
  def main(args: Array[String]): Unit = {
    val df1 = ...
    val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
  }
}
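One possible restructuring, offered as a sketch rather than a confirmed fix: the $iw and <pastie> frames in the trace suggest the code was pasted into spark-shell, where a top-level val such as myUDF is wrapped in REPL classes that also reference MyMain. Defining the UDF inside a standalone object and compiling the code as an application may avoid that capture. In the sketch below the Scala class MyFunction is a stand-in for the Java class in MyFunction.jar with assumed behaviour, MyFunctionHolder is a hypothetical name, and local[*] and the sample rows are illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, udf}

// Stand-in for the Java class shipped in MyFunction.jar; the real
// myFunc logic is not shown in the question, so this body is assumed.
class MyFunction extends Serializable {
  def myFunc(val0: String, val1: String): String = s"$val0-$val1"
}

// Hypothetical holder object. In compiled code the lambda reaches
// `instance` through the object's static accessor, so it does not
// capture any enclosing instance.
object MyFunctionHolder {
  lazy val instance = new MyFunction()
  val myUDF = udf((val0: String, val1: String) => instance.myFunc(val0, val1))
}

object MyMain {
  def main(args: Array[String]): Unit = {
    // Session is created inside main, not as a field of MyMain.
    val spark = SparkSession.builder()
      .appName("MyMain")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq("1", "2").toDF("id") // sample data, not from the question
    val df2 = df1.withColumn("reg_id", MyFunctionHolder.myUDF(lit("Subscriber"), col("id")))
    df2.show()

    spark.stop()
  }
}
```

If the code has to stay in spark-shell, an alternative worth trying is declaring object MyMain extends Serializable and marking fields that cannot be serialized, such as the Hadoop file system handle, as @transient.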
I followed the linked question how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs.
[Discussion]:
- Did you mark the MyFunction class as Serializable too?
- Yes, starting from the Java class: public class MyFunction implements java.io.Serializable
Tags: scala apache-spark