【问题标题】:Spark: Task not Serializable for UDF on DataFrameSpark:DataFrame 上 UDF 的任务不可序列化
【发布时间】:2016-08-16 03:15:17
【问题描述】:

当我尝试在 Spark 1.4.1 上执行以下操作时,我得到 org.apache.spark.SparkException: Task not serializable

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat

object ConversionUtils {
  val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")

  def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime)

  val castTS = udf[Timestamp, String](tsUTC _)
}

val df = frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str")))
df.first

这里,frame 是一个 DataFrame,它位于 HiveContext 中。该数据框没有任何问题。

我有类似的整数 UDF,它们可以正常工作。但是,带有时间戳的那个似乎会引起问题。根据documentationjava.sql.TimeStamp 实现了Serializable,所以这不是问题。 SimpleDateFormat 也是如此,如 here 所示。

这让我相信是 UDF 导致了问题。但是,我不确定是什么以及如何解决它。

trace的相关部分:

Caused by: java.io.NotSerializableException: ...
Serialization stack:
        - object not serializable (class: ..., value: ...$ConversionUtils$@63ed11dd)
        - field (class: ...$ConversionUtils$$anonfun$3, name: $outer, type: class ...$ConversionUtils$)
        - object (class ...$ConversionUtils$$anonfun$3, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, name: func$2, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf, scalaUDF(ts_str#2683))
        - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Alias, scalaUDF(ts_str#2683) AS ts#7146)
        - element of array (index: 35)
        - array (class [Ljava.lang.Object;, size 36)
        - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
        - object (class scala.collection.mutable.ArrayBuffer,

【问题讨论】:

    标签: scala serialization apache-spark


    【解决方案1】:

    试试:

    object ConversionUtils extends Serializable {
      ...
    }
    

    【讨论】:

    • @CharlieRosenfeld 需要对对象进行序列化才能将其发送到处理节点。所以,需要在节点上运行的函数需要定义在可序列化的对象上
    • 工作就像一个魅力.. 谢谢@DavidGriffin 虽然我想知道为什么会这样。你能评论一些清楚解释这个问题的链接吗?
    • @ChaitanyaVemulakonda 有一篇关于它的深入文章medium.com/swlh/spark-serialization-errors-e0eebcf0f6e6(方法基本上需要有父对象Serializable,以便它们可以发送给执行者)
    猜你喜欢
    • 2016-07-10
    • 1970-01-01
    • 2020-02-04
    • 1970-01-01
    • 2020-09-29
    • 1970-01-01
    • 2018-04-06
    • 2021-03-16
    • 2018-09-19
    相关资源
    最近更新 更多