【Question title】: Spark Dataframe stat throwing Task not serializable
【Posted】: 2020-03-05 03:48:50
【Question】:

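What am I trying to do? (Context)

I am trying to compute some statistics on a Spark DataFrame/Dataset that is read from a directory of .parquet files containing US flights from 2013 to 2015. More specifically, I am using the approxQuantile method of DataFrameStatFunctions, which is accessed by calling the stat method on a Dataset. See the documentation.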

import airportCaseStudy.model.Flight
import org.apache.spark.sql.SparkSession

object CaseStudy {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local[*]")
      .getOrCreate
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    import spark.sqlContext.implicits._

    val flights = spark
      .read
      .parquet("C:\\Users\\Bluetab\\IdeaProjects\\GraphFramesSparkPlayground\\src\\resources\\flights")
      .as[Flight]

    flights.show()
    flights.printSchema()
    flights.describe("year", "flightEpochSeconds").show()

    val approxQuantiles = flights.stat
      .approxQuantile(Array("year", "flightEpochSeconds"), Array(0.25, 0.5, 0.75), 0.25)
    // whatever...
  }
}

Flight is just a case class.

package airportCaseStudy.model

case class Flight(year: Int, quarter: Int, month: Int, dayOfMonth: Int, dayOfWeek: Int, flightDate: String,
                  uniqueCarrier: String, airlineID: String, carrier: String, tailNum: String, flightNum: Int,
                  originAirportID: String, origin: String, originCityName: String, dstAirportID: String,
                  dst: String, dstCityName: String, taxiOut: Float, taxiIn: Float, cancelled: Boolean,
                  diverted: Float, actualETMinutes: Float, airTimeMinutes: Float, distanceMiles: Float, flightEpochSeconds: Long)
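
As a side note on the call itself: the last argument to approxQuantile is the relative error, and the Spark documentation states that for N rows and probability p the returned value x satisfies floor((p - err) * N) <= rank(x) <= ceil((p + err) * N). The sketch below only evaluates that documented bound in plain Scala; the object name QuantileRankWindow, the method rankWindow, and the clamping to [0, n] are assumptions made for illustration and are not part of Spark.

```scala
object QuantileRankWindow {
  // Evaluates the rank bound documented for approxQuantile:
  //   floor((p - err) * n) <= rank(x) <= ceil((p + err) * n)
  // Clamping to [0, n] is an assumption added here so the window stays within the data.
  def rankWindow(p: Double, err: Double, n: Long): (Long, Long) = {
    val lower = math.max(0L, math.floor((p - err) * n).toLong)
    val upper = math.min(n, math.ceil((p + err) * n).toLong)
    (lower, upper)
  }
}
```

With the values used in the question (relative error 0.25), QuantileRankWindow.rankWindow(0.5, 0.25, 100L) gives a window from rank 25 to rank 75, so the reported "median" of 100 rows may sit anywhere in the middle half of the data. A smaller relative error narrows the window at a higher computational cost.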

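What is the problem?

I am using Spark 2.4.0.

When executing val approxQuantiles = flights.stat.approxQuantile(Array("year", "flightEpochSeconds"), Array(0.25, 0.5, 0.75), 0.25), the call never completes because a Task not serializable exception is thrown. I spent some time going through related links but could not figure out why this exception occurs.

Exception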

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$1(PairRDDFunctions.scala:88)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:77)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$foldByKey$1(PairRDDFunctions.scala:222)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.foldByKey(PairRDDFunctions.scala:211)
    at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1158)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
    at org.apache.spark.sql.execution.stat.StatFunctions$.multipleApproxQuantiles(StatFunctions.scala:102)
    at org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:104)
    at airportCaseStudy.CaseStudy$.main(CaseStudy.scala:27)
    at airportCaseStudy.CaseStudy.main(CaseStudy.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
    - object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.PairRDDFunctions, functionalInterfaceMethod=scala/Function0.apply:()Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/PairRDDFunctions.$anonfun$foldByKey$2:(Lorg/apache/spark/rdd/PairRDDFunctions;[BLscala/runtime/LazyRef;)Ljava/lang/Object;, instantiatedMethodType=()Ljava/lang/Object;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.rdd.PairRDDFunctions$$Lambda$2158/61210602, org.apache.spark.rdd.PairRDDFunctions$$Lambda$2158/61210602@165a5979)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 2)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.PairRDDFunctions, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/PairRDDFunctions.$anonfun$foldByKey$3:(Lscala/Function0;Lscala/Function2;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=2])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.rdd.PairRDDFunctions$$Lambda$2159/758750856, org.apache.spark.rdd.PairRDDFunctions$$Lambda$2159/758750856@6a6e410c)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 22 more

Thanks for any help.

【Comments】:

Tags: scala apache-spark


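【Solution 1】:

Add "extends Serializable" to your class or object.

// Works the same way for a class: class Test extends Serializable { ... }
object Test extends Serializable {
  // your code goes here
}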
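
To make the effect of "extends Serializable" concrete, here is a minimal sketch in plain Scala with no Spark dependency. It runs an object through Java serialization, which is what the JavaSerializer frames in the stack trace are doing when the closure is checked. PlainHelper, SerializableHelper, and SerializationCheck are hypothetical names introduced only for this illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical classes, for illustration only.
class PlainHelper(val factor: Int)                              // does not extend Serializable
class SerializableHelper(val factor: Int) extends Serializable  // marked as serializable

object SerializationCheck {
  // Returns true if Java serialization accepts the object,
  // false if it throws NotSerializableException.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

SerializationCheck.isJavaSerializable(new PlainHelper(2)) returns false, while the same call with new SerializableHelper(2) returns true. Note that in the trace above the object reported as not serializable is a scala.runtime.LazyRef captured by a lambda inside Spark's own PairRDDFunctions, so this sketch shows the general mechanism only; the source does not confirm that marking user classes as Serializable resolves that particular trace.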

【Comments】: