【问题标题】:Why does this Spark code throw java.io.NotSerializableException为什么这个 Spark 代码会抛出 java.io.NotSerializableException
【发布时间】:2017-10-05 05:42:24
【问题描述】:

我想在 RDD 的转换中访问伴随对象的方法。为什么以下不起作用:

import org.apache.spark.rdd.RDD
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class Abc {
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

object Abc {
  def fn(x: Int): Double = { x.toDouble }
}

implicit def abcEncoder: Encoder[Abc] = Encoders.kryo[Abc]

new Abc().transform(sc.parallelize(1 to 10)).collect

上面的代码抛出了java.io.NotSerializableException:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
  at Abc.transform(<console>:19)
  ... 47 elided
Caused by: java.io.NotSerializableException: Abc
Serialization stack:
        - object not serializable (class: Abc, value: Abc@4f598dfb)
        - field (class: Abc$$anonfun$transform$1, name: $outer, type: class Abc)
        - object (class Abc$$anonfun$transform$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 57 more

即使为类 Abc 定义 Encoder 在这里也无济于事。但更重要的问题是,为什么要尝试对 Abc 类的对象进行序列化?我的第一个想法是伴生对象是类的单例对象,所以也许有人尝试序列化它。但似乎并非如此,因为当我从另一个类调用 Abc.fn 时:

class Xyz {
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

implicit def xyzEncoder: Encoder[Xyz] = Encoders.kryo[Xyz]

new Xyz().transform(sc.parallelize(1 to 10)).collect

我收到了java.io.NotSerializableException: Xyz

【问题讨论】:

  • 工作没有发生在边缘节点上;类(或对象)必须被序列化,以便数据节点可以运行它。
  • 因为您实际上并没有定义序列化/反序列化函数,更不用说实现正确的接口了? (docs.oracle.com/javase/7/docs/api/java/io/…) 默认情况下 sereilsiation 只能访问公开设置和可获取的内容。除此之外,您需要提供自己的功能。

标签: java scala apache-spark serialization rdd


【解决方案1】:

这是一篇很棒的文章,讨论了 Apache Spark 中的“可序列化”与“不可序列化对象”:

Using Non-Serializable Objects in Apache Spark, Nicola Ferraro

这篇文章提供了几个建议:

  • 在您的特定情况下发生了什么

  • 一些替代方案,因此您的对象不需要“可序列化”

【讨论】:

    【解决方案2】:

    主要的抽象os spark是跨集群节点分区的RDD。所以当我们运行 RDD 时,它会在驱动节点中被序列化,然后分发给其他合适的节点。然后工作节点对其进行反序列化并执行。

    在您的情况下,ABC 类无法序列化并分发到其他工作节点。 您需要使用 Serializable 序列化 Class ABC

    class Abc with Serializable{
        def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多