【Question Title】: How to run a Spark application on a Spark cluster from a Servlet's doGet?
【Posted】: 2016-03-28 20:53:11
【Question Description】:

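I am trying to run a Spark application in cluster mode from a servlet.

When I use spark-submit, it works fine in both local and cluster mode. But it fails when I run it in cluster mode from the servlet's doGet() method.

Here is the exception: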

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5.0 (TID 24, 192.168.1.38): java.lang.ClassNotFoundException: ngi.spark.manipulator.util.UtilSparkRDD$1
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:

I deploy my servlet on Apache Tomcat, and the SparkException above occurs when I send a GET request to run my Spark program.

Here is the class in which the exception occurs:

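package ngi.spark.manipulator.util;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Written against the Spark 1.x Java API (in Spark 2.x, FlatMapFunction.call returns an Iterator).
// javac typically compiles the two anonymous classes below to UtilSparkRDD$1 and UtilSparkRDD$2;
// executors must be able to load them, which is what fails in the stack trace above.
public class UtilSparkRDD implements Serializable {

    private static final long serialVersionUID = 8077220930563809961L;

    public JavaRDD<Row> getDataFromFile(JavaSparkContext sparkContext,
            String filePath, final String rowDelimeter, final String fieldDelimeter) {
        // Split the text of each input line into records on rowDelimeter.
        JavaRDD<String> javaRDD = sparkContext.textFile(filePath).flatMap(
                new FlatMapFunction<String, String>() {
                    private static final long serialVersionUID = 8248977466300597565L;

                    @Override
                    public Iterable<String> call(String data) {
                        return Arrays.asList(data.split(rowDelimeter));
                    }
                });

        // Convert records of the RDD to Rows.
        JavaRDD<Row> rowRDD = javaRDD.map(
                new Function<String, Row>() {
                    private static final long serialVersionUID = 4608543635003549767L;

                    @Override
                    public Row call(String record) throws Exception {
                        // Split each record into fields on fieldDelimeter.
                        Object[] fields = record.split(fieldDelimeter);
                        return RowFactory.create(fields);
                    }
                });
        return rowRDD;
    }
}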
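The class named in the exception, UtilSparkRDD$1, is not UtilSparkRDD itself but the first anonymous class declared inside it. The minimal sketch below uses plain Java and no Spark; SerializableFn and AnonNamingDemo are hypothetical names. It shows that each anonymous function object gets its own compiled class with a $N suffix, and that class has to be loadable wherever the object is deserialized.

```java
import java.io.Serializable;

// Hypothetical stand-in for Spark's Java function interfaces (not the real Spark API).
interface SerializableFn<T, R> extends Serializable {
    R call(T input);
}

class AnonNamingDemo {
    // Mirrors the shape of getDataFromFile: two anonymous function classes in one method,
    // the first capturing a delimiter just as the original captures rowDelimeter.
    static SerializableFn<?, ?>[] makeFunctions(final String delimiter) {
        SerializableFn<String, String[]> splitter = new SerializableFn<String, String[]>() {
            public String[] call(String s) { return s.split(delimiter); }
        };
        SerializableFn<String, Integer> length = new SerializableFn<String, Integer>() {
            public Integer call(String s) { return s.length(); }
        };
        return new SerializableFn<?, ?>[] { splitter, length };
    }

    public static void main(String[] args) {
        // Each anonymous instance reports its own compiled class name.
        for (SerializableFn<?, ?> fn : makeFunctions(",")) {
            System.out.println(fn.getClass().getName());
        }
    }
}
```

With javac this typically prints AnonNamingDemo$1 and AnonNamingDemo$2, the same naming pattern as UtilSparkRDD$1 in the trace.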

【Question Discussion】:

    Tags: java tomcat servlets apache-spark


    【Solution 1】:

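    The real cause lies in where and how you create the SparkContext instance. It is very common for the serializer to send objects over the network to the executors to launch tasks.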

    ngi.spark.manipulator.util.UtilSparkRDD is the fully qualified name of the UtilSparkRDD class, isn't it?

    The whole class is serialized and sent to the executors, but they do not have it locally on their CLASSPATH, so deserialization fails.
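The failure can be reproduced without Spark. This sketch (the class MissingClassDemo is hypothetical) serializes an anonymous-class instance and deserializes it twice: once through the application class loader, standing in for the driver, and once through an empty URLClassLoader whose parent is the bootstrap loader, standing in for an executor without the application jar. The resolveClass override only mimics the idea behind Spark's JavaDeserializationStream in the trace; it is not Spark's code.

```java
import java.io.*;
import java.net.URL;
import java.net.URLClassLoader;

class MissingClassDemo {
    // A serializable anonymous-class instance, analogous to UtilSparkRDD$1.
    static Serializable makeTask() {
        return new Serializable() {
            private static final long serialVersionUID = 1L;
            @Override public String toString() { return "task"; }
        };
    }

    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Resolves every class in the stream through an explicit loader.
    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            return in.readObject();
        }
    }

    // True if a serialized task can be read back through the given loader.
    static boolean canDeserializeWith(ClassLoader loader) {
        try {
            deserialize(serialize(makeTask()), loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Driver-like: sees the application classes.
        ClassLoader appLoader = MissingClassDemo.class.getClassLoader();
        // Executor-like: no URLs, bootstrap parent, so application classes are invisible.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            System.out.println("driver-like loader:   " + canDeserializeWith(appLoader));
            System.out.println("executor-like loader: " + canDeserializeWith(isolated));
        }
    }
}
```

Running main should report true for the driver-like loader and false for the executor-like one, which is the same ClassNotFoundException path the question hits.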

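    One solution is to use static or transient fields, which are not sent over the network. To say exactly, I would need to see the whole ngi.spark.manipulator.util.UtilSparkRDD class.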
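For reference, here is what transient does in Java serialization, using a small hypothetical Holder class: a transient field is skipped when the object is written and comes back as its default value (null for references), and static fields are never part of the serialized form. This keeps non-serializable, driver-only objects out of a task; it does not by itself make a missing class such as UtilSparkRDD$1 available on the executors.

```java
import java.io.*;

class TransientDemo {
    // Hypothetical holder: one field meant to travel with the task, one meant to stay local.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        final String delimiter;       // written to the stream
        transient Object driverOnly;  // skipped; null after deserialization

        Holder(String delimiter, Object driverOnly) {
            this.delimiter = delimiter;
            this.driverOnly = driverOnly;
        }
    }

    // Writes the holder to bytes and reads it back, as a task shipment would.
    static Holder roundTrip(Holder h) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(h);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // java.lang.Object is not Serializable; the transient modifier lets Holder serialize anyway.
        Holder copy = roundTrip(new Holder(",", new Object()));
        System.out.println(copy.delimiter);
        System.out.println(copy.driverOnly);
    }
}
```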

    【Discussion】:

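    • I want to create the Spark context in the init method so that a new Spark context is not created every time. Whenever I hit the doGet method, it takes the context from the global variable and passes it along for each use. But this does not work on the cluster, while it works fine in local mode.
    • You are creating the SparkContext in init, so I assume your servlet has been initialized, which means the context was created. The exception is thrown when you perform a GET, and that part is not shown in the attached code. Can you show the whole doGet, where you execute an action (triggering the job that causes the exception)?
    • In the doGet method I pasted the code of my main class, except the code that creates the Spark context. So basically the servlet's doGet method is the same as running my application. (It also works in local mode.)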
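The lifecycle described in these comments (create the context once in init(), reuse it in every doGet()) can be sketched without the Servlet or Spark APIs. FakeServlet and ExpensiveContext below are hypothetical stand-ins for an HttpServlet and a JavaSparkContext; the sketch only shows that the expensive object is built once and shared across requests.

```java
import java.util.concurrent.atomic.AtomicInteger;

class InitOnceDemo {
    // Hypothetical stand-in for a long-lived, expensive object such as a JavaSparkContext.
    // It only counts how many times it has been constructed.
    static final class ExpensiveContext {
        static final AtomicInteger created = new AtomicInteger();

        ExpensiveContext() { created.incrementAndGet(); }

        String run(String job) { return "ran " + job; }
    }

    // Hypothetical servlet-like class: init() runs once, doGet() runs per request.
    static final class FakeServlet {
        private ExpensiveContext context;

        void init() { context = new ExpensiveContext(); }

        String doGet(String job) {
            // Reuse the shared context instead of creating one per request.
            return context.run(job);
        }
    }

    public static void main(String[] args) {
        FakeServlet servlet = new FakeServlet();
        servlet.init();
        for (int i = 0; i < 3; i++) {
            System.out.println(servlet.doGet("job-" + i));
        }
        System.out.println("contexts created: " + ExpensiveContext.created.get());
    }
}
```

Where the context is stored does not change which classes the executors can load, so this pattern alone does not avoid the ClassNotFoundException.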
    【Solution 2】:

    I added the jar to my Spark context, and that solved my problem.

    sparkContext.addJar("/home/ngi/ManipulatorNew-0.0.1-SNAPSHOT-jar-with-dependencies.jar");
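SparkContext.addJar registers a JAR dependency for all tasks executed on that context afterwards, so the executors can fetch it and load the application classes from it. Before passing a jar to addJar, it may help to confirm that the jar actually contains the class named in the ClassNotFoundException. The hypothetical JarCheck utility below does that using only java.util.jar.JarFile.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

class JarCheck {
    // Converts a binary class name such as "a.b.Outer$1" into its jar entry path "a/b/Outer$1.class".
    static String entryName(String binaryClassName) {
        return binaryClassName.replace('.', '/') + ".class";
    }

    // True if the jar has an entry for the given class.
    static boolean jarContainsClass(Path jar, String binaryClassName) {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getJarEntry(entryName(binaryClassName)) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("usage: JarCheck <jar> [binaryClassName]");
            return;
        }
        // Defaults to the class that the executors failed to load in the question.
        String className = args.length > 1 ? args[1] : "ngi.spark.manipulator.util.UtilSparkRDD$1";
        boolean found = jarContainsClass(Paths.get(args[0]), className);
        System.out.println(className + (found ? " found" : " NOT found") + " in " + args[0]);
    }
}
```

For example, after compiling: java JarCheck /home/ngi/ManipulatorNew-0.0.1-SNAPSHOT-jar-with-dependencies.jar 'ngi.spark.manipulator.util.UtilSparkRDD$1' (the single quotes stop the shell from expanding $1). As an alternative to calling addJar at runtime, the jar list can also be supplied up front through SparkConf.setJars or the spark.jars property.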
    

    【Discussion】:
