【问题标题】:How spark handles objectspark如何处理对象
【发布时间】:2016-11-14 20:01:02
【问题描述】:

为了测试 Spark 中的序列化异常,我以 2 种方式编写了一个任务。
第一种方式:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)
    val result = rdd.map(elem => {
      funcs.func_1(elem)
    })        
    println(result.count())
  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}

这种方式 spark 效果很好。
当我将其更改为以下方式时,它不起作用并抛出 NotSerializableException。
第二种方式:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)

    val handler = funcs
    val result = rdd.map(elem => {
      handler.func_1(elem)
    })

    println(result.count())

  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}

我知道我收到错误“任务不可序列化”的原因是因为我试图在第二个示例中将一个不可序列化的对象 funcs 从驱动程序节点发送到工作程序节点。对于第二个例子,如果我让对象funcs 扩展Serializable,这个错误就会消失。

但在我看来,因为funcs 是一个对象而不是一个类,所以它是一个单例,应该被序列化并从驱动程序传送到工作程序,而不是在工作程序节点本身内实例化。在这种情况下,虽然使用对象funcs 的方式不同,但我猜想在这两个示例中,不可序列化对象funcs 是从驱动节点传送到工作节点的。

我的问题是为什么第一个示例可以成功运行,但第二个示例失败并出现“任务不可序列化”异常。

【问题讨论】:

    标签: serialization apache-spark rdd


    【解决方案1】:

    当您在 RDD 闭包(映射、过滤器等)中运行代码时,执行该代码所需的所有内容都将被打包、序列化并发送到执行程序以运行。任何被引用的对象(或其字段被引用)都将在这个任务中被序列化,这就是你有时会得到NotSerializableException 的地方。

    不过,您的用例稍微复杂一些,并且涉及到 scala 编译器。通常,在 scala 对象上调用函数等同于调用 java 静态方法。该对象从未真正存在过——它基本上就像编写内联代码一样。但是,如果您将一个对象分配给一个变量,那么您实际上是在内存中创建对该对象的引用,并且该对象的行为更像一个类,并且可能存在序列化问题。

    scala> object A { 
      def foo() { 
        println("bar baz")
      }
    }
    defined module A
    
    scala> A.foo()  // static method
    bar baz
    
    scala> val a = A  // now we're actually assigning a memory location
    a: A.type = A$@7e0babb1
    
    scala> a.foo()  // dereferences a before calling foo
    bar baz
    

    【讨论】:

    • 感谢蒂姆的回复。我试图将函数从一个对象更改为一个类(不扩展可序列化)。然后我在 RDD 闭包中从这个类实例化一个对象。它也有效。代码就像rdd.map(x => {val handler = new funcs; funcs.func_1(x)})。在这种情况下,我没有调用静态方法。在 RDD 闭包中实例化一个不可序列化的类后,我调用了一个方法。该对象存在且不可序列化,但程序可以工作。
    • 在这种情况下,您在闭包内实例化一个对象,因此它没有被序列化。这里的funcs类在驱动JVM上不存在任何形式。
    • 好的,我想我明白你的意思了。如果我在 RDD 闭包中实例化一个类,那么该类是否可序列化并不重要,因为 RDD 中的每个元素都会创建一个新对象。如果我想在 RDD 闭包中使用一个方法,并且这个方法引用了一个在驱动程序中实例化的对象,那么只有在类是可序列化的情况下才能执行任务。对吗?
    【解决方案2】:

    为了让 Spark 分发给定的操作,操作中使用的函数需要被序列化。在序列化之前,这些函数会通过一个称为“ClosureCleaner”的复杂过程。

    目的是从它们的上下文中“切断”闭包,以减少需要序列化的对象图的大小并降低过程中出现序列化问题的风险。换句话说,确保只有执行函数所需的代码被序列化并发送到“另一端”进行反序列化和执行

    在该过程中,闭包也被评估为可序列化,以便在运行时主动检测序列化问题 (SparkContext#clean)。

    该代码密集且复杂,因此很难找到导致这种情况的正确代码路径。

    直观地,当ClosureCleaner 发现时发生的情况是:

    val result = rdd.map{elem => 
      funcs.func_1(elem)
    } 
    

    它将闭包的内部成员评估为来自can be recreated and there are no further references的对象,因此清理后的闭包仅包含{elem => funcs.func_1(elem)},可以由JavaSerializer序列化。

    相反,当闭包清理器评估时:

    val handler = funcs
    val result = rdd.map(elem => {
      handler.func_1(elem)
    })
    

    它发现闭包有一个对$outer (handler) 的引用,因此它检查外部作用域并将and 变量实例添加到已清理的闭包中。我们可以想象得到的清洁后的封闭是这种形状的东西(这仅用于说明目的):

    {elem => 
      val handler = funcs
      handler.func_1(elem)
    } 
    

    当针对serialization 测试闭包时,它无法序列化。根据 JVM 序列化规则,如果递归地其所有成员都是可序列化的,则对象是可序列化的。在这种情况下,handler 引用了一个不可序列化的对象并且检查失败。

    【讨论】:

    • 感谢您的回复。根据您的回答,据我所知,对象funcs以某种方式在工作节点中重新创建,并且在工作节点中将有另一个全新的对象func。如果我在对象funcs 中有一个var counter,并且我在RDD.map 中为var counter 做了一个增量,那么驱动节点中的funcs.counter 会改变吗?
    • 不,它不会。静态对象是 JVM 实例的本地对象。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-06
    • 2011-10-13
    • 2016-01-16
    • 2012-06-06
    • 2018-08-19
    • 1970-01-01
    相关资源
    最近更新 更多