【问题标题】:Spark function not serializableSpark函数不可序列化
【发布时间】:2019-06-25 16:46:21
【问题描述】:

我有一堂课:

class DataLoader {

  def rdd2RddTransform(
    ss: SparkSession,
    inputRDD: RDD[GenericRecord]): RDD[GenericRecord] = {

    inputRDD.asInstanceOf[RDD[TrainingData]]
            .map(reformatTrainingData)
  }

  private def reformatTrainingData: TrainingData => ReFormatedData
               = (trainingData: TrainingData) => {func implement}

}

它工作得很好,但它抛出了一个异常: org.apache.spark.SparkException: Task not serializable我对RDD的地图做了一点小改动后:

inputRDD.asInstanceOf[RDD[TrainingData]].map(reformatTrainingData(_))

我认为这两个功能应该相同,但似乎并非如此。为什么它们不同?

【问题讨论】:

标签: scala apache-spark


【解决方案1】:

这是因为 Scala 中的方法和函数不能完全互换。

函数是独立的对象(即类的实例,例如Function1Function2Function3...),但方法仍然与它们的封闭类相关联。如果封闭类不是Serializable,这可能会在 Spark 中产生问题 - 当 Spark 尝试序列化方法时,它无法序列化关联的类实例。

请注意,您的reformatTrainingData返回函数的方法

所以当你调用类似的东西时:

rdd.map(reformatTrainingData)

您实际上是在调用无参数 reformatTrainingData 方法并返回一个可以安全序列化的独立 Function1 实例。你也可以这样写

private def reformatTrainingData(): TrainingData => ReFormatedData ...

rdd.map(reformatTrainingData())

强调正在发生方法调用。

当您更改为reformatTrainingData(_) 时,您正在使用部分应用的方法;当 Spark 试图序列化这个时,它需要拉入并序列化封闭的DataLoader 类,它没有被标记为Serializable

如果reformatTrainingDataTrainingData => ReFormatedData 类型的简单方法,也会出现同样的问题。

如果您将DataLoader 标记为extends Serializable,那么任何一个版本都应该可以工作。

也可以将reformatTrainingData 变成val,因为序列化时 val 不会拉入封闭类:

private val reformatTrainingData: TrainingData => ReFormatedData ...

rdd.map(reformatTrainingData)

【讨论】:

    猜你喜欢
    • 2018-04-06
    • 2021-03-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-16
    • 2017-03-21
    • 2023-03-12
    相关资源
    最近更新 更多