【Question Title】: org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerializableException
【Posted】: 2019-10-27 09:31:13
【Question Description】:

I have two Scala codebases - MyMain.scala and MyFunction.scala - built separately; the jar built from MyFunction acts as a UDF inside MyMain.

MyFunction.scala basically contains a Java class with a public method public String myFunc(String val0, String val1). The project is built in SBT, and the compiled output is stored as an artifact jar (only the required class, i.e. MyFunction.class, without its dependencies).
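
For reference, the helper amounts to the following shape, rendered here in Scala (a sketch only; the actual class is plain Java, and the Serializable part comes from the comment exchange below):

    // Scala rendering of the Java helper packaged in MyFunction.jar (sketch;
    // the real class is Java: public class MyFunction implements java.io.Serializable)
    class MyFunction extends Serializable {
      def myFunc(val0: String, val1: String): String = ???  // actual logic elided
    }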

MyMain.scala imports the above artifact jar under a lib folder and adds it to the classpath with unmanagedBase := baseDirectory.value / "lib" in build.sbt.
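
A minimal build.sbt along those lines (project name, Scala and Spark versions are assumptions; the stack trace below suggests a Spark 2.4.x line):

    // build.sbt -- minimal sketch; name and version numbers are assumptions
    name := "MyMain"
    scalaVersion := "2.11.12"

    // pick up lib/MyFunction.jar as an unmanaged dependency (from the question)
    unmanagedBase := baseDirectory.value / "lib"

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" % "provided"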

So the MyMain project is structured as follows:

MyMain
├── lib
│   └── MyFunction.jar
│       ├── META-INF/MANIFEST.MF
│       └── MyFunction.class
├── project
├── src/main/scala/MyMain.scala
└── build.sbt

/What I need to do/

I want to define a UDF in MyMain.scala over MyFunction.class from MyFunction.jar, which has been added to the lib classpath. I have defined the UDF, but when I try to use it on a Spark dataframe in MyMain.scala, it throws "Task not serializable" with java.io.NotSerializableException, as below:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
  at MyMain$.main(<pastie>:253)
  ... 58 elided
Caused by: java.io.NotSerializableException: MyMain$
Serialization stack:
    - object not serializable (class: MyMain$, value: MyMain$@11f25cf)
    - field (class: $iw, name: MyMain$module, type: class MyMain$)
    - object (class $iw, $iw@540705e8)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7e6e1038)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7587f2a0)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5e00f263)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3fbfe419)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5172e87b)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5ec96f75)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@26f6de78)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@18c3bc83)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@35d674ee)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5712092f)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6980c2e6)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6ce299e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@406b8acb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@73d71e61)
    - field (class: $line47.$read, name: $iw, type: class $iw)
    - object (class $line47.$read, $line47.$read@72ee2f87)
    - field (class: $iw, name: $line47$read, type: class $line47.$read)
    - object (class $iw, $iw@22c4de5a)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@3daea539)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function2>)
    - element of array (index: 9)
    - array (class [Ljava.lang.Object;, size 15)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 92 more

/What might be the cause/

MyMain.scala refers to some non-serializable class instances inside some transformations on the Spark dataframe.
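
For illustration, here is a minimal sketch (all names hypothetical) of the classic way this error arises: a closure captures an instance of a class that is not Serializable, so Spark cannot ship the task to the executors:

    import org.apache.spark.sql.functions.udf

    // Hypothetical helper that does NOT extend Serializable
    class Helper {
      def transform(s: String): String = s.toUpperCase
    }

    def buildUdf() = {
      val helper = new Helper()                 // local instance captured by the lambda
      udf((s: String) => helper.transform(s))   // fails with NotSerializableException
    }                                           // once an action forces the task to ship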

/What I tried/

// NOTE: this version still throws "Task not serializable" (see the stack trace above)
object MyFunction extends Serializable {
  val myFuncSingleton = new MyFunction()
  def getMyFunc(var0: String, var1: String): String = {
    myFuncSingleton.myFunc(var0, var1)
  }
}

import org.apache.spark.sql.functions.{col, lit, udf}
val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })

object MyMain {
  val spark = ...
  val hadoopfs = ...
  def main(args: Array[String]): Unit = {
    val df1 = ...
    val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
  }
}

I referred to the following link: how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs

【Question Comments】:

  • Did you mark the MyFunction class as Serializable too?
  • Yes, starting from the Java class: public class MyFunction implements java.io.Serializable

Tags: scala apache-spark


【Solution 1】:

I made minor tweaks to the code, and it solved my problem.

While I do not fully understand the inner workings of the Scala compiler and how it handles UDFs, I will try to explain my solution and the probable cause of the Task not serializable error:

  1. The myUDF variable is used inside withColumn(...), which is not within any RDD closure.
  2. Inside the udf(...) definition (outside the driver), calling getMyFunc(...) on the Scala object MyFunction is equivalent to calling a static method, so the MyFunction object does not need to be serialized: it is used as a singleton object, not as an instance of the MyFunction class (the one defined in MyFunction.jar). This explains changing the definition from object MyFunction extends Serializable to object MyFunction.
  3. However, inside the "wrapper" singleton object MyFunction, myFuncSingleton is defined as an instance of the MyFunction class (from the jar), and myFuncSingleton.myFunc(...) invokes that instance's myFunc(...) method.
  4. The myFuncSingleton instance and its MyFunction class are referenced through myUDF on the driver, outside any RDD closure (as noted in 1.), so the MyFunction class needs to be explicitly serializable, i.e. public class MyFunction implements java.io.Serializable (it is a Java class built into the jar).
  5. As noted in 1., since the UDF call inside withColumn(...) is not within an RDD closure, the MyMain object needs to be serialized for the UDF to be available across partitions, i.e. object MyMain extends Serializable.

    object MyFunction {
      val myFuncSingleton = new MyFunction()
      def getMyFunc(var0: String, var1: String): String = {
        myFuncSingleton.myFunc(var0, var1)
      }
    }

    import org.apache.spark.sql.functions.{col, lit, udf}
    val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })

    object MyMain extends Serializable {
      val spark = ...
      val hadoopfs = ...
      def main(args: Array[String]): Unit = {
        val df1 = ...
        val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
      }
    }

Note:

  • To sum up, I am invoking a MyFunction instance method through what is effectively a static method call on the MyFunction singleton object. Hence val myFuncVar = new MyFunction() would be a more apt name than val myFuncSingleton = new MyFunction().
  • I do not fully understand the nuances of RDD closures, and I am not sure whether withColumn() sits outside an RDD closure, but have assumed so for the sake of this explanation. See also the alternative sketch below.
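
As a side note, another common pattern that sidesteps the serialization question entirely is to construct the helper inside the UDF body, so only the lambda itself is shipped (a sketch; as written it pays one instantiation per call):

    // The helper is created on the executor when the UDF runs, so it never
    // needs to be serialized; memoize it if construction is expensive.
    val myUDF = udf { (val0: String, val1: String) =>
      new MyFunction().myFunc(val0, val1)
    }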

There is some good explanation here: How Spark handles object

【Discussion】:
