【发布时间】:2018-07-05 09:28:44
【问题描述】:
我在 Spark 2.2.0 闭包中使用 Json4s 类。无法序列化DefaultFormats 的“解决方法”是包含需要它们的定义inside every closure executed by Spark。我相信我做的比下面需要做的多,但仍然出现序列化失败。
使用 Spark 2.2.0、Scala 2.11、Json4s 3.2.x(Spark 中的任何内容),还尝试使用 Json4s 3.5.3,通过使用 sbt 将其引入我的工作。在所有情况下,我都使用了下面显示的解决方法。
有谁知道我做错了什么?
logger.info(s"Creating an RDD for $actionName")
implicit val formats = DefaultFormats
val itemProps = df.rdd.map[(ItemID, ItemProps)](row => { <--- error points to this line
implicit val formats = DefaultFormats
val itemId = row.getString(0)
val correlators = row.getSeq[String](1).toList
(itemId, Map(actionName -> JArray(correlators.map { t =>
implicit val formats = DefaultFormats
JsonAST.JString(t)
})))
})
我还尝试了另一个建议,即在类构造函数区域而不是在闭包中设置 DefaultFormats 隐式,任何地方都没有运气。
JVM 错误跟踪来自 Spark,抱怨该任务不可序列化并指向上面的行(无论如何,我的代码中的最后一行)然后解释了根本原因:
Serialization stack:
- object not serializable (class: org.json4s.DefaultFormats$, value: org.json4s.DefaultFormats$@7fdd29f3)
- field (class: com.actionml.URAlgorithm, name: formats, type: class org.json4s.DefaultFormats$)
- object (class com.actionml.URAlgorithm, com.actionml.URAlgorithm@2dbfa972)
- field (class: com.actionml.URAlgorithm$$anonfun$udfLLR$1, name: $outer, type: class com.actionml.URAlgorithm)
- object (class com.actionml.URAlgorithm$$anonfun$udfLLR$1, <function3>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$4, name: func$4, type: interface scala.Function3)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$4, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[2, bigint, false], input[3, bigint, false], input[5, bigint, false]))
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 128 more
【问题讨论】:
-
你能解决吗?我认为你应该在 map 函数中创建 json4s 实例。 json 实例可能在驱动节点中创建,但在执行节点中执行。
标签: json apache-spark serialization json4s