【问题标题】:How to fix java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List to field type scala.collection.Seq?如何修复 java.lang.ClassCastException:无法将 scala.collection.immutable.List 的实例分配给字段类型 scala.collection.Seq?
【发布时间】:2017-02-18 13:52:37
【问题描述】:

这个错误是最难追踪的。我不确定发生了什么。我在我的位置机器上运行一个 Spark 集群。所以整个火花集群都在一个主机下,它是127.0.0.1,我在独立模式下运行

JavaPairRDD<byte[], Iterable<CassandraRow>> cassandraRowsRDD= javaFunctions(sc).cassandraTable("test", "hello" )
   .select("rowkey", "col1", "col2", "col3",  )
   .spanBy(new Function<CassandraRow, byte[]>() {
        @Override
        public byte[] call(CassandraRow v1) {
            return v1.getBytes("rowkey").array();
        }
    }, byte[].class);

Iterable<Tuple2<byte[], Iterable<CassandraRow>>> listOftuples = cassandraRowsRDD.collect(); //ERROR HAPPENS HERE
Tuple2<byte[], Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
byte[] partitionKey = tuple._1();
for(CassandraRow cassandraRow: tuple._2()) {
    System.out.println("************START************");
    System.out.println(new String(partitionKey));
    System.out.println("************END************");
}

这个错误是最难追踪的。它显然发生在cassandraRowsRDD.collect(),我不知道为什么?

16/10/09 23:36:21 ERROR Executor: Exception in task 2.3 in stage 0.0 (TID 21)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

这是我使用的版本

Scala code runner version 2.11.8  // when I run scala -version or even ./spark-shell


compile group: 'org.apache.spark' name: 'spark-core_2.11' version: '2.0.0'
compile group: 'org.apache.spark' name: 'spark-streaming_2.11' version: '2.0.0'
compile group: 'org.apache.spark' name: 'spark-sql_2.11' version: '2.0.0'
compile group: 'com.datastax.spark' name: 'spark-cassandra-connector_2.11' version: '2.0.0-M3': 

我的 gradle 文件在引入了一个名为“provided”的东西之后看起来像这样,实际上它似乎并不存在,但谷歌说要创建一个,所以我的 build.gradle 看起来像这样

group 'com.company'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'idea'

repositories {
    mavenCentral()
    mavenLocal()
}

configurations {
    provided
}
sourceSets {
    main {
        compileClasspath += configurations.provided
        test.compileClasspath += configurations.provided
        test.runtimeClasspath += configurations.provided
    }
}

idea {
    module {
        scopes.PROVIDED.plus += [ configurations.provided ]
    }
}

dependencies {
    compile 'org.slf4j:slf4j-log4j12:1.7.12'
    provided group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
    provided group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'
}



jar {
    from { configurations.provided.collect { it.isDirectory() ? it : zipTree(it) } }
   // with jar
    from sourceSets.test.output
    manifest {
        attributes 'Main-Class': "com.company.batchprocessing.Hello"
    }
    exclude 'META-INF/.RSA', 'META-INF/.SF', 'META-INF/*.DSA'
    zip64 true
}

【问题讨论】:

标签: java apache-spark spark-cassandra-connector


【解决方案1】:

我遇到了同样的问题,可以通过将我的应用程序的 jar 添加到 spark 的类路径来解决它

spark = SparkSession.builder()
        .appName("Foo")
        .config("spark.jars", "target/scala-2.11/foo_2.11-0.1.jar")

【讨论】:

  • 这对我有用,但为什么有必要这样做?有人知道吗?如果真的需要,我希望 spark 能自动解决这个问题。
  • 我认为,每当您使用引用项目方法/类的 lambda 执行任何类型的 map 操作时,您都需要将它们作为附加 jar 提供。 Spark 确实序列化了 lambda 本身,但没有将其依赖关系整合在一起。不知道为什么错误消息根本没有信息。
【解决方案2】:

我遇到了同样的异常并深入研究了多个相关的 Jiras(92191267518075)。

我认为异常名称令人困惑,真正的问题是火花集群和驱动应用程序之间的环境设置不一致

例如,我在conf/spark-defaults.conf 中使用以下行启动了我的 Spark 集群:

spark.master                     spark://master:7077

当我启动我的驱动程序时(甚至程序以spark-submit 启动),一行:

sparkSession.master("spark://<master ip>:7077")

其中&lt;master ip&gt; 是节点master 的正确IP 地址,但由于这种简单的不一致,程序会失败。

因此,我建议所有驱动程序应用程序都以spark-submit 启动,并且不要在驱动程序代码中重复任何配置(除非您需要覆盖某些配置)。也就是说,让spark-submit在运行的Spark集群中以同样的方式设置你的环境。

【讨论】:

    【解决方案3】:

    在我的情况下,我必须添加 spark-avro jar(我将其放入主 jar 旁边的 /lib 文件夹中):

    SparkSession spark = SparkSession.builder().appName("myapp").getOrCreate();
    ...
    spark.sparkContext().addJar("lib/spark-avro_2.11-4.0.0.jar");
    

    【讨论】:

    • 在我的情况下我不得不使用spark.sparkContext.addJar("lib/spark-avro_2.11-4.0.0.jar");(括号已删除)。
    【解决方案4】:

    您的 call() 方法应该返回 byte[] 如下所示。

    @Override
    public byte[] call(CassandraRow v1) {
      return v1.getBytes("rowkey").array();
    }
    

    如果您仍然遇到问题,请检查 Jira https://issues.apache.org/jira/browse/SPARK-9219 中提到的依赖项的版本

    【讨论】:

    • 嗨!抱歉,我确实有 .array() 并且我刚刚更新了问题。看起来我在这里粘贴代码时搞砸了,但现在应该没问题了。
    • 我也看到了那个链接,但我不知道那里发生了什么,这就是为什么我粘贴了我正在使用的所有版本。我正在使用 Java 8,所以我真的不知道 scala 的东西,我不明白将库标记为“提供”是什么意思
    • 我测试了你的代码,它在独立模式下使用 spark 2.0.0 运行良好。尝试清理您的构建环境,重新构建和测试。 “提供”依赖意味着 jar 将在运行时可用。请查看maven.apache.org/guides/introduction/…
    • 您是否将它们标记为已提供?如果是这样,您在上面标记的库中提供了哪些库?
    • 我正在使用 maven 依赖项在 eclipse 中运行 spark java 程序,所以我没有将它们标记为已提供。如果你想使用 spark-submit 在集群中运行你的构建 jar,并且想使用 spark 提供的 jar,那么你可以将它们标记为已提供。请检查您的构建环境和集群环境中的jar文件和版本。
    【解决方案5】:

    检查代码 - 在 Intellij 中:分析... -> 检查代码。如果您已弃用与序列化相关的方法,请修复它。或者干脆尝试减少 Spark o Scala 版本。就我而言,我将 Scala 版本降低到 2.10 并且一切正常。

    【讨论】:

      【解决方案6】:

      我在 Eclipse 中在 spark 集群中的一个节点(即 ubuntu box)上运行我的工作时遇到了同样的问题。 我将 UDF 创建为一个单独的 java 类。虽然在本地运行 spark 一切都很好,但转向 yarn 会引发与问题中相同的异常。

      我通过将生成的类的路径放到 spark 类路径中解决了这个问题,其中包括类似于 Holger Brandl 建议的 UDF 类。

      我为类路径创建了一个变量:

      String cpVar = "..../target/classes"
      

      并作为配置变量添加到 spark 中:

      .config("spark.driver.extraClassPath", cpVar)
      .config("spark.executorEnv.CLASSPATH", cpVar)
      

      编辑:

      将路径添加到classpath只解决了驱动节点,集群中的其他节点仍然可能有同样的错误。我得到的最终解决方案是在每次构建后将生成的类放到 hdfs 中,并将类路径设置为 hdfs 文件夹,如下所示。

      sparkSession.sparkContext().addJar("hdfs:///user/.../classes");
      

      请看TheMP的回答

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-06-30
        • 2019-04-04
        • 2018-03-19
        • 1970-01-01
        • 1970-01-01
        • 2014-12-03
        • 2019-07-12
        • 1970-01-01
        相关资源
        最近更新 更多