【Question Title】: Integrating Spark and Spring Boot
【Posted】: 2024-05-18 13:40:02
【Question】:

After struggling with logger dependencies, I finally managed to start my Spring Boot application with the usual `java -jar` command.

The application exposes a REST service that uses Spark to pull data from Oracle and MongoDB. When I call this REST service, I get this exception:

Driver stacktrace:
Job 0 failed: treeAggregate at MongoInferSchema.scala:80, took 0.233175 s
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.16.212.49, executor 0): java.lang.ClassNotFoundException: com.mongodb.spark.rdd.partitioner.MongoPartition
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1866)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:] with root cause
java.lang.ClassNotFoundException: com.mongodb.spark.rdd.partitioner.MongoPartition
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1866)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Closing MongoClient: [127.0.0.1:27017]

The pom.xml contains the MongoDB dependency:

<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.3.0</version>
</dependency>

and the built jar contains the MongoDB libraries:

....
825351 Mon Jul 30 14:42:22 CEST 2018 BOOT-INF/lib/mongo-spark-connector_2.11-2.3.0.jar
1897919 Mon May 28 23:33:28 CEST 2018 BOOT-INF/lib/mongo-java-driver-3.6.4.jar
....

I also tried adding the libraries to the classpath, but with no result.

Does anyone know how to make Spark see the jars it needs?

EDIT:

Following @Ramdev's suggestion, I added this snippet to my code:

JavaSparkContext context = new JavaSparkContext(sparkSession.sparkContext());
context.addJar("/home/user/.m3/repository/org/mongodb/spark/mongo-spark-connector_2.11/2.3.0/mongo-spark-connector_2.11-2.3.0.jar");
context.addJar("/home/user/.m3/repository/org/mongodb/mongo-java-driver/3.8.1/mongo-java-driver-3.8.1.jar");

As a result Spark can now see the jars, but they seem to conflict with the ones bundled inside the application jar:

2018-09-25 11:39:51 ERROR [dispatcherServlet]:182 - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.NoSuchMethodError: com.mongodb.client.MongoCollection.countDocuments(Lorg/bson/conversions/Bson;)J] with root cause
java.lang.NoSuchMethodError: com.mongodb.client.MongoCollection.countDocuments(Lorg/bson/conversions/Bson;)J
        at com.mongodb.spark.rdd.partitioner.MongoSamplePartitioner$$anonfun$7.apply(MongoSamplePartitioner.scala:88)
        at com.mongodb.spark.rdd.partitioner.MongoSamplePartitioner$$anonfun$7.apply(MongoSamplePartitioner.scala:88)
        at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
        at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
        at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
        at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
        at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
        at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
        at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
        at com.mongodb.spark.rdd.partitioner.MongoSamplePartitioner.partitions(MongoSamplePartitioner.scala:88)
        at com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner.partitions(DefaultMongoPartitioner.scala:34)
        at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:139)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:318)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:363)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
        at org.apache.spark.sql.execution.BaseLimitExec$class.inputRDDs(limit.scala:62)
        at org.apache.spark.sql.execution.LocalLimitExec.inputRDDs(limit.scala:97)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
        at my.app.common.spark.SparkSpringBootHandler.querySpark(SparkSpringBootHandler.java:92)

SparkSpringBootHandler.java, lines 85-92:

String queryJ = "select count(s.idlocalizator) "
        + "from installationOnBoard i join storicpos s on s.installation_uuid = i.uuid ";

result += sdf.format(new Date()) + " - ***************** QUERY ***************** Start...\n";

Dataset<Long> counter = sparkSession.sql(queryJ).as(Encoders.LONG());

counter.show();

【Question Comments】:

  • Can you check whether the class com.mongodb.spark.rdd.partitioner.MongoPartition is part of the built jars?
  • Yes, it belongs to mongo-spark-connector_2.11-2.3.0.jar and is part of the application jar.

Tags: java spring-boot apache-spark


【Solution 1】:

I'm not sure how you are integrating the Spark jobs with Spring Boot. I'll share my view based on what I did in one of my projects.

We kept the Spark/Scala code in a separate project and used sbt assembly to build a fat jar containing all of its dependencies.

On the Spring Boot side, we used the Apache Livy API to launch the Spark jobs and tracked their status with the batch ID that Livy returns.

Apache Livy works with both Spark 1.x and Spark 2.x:

https://livy.incubator.apache.org/docs/latest/rest-api.html
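
To illustrate the flow, here is a minimal sketch of submitting a batch through Livy's REST API from Java and polling its state. This is not the code from my project: the Livy host, jar path, and job class are placeholder assumptions.

import java.util.Map;

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;

public class LivyBatchClient {

    // Placeholder Livy endpoint; adjust to your deployment.
    private static final String LIVY_URL = "http://livy-host:8998";

    public static void main(String[] args) throws InterruptedException {
        RestTemplate rest = new RestTemplate();

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);

        // POST /batches submits the job; "file" points at the fat jar built with
        // sbt assembly, at a location reachable from the cluster (path is made up).
        String body = "{\"file\": \"hdfs:///jobs/my-spark-job-assembly.jar\","
                + " \"className\": \"com.example.MySparkJob\"}";
        Map<?, ?> batch = rest.postForObject(
                LIVY_URL + "/batches", new HttpEntity<>(body, headers), Map.class);
        Object batchId = batch.get("id");

        // GET /batches/{id} reports the state; poll until the job terminates.
        Object state = batch.get("state");
        while (!"success".equals(state) && !"dead".equals(state)) {
            Thread.sleep(5000);
            Map<?, ?> status = rest.getForObject(LIVY_URL + "/batches/" + batchId, Map.class);
            state = status.get("state");
        }
        System.out.println("Batch " + batchId + " finished with state " + state);
    }
}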

I hope this helps in some way.

【Discussion】:

  • To give you an idea of how I use Spark inside the Spring Boot application, you can look at this example: github.com/Zhuinden/spring-spark-example/tree/master/src/main/…. I followed its steps, except that instead of reading from a file I connect to the two databases.
  • The launched Spark job probably isn't picking up the dependency jars from the Spring Boot jar. You could try this example from the Spark documentation to provide the dependencies through the Spark job launcher: spark.apache.org/docs/2.1.1/api/java/index.html?org/apache/…
  • I followed your suggestion and edited my question with the new exception.
  • Is there any specific reason you can't use version 3.8.1 of org.mongodb:mongo-java-driver in your pom?
  • I hadn't seen that Maven was pulling in an old version of the mongo driver! Thanks!! I had to change my pom so that org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 excludes org.mongodb:mongo-java-driver, and then add the driver version you suggested (see the sketch after this list). That way I could also remove the JavaSparkContext addJar lines. Now it works perfectly :)
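
For reference, the pom change described in the last comment likely looks something like this (a sketch reconstructed from that comment, not the poster's exact pom; verify the actual transitive version with mvn dependency:tree):

<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.3.0</version>
    <exclusions>
        <!-- keep the connector from dragging in the older driver
             (3.6.4 in the jar listing above) -->
        <exclusion>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- pin the driver version that provides MongoCollection.countDocuments -->
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.8.1</version>
</dependency>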