【问题标题】:Scala reflection with Serialization (over Spark) - Symbols not serializable带有序列化的 Scala 反射(通过 Spark) - 符号不可序列化
【发布时间】:2016-05-22 03:00:51
【问题描述】:

首先我使用的是 scala 2.10.4,上面的示例在 Spark 1.6 中运行(尽管我怀疑 Spark 与此有关,但这只是一个序列化问题)。

所以这是我的问题:假设我有一个特征 Base 由两个类 B1B2 实现。现在我有一个由一组类扩展的通用特征,其中一个是Base 的子类型,例如(这里我保留了 Spark 的 RDD 概念,但它实际上可能是其他东西,一旦它被序列化;不管实际是什么,东西都只是结果):

trait Foo[T] { def function(rdd: RDD[T]): Something }
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something  = ... }
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something  = ... }
...

现在我需要一个对象,该对象将采用RDD[T](假设这里没有歧义,它只是一个简化版本)并返回与T 类型对应的函数结果对应的Something。但它也应该适用于具有合并策略的Array[T]。到目前为止它看起来像:

object Obj {
   def compute[T: TypeTag](input: RDD[T]): Something = {
      typeOf[T] match {
         case t if t <:< typeOf[A] => 
            val foo = new Foo[T]
            foo.function(rdd)
         case t if t <:< typeOf[Array[A]] => 
            val foo = new Foo[A]
            foo.function(rdd.map(x => mergeArray(x.asInstance[Array[A]])))
         case t if t <:< typeOf[Base] => 
            val foo = new Foo[T]
            foo.function(rdd)
         // here it gets ugly...
         case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
            val tt = getSubInfo[T](0)
            val tpe = tt.tpe
            val foo = new Foo[tpe.type]
            foo.function(rdd.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]]))
      }
   }

   // strategy to transform arrays of T into a T object when possible
   private def mergeArray[T: TypeTag](a: Array[T]): T = ... 

  // extract the subtype, e.g. if Array[Int] then at position 0 extracts a type tag for Int, I can provide the code but not fondamental for the comprehension of the problem though
   private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ... 
}

不幸的是,它似乎在本地机器上工作正常,但是当它被发送到 Spark(序列化)时,我得到一个 org.apache.spark.SparkException: Task not serializable

Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types)
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)

我确实有一个解决方法(很明显,列举可能性),但出于我的好奇心,有没有办法解决这个问题?为什么 Symbol 不是可序列化的,而它们在 Manifests 中的等价物是可序列化的?

感谢您的帮助。

【问题讨论】:

    标签: scala serialization reflection apache-spark


    【解决方案1】:

    TypeTag 现在通常可以在 scala 中序列化,但奇怪的是,不是直接类型(这很奇怪,因为 typetag 包含不是 :-/ 的符号)。

    这可能会做你想做的事

    // implicit constructor TypeTag parameter is serialized.
    abstract class TypeAware[T:TypeTag] extends Serializable {
      def typ:Type = _typeCached
    
      @transient
      lazy val _typeCached:Type = typeOf[T]
    }
    
    trait Foo[T] extends Serializable { 
      def function(rdd: RDD[T]): Something  {... impl here?...}
      def typ:Type 
    }
    
    class Concrete[T:TypeTag] extends TypeAware[T] with Foo[T] with Serializable{
       def function(rdd: RDD[T]): Something  {... impl here?...}
    }
    

    【讨论】:

    • 我相信 TypeApi 也包含 scala.reflect.internal.Symbols$PackageClassSymbol 所以这也行不通
    • 确实有效。 (在 scala 2.11 中)这里是合并请求 github.com/scala/scala/pull/3817
    猜你喜欢
    • 2021-08-12
    • 2015-12-16
    • 1970-01-01
    • 2015-04-28
    • 2017-09-21
    • 2012-09-17
    • 1970-01-01
    • 2020-08-05
    • 1970-01-01
    相关资源
    最近更新 更多