【Question Title】: Scala error: Exception in thread "main" org.apache.spark.SparkException: Task not serializable
【Posted】: 2021-12-20 11:06:39
【Question】:

I get a "Task not serializable" error when running this code:

import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Task1 {
  def findHighestRatingUsers(movieRating: String): (String) = {
    val tokens = movieRating.split(",", -1)
    val movieTitle = tokens(0)
    val ratings = tokens.slice(1, tokens.size)
    val maxRating = ratings.max
    var userIds = ArrayBuffer[Int]()

    for(i <- 0 until ratings.length){
      if (ratings(i) == maxRating) {
        userIds += (i+1)
      }
    }

    return movieTitle + "," + userIds.mkString(",")

    return movieTitle
  }

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    val Lines = sc.textFile(args(0))


    val TitleAndMaxUserIds = Lines.map(findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}

The error occurs at these lines:

val TitleAndMaxUserIds =Lines.map(findHighestRatingUsers)
      .saveAsTextFile(args(1))

I believe it is caused by something in the function findHighestRatingUsers. Can someone explain why, and how to fix it?

More details from the exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
    at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.map(RDD.scala:395)
    at Task1$.main(Task1.scala:63)
    at Task1.main(Task1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Task1$
Serialization stack:
    - object not serializable (class: Task1$, value: Task1$@3c770db4)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Task1$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Task1$.$anonfun$main$1:(LTask1$;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class Task1$$$Lambda$1023/20408451, Task1$$$Lambda$1023/20408451@4f59a516)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 22 more

I looked at the post Difference between object and class in Scala and tried using a separate object to hold the function:

import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer

object Function{
    def _findHighestRatingUsers(movieRating: String): (String) = {
      val tokens = movieRating.split(",", -1)
      val movieTitle = tokens(0)
      val ratings = tokens.slice(1, tokens.size)
      val maxRating = ratings.max
      var userIds = ArrayBuffer[Int]()

      for(i <- 0 until ratings.length){
        if (ratings(i) == maxRating) {
          userIds += (i+1)
        }
      }

      return movieTitle + "," + userIds.mkString(",")
    }

}

object Task1 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    val textFile = sc.textFile(args(0))

    val output = textFile.map(Function._findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}

But I still got the exception, with a wall of errors...


This time I tried nesting object Function inside object Task1, like this:

import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Task1 {
    
  object Function{
    def _findHighestRatingUsers(movieRating: String): (String) = {
      val tokens = movieRating.split(",", -1)
      val movieTitle = tokens(0)
      val ratings = tokens.slice(1, tokens.size)
      val maxRating = ratings.max
      var userIds = ArrayBuffer[Int]()

      for(i <- 0 until ratings.length){
        if (ratings(i) == maxRating) {
          userIds += (i+1)
        }
      }

      return movieTitle + "," + userIds.mkString(",")
    }
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    val textFile = sc.textFile(args(0))

    val output = textFile.map(Function._findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}

That solved the problem. But I still don't understand why nesting the object fixed it. Can someone explain? I am also unsure about a few things:

  1. What is the main function in Scala? Is it the entry point of the program?
  2. Why do we use an object to hold the main function?
  3. Can anyone give the general structure of a Scala program containing functions, classes, or other basic components?

【Comments】:

  • Can you post the full stack trace? Did you try moving the method findHighestRatingUsers into an object different from the one containing the main method?
  • What version of Java are you using?
  • I have posted the errors above, @GaëlJ. I don't know what they mean.
  • It's Java 8, @tjheslin1
  • Which exception? Did you get a stack trace with the second code sample?

Tags: scala apache-spark


【Solution 1】:

First of all, I suggest you read the Scala and Spark documentation to get familiar with them, since your questions suggest you are just getting started.

I will give some insight into (though not a complete answer to) your original question about "Task not serializable", and let you open separate questions for the ones you added later in the post; otherwise this answer would become a mess.

As you probably know, Spark performs distributed computation. To do so, one of the things Spark does is take the code you wrote, serialize it, and send it to executors somewhere to actually run it. The key point here is that your code must be serializable.

The error you got is telling you that Spark cannot serialize your code.
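To make that concrete, here is a standalone sketch that needs no Spark at all: before shipping a task, Spark Java-serializes the closure, much like the helper below does. A function value that captures only serializable data passes the check; one that drags in a non-serializable object fails with the same `NotSerializableException`. (The `Helper` class is hypothetical, purely for illustration.)

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Mimics Spark's pre-flight check: try to Java-serialize the closure.
  def canSerialize(closure: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(closure)
      true
    } catch {
      case _: NotSerializableException => false
    }

  class Helper {                 // deliberately NOT Serializable
    val sep = ","
    def firstField(s: String): String = s.split(sep)(0)
  }

  def main(args: Array[String]): Unit = {
    val helper = new Helper

    // Captures nothing but a String literal -> serializes fine.
    val selfContained: String => String = s => s.split(",")(0)

    // Calls a method on `helper`, so the whole Helper instance is captured.
    val capturesHelper: String => String = s => helper.firstField(s)

    println(canSerialize(selfContained))   // true
    println(canSerialize(capturesHelper))  // false
  }
}
```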

Now, how do you make it serializable? That is where it gets challenging, and even though Spark tries to help by providing a "serialization stack", the information it gives is sometimes not that useful.

In your case (the first code example), findHighestRatingUsers must be serialized, but to do that it has to serialize the whole object Task1, which is not serializable.

Why is Task1 not serializable? I admit I am not completely sure, but my bet would be on the main method, and I would expect your second example to be serializable.
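The "Serialization stack" in the exception supports that reading: `capturedArgs` holds a `Task1$` instance, because `Lines.map(findHighestRatingUsers)` eta-expands the method into a lambda that calls it on the enclosing object. Below is a standalone sketch of that failure mode, plus one escape hatch, marking the object `Serializable`. This is an illustrative shape compiled in my head against Scala 2.12 semantics, not the asker's exact build:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object EtaExpansionDemo {
  // Same check as Spark performs before shipping a task.
  def canSerialize(closure: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(closure)
      true
    } catch {
      case _: NotSerializableException => false
    }

  object BadTask {                          // shaped like the asker's Task1: not Serializable
    def find(s: String): String = s.split(",")(0)
    // Eta-expansion inside the object: the lambda calls this.find, so it
    // typically captures the whole BadTask module instance (compare the
    // capturedArgs entry in the asker's trace).
    def closure: String => String = find
  }

  object GoodTask extends Serializable {    // same shape, but serializable
    def find(s: String): String = s.split(",")(0)
    def closure: String => String = find
  }

  def main(args: Array[String]): Unit = {
    println(canSerialize(BadTask.closure))
    println(canSerialize(GoodTask.closure))
  }
}
```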

You can read more about this in various docs and blog posts around the web, for example: https://medium.com/swlh/spark-serialization-errors-e0eebcf0f6e6
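Putting it together, here is one reworked sketch of the first example. It is untested against the asker's data and Spark setup, and it makes one extra assumption worth flagging: ratings are parsed as numbers, because the original `ratings.max` compares rating strings lexicographically (so "9" would beat "10"):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Task1 extends Serializable {   // Serializable, so capturing it is harmless
  def findHighestRatingUsers(movieRating: String): String = {
    val tokens  = movieRating.split(",", -1)
    val title   = tokens(0)
    // Parse ratings numerically; the original compared strings lexicographically.
    val ratings = tokens.drop(1).map(_.toDouble)
    val best    = ratings.max
    // User ids are 1-based positions of the maximal rating.
    val userIds = ratings.zipWithIndex.collect { case (r, i) if r == best => i + 1 }
    title + "," + userIds.mkString(",")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Task 1"))
    sc.textFile(args(0))
      .map(findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}
```

The pure function can be unit-tested without a cluster, e.g. `findHighestRatingUsers("Movie,3,5,5,2")` yields `"Movie,2,3"`.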

【Discussion】:
