【问题标题】:Decoupling non-serializable object to avoid Serialization error in Spark解耦不可序列化的对象以避免 Spark 中的序列化错误
【发布时间】:2016-03-10 21:44:28
【问题描述】:

以下类包含尝试从 Elasticsearch 读取并打印返回的文档的主函数:

object TopicApp extends Serializable {

  def run() {

    val start = System.currentTimeMillis()

    val sparkConf = new Configuration()
    sparkConf.set("spark.executor.memory","1g")
    sparkConf.set("spark.kryoserializer.buffer","256")

    val es = new EsContext(sparkConf)
    val esConf = new Configuration()
    esConf.set("es.nodes","localhost")
    esConf.set("es.port","9200")
    esConf.set("es.resource", "temp_index/some_doc")
    esConf.set("es.query", "?q=*:*")
    esConf.set("es.fields", "_score,_id")

    val documents = es.documents(esConf)
    documents.foreach(println)

    val end = System.currentTimeMillis()
    println("Total time: " + (end-start) + " ms")

    es.shutdown()

  }

  def main(args: Array[String]) {
    run()
  }

}

以下类使用org.json4s将返回的文档转换为JSON

class EsContext(sparkConf:HadoopConfig) extends SparkBase {
  private val sc = createSCLocal("ElasticContext", sparkConf)

  def documentsAsJson(esConf:HadoopConfig):RDD[String] = {
    implicit val formats = DefaultFormats
    val source = sc.newAPIHadoopRDD(
      esConf,
      classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text],
      classOf[MapWritable]
    )
    val docs = source.map(
      hit => {
        val doc = Map("ident" -> hit._1.toString) ++ mwToMap(hit._2)
        write(doc)
      }
    )
    docs
  }

  def shutdown() = sc.stop()

  // mwToMap() converts MapWritable to Map

}

以下类为应用程序创建本地SparkContext

trait SparkBase extends Serializable {
  protected def createSCLocal(name:String, config:HadoopConfig):SparkContext = {
    val iterator = config.iterator()
    for (prop <- iterator) {
      val k = prop.getKey
      val v = prop.getValue
      if (k.startsWith("spark."))
        System.setProperty(k, v)
    }
    val runtime = Runtime.getRuntime
    runtime.gc()

    val conf = new SparkConf()
    conf.setMaster("local[2]")

    conf.setAppName(name)
    conf.set("spark.serializer", classOf[KryoSerializer].getName)

    conf.set("spark.ui.port", "0")

    new SparkContext(conf)
  }
}

当我运行 TopicApp 时,我收到以下错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.map(RDD.scala:323)
    at TopicApp.EsContext.documents(EsContext.scala:51)
    at TopicApp.TopicApp$.run(TopicApp.scala:28)
    at TopicApp.TopicApp$.main(TopicApp.scala:39)
    at TopicApp.TopicApp.main(TopicApp.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@14f70e7d)
    - field (class: TopicApp.EsContext, name: sc, type: class org.apache.spark.SparkContext)
    - object (class TopicApp.EsContext, TopicApp.EsContext@2cf77cdc)
    - field (class: TopicApp.EsContext$$anonfun$documents$1, name: $outer, type: class TopicApp.EsContext)
    - object (class TopicApp.EsContext$$anonfun$documents$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 13 more

浏览其他涵盖类似问题的帖子,大多建议创建类Serializable 或尝试将不可序列化的对象与类分开。

根据我得到的错误,我推断SparkContextsc 是不可序列化的,因为 SparkContext 不是可序列化的类。

我应该如何解耦 SparkContext,让应用程序正常运行?

【问题讨论】:

    标签: scala serialization apache-spark


    【解决方案1】:

    我无法确定你的程序是否运行,但一般规则是,如果必须在 RDD 的数据上执行它们,则不要创建引用不可序列化类成员的匿名函数。在你的情况下:

    • EsContext 有一个 SparkContext 类型的 val,它(故意)不可序列化
    • EsContext.documentsAsJson 中传递给RDD.map 的匿名函数中,您调用此EsContext 实例(mwToMap) 的另一个函数,它强制Spark 序列化该实例以及它所拥有的SparkContext

    一种可能的解决方案是从EsContext 类中删除mwToMap可能进入EsContext 的伴随对象 - 对象不需要可序列化,因为它们是静态的)。如果还有其他相同性质的方法(write?),它们也必须被移动。这看起来像:

    import EsContext._
    
    class EsContext(sparkConf:HadoopConfig) extends SparkBase {
       private val sc = createSCLocal("ElasticContext", sparkConf)
    
       def documentsAsJson(esConf: HadoopConfig): RDD[String] = { /* unchanged */ }
       def documents(esConf: HadoopConfig): RDD[EsDocument] = { /* unchanged */ }
       def shutdown() = sc.stop()
    }
    
    object EsContext {
       private def mwToMap(mw: MapWritable): Map[String, String] = { ... }
    }
    

    如果将这些方法移出是不可能的(即,如果它们需要 EsContext 的一些成员) - 然后考虑将执行实际映射的类与此上下文分开(这似乎是某种包装SparkContext - 如果是这样,那就是它应该是的all)。

    【讨论】:

    • 我添加了完整的文件here。想看的可以去看看
    • 我复制了你的代码 - 仍然无法编译,我想有一些版本差异......无论如何,我做了我能做的,希望它有帮助,接受或离开它:)
    • write 是来自org.json4s.native 的函数。那么会有什么不同吗?
    • 可能不会——因为它是一个静态函数。我会从答案中删除它
    • 既然你说你无法运行代码。我将使用我使用的 POM 文件对其进行更新。有空的时候可以试试。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-19
    相关资源
    最近更新 更多