【问题标题】:ClassNotFoundException spark-submit scalaClassNotFoundException spark-submit scala
【发布时间】:2016-09-04 22:31:56
【问题描述】:

您好,我正在尝试生成 Salt Examples 的输出,但没有使用文档中提到的 docker。我找到了有助于生成Main.scala 的输出的scala 代码。我把 Main.scala 修改成了一个方便的,

package BinExTest
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row

import software.uncharted.salt.core.projection.numeric._
import software.uncharted.salt.core.generation.request._
import software.uncharted.salt.core.generation.Series
import software.uncharted.salt.core.generation.TileGenerator
import software.uncharted.salt.core.generation.output.SeriesData
import software.uncharted.salt.core.analytic.numeric._

import java.io._

import scala.util.parsing.json.JSONObject

object Main {

  // Defines the tile size in both x and y bin dimensions
  val tileSize = 256

  // Defines the output layer name
  val layerName = "pickups"

  // Creates and returns an Array of Double values encoded as 64bit Integers
  def createByteBuffer(tile: SeriesData[(Int, Int, Int), (Int, Int), Double, (Double, Double)]): Array[Byte] = {
    val byteArray = new Array[Byte](tileSize * tileSize * 8)
    var j = 0
    tile.bins.foreach(b => {
      val data = java.lang.Double.doubleToLongBits(b)
      for (i <- 0 to 7) {
        byteArray(j) = ((data >> (i * 8)) & 0xff).asInstanceOf[Byte]
        j += 1
      }
    })
    byteArray
  }

  def main(args: Array[String]): Unit = {

    val jarFile = "/home/kesava/Studies/BinExTest/BinExTest.jar"; 
    val inputPath = "/home/kesava/Downloads/taxi_micro.csv"
    val outputPath = "/home/kesava/SoftWares/salt/salt-examples/bin-example/Output"

    val conf = new SparkConf().setAppName("salt-bin-example").setJars(Array(jarFile))
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(s"file://$inputPath")
      .registerTempTable("taxi_micro")

    // Construct an RDD of Rows containing only the fields we need. Cache the result
    val input = sqlContext.sql("select pickup_lon, pickup_lat from taxi_micro")
      .rdd.cache()

    // Given an input row, return pickup longitude, latitude as a tuple
    val pickupExtractor = (r: Row) => {
      if (r.isNullAt(0) || r.isNullAt(1)) {
        None
      } else {
        Some((r.getDouble(0), r.getDouble(1)))
      }
    }

    // Tile Generator object, which houses the generation logic
    val gen = TileGenerator(sc)

    // Break levels into batches. Process several higher levels at once because the
    // number of tile outputs is quite low. Lower levels done individually due to high tile counts.
    val levelBatches = List(List(0, 1, 2, 3, 4, 5, 6, 7, 8), List(9, 10, 11), List(12), List(13), List(14))

    // Iterate over sets of levels to generate.
    val levelMeta = levelBatches.map(level => {

      println("------------------------------")
      println(s"Generating level $level")
      println("------------------------------")

      // Construct the definition of the tiling jobs: pickups
      val pickups = new Series((tileSize - 1, tileSize - 1),
        pickupExtractor,
        new MercatorProjection(level),
        (r: Row) => Some(1),
        CountAggregator,
        Some(MinMaxAggregator))

      // Create a request for all tiles on these levels, generate
      val request = new TileLevelRequest(level, (coord: (Int, Int, Int)) => coord._1)
      val rdd = gen.generate(input, pickups, request)

      // Translate RDD of Tiles to RDD of (coordinate,byte array), collect to master for serialization
      val output = rdd
        .map(s => pickups(s).get)
        .map(tile => {
          // Return tuples of tile coordinate, byte array
          (tile.coords, createByteBuffer(tile))
        })
        .collect()

      // Save byte files to local filesystem
      output.foreach(tile => {
        val coord = tile._1
        val byteArray = tile._2
        val limit = (1 << coord._1) - 1
        // Use standard TMS path structure and file naming
        val file = new File(s"$outputPath/$layerName/${coord._1}/${coord._2}/${limit - coord._3}.bins")
        file.getParentFile.mkdirs()
        val output = new FileOutputStream(file)
        output.write(byteArray)
        output.close()
      })

      // Create map from each level to min / max values.
      rdd
        .map(s => pickups(s).get)
        .map(t => (t.coords._1.toString, t.tileMeta.get))
        .reduceByKey((l, r) => {
          (Math.min(l._1, r._1), Math.max(l._2, r._2))
        })
        .mapValues(minMax => {
          JSONObject(Map(
            "min" -> minMax._1,
            "max" -> minMax._2
          ))
        })
        .collect()
        .toMap
    })

    // Flatten array of maps into a single map
    val levelInfoJSON = JSONObject(levelMeta.reduce(_ ++ _)).toString()
    // Save level metadata to filesystem
    val pw = new PrintWriter(s"$outputPath/$layerName/meta.json")
    pw.write(levelInfoJSON)
    pw.close()

  }
}

我为这个 scala 创建了一个单独的文件夹,其中包含另一个名为 lib 的文件夹,其中包含所需的 jars,我使用 scalac 编译它,如下所示,

scalac -cp "lib/salt.jar:lib/spark.jar" Main.scala

这成功运行并在文件夹 BinExTest 下生成了类。

现在,该项目的 build.gradle 具有以下代码行,用于确定这是有助于生成输出数据集的命令,

task run(overwrite: true, type: Exec, dependsOn: [assemble]) {
  executable = 'spark-submit'
  args = ["--class","software.uncharted.salt.examples.bin.Main","/opt/salt/build/libs/salt-bin-example-${version}.jar", "/opt/data/taxi_one_day.csv", "/opt/output"]
}

看到这里,我做了如下命令,

spark-submit --class BinExTest.Main lib/salt.jar

当我这样做时,我收到以下错误,

java.lang.ClassNotFoundException: Main.BinExTest at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 在 java.net.URLClassLoader$1.run(URLClassLoader.java:355) 在 java.security.AccessController.doPrivileged(Native Method) 在 java.net.URLClassLoader.findClass(URLClassLoader.java:354) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:425) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:358) 在 java.lang.Class.forName0(本机方法)在 java.lang.Class.forName(Class.java:278) 在 org.apache.spark.util.Utils$.classForName(Utils.scala:174) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

有人可以帮我解决这个问题吗?我对此完全陌生,仅仅通过探索就走到了这一步。

[更新 1]

采纳YoYo的建议,

spark-submit --class BinExTest.Main --jars "BinExTest.jar" "lib/salt.jar"

我得到了 ClassNotFoundException 没有生成新的错误,如下所示,

线程“主”org.apache.spark.SparkException 中的异常:作业 由于阶段失败而中止:阶段 3.0 中的任务 1 失败 1 次,大多数 最近失败:在阶段 3.0 中丢失任务 1.0(TID 6,本地主机): java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;在 BinExTest.Main$.createByteBuffer(Main.scala:29) 在 BinExTest.Main$$anonfun$2$$anonfun$6.apply(Main.scala:101) 在 BinExTest.Main$$anonfun$2$$anonfun$6.apply(Main.scala:99) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:27​​3) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) 在 org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:745)

知道发生了什么吗?

[更新 2]

使用 Scala2.11 支持从源代码构建 Spark 解决了我之前的问题。但是我遇到了一个新错误,

6/05/10 18:39:15 错误 TaskSetManager: 阶段 2.0 中的任务 0 失败 1 次;线程“main”中的中止作业异常 org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 2.0 中的任务 0 失败 1 次,最近一次失败:丢失任务 0.0 在 2.0 阶段(TID 3,本地主机):java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class at software.uncharted.salt.core.util.SparseArray.(SparseArray.scala:37) 在 software.uncharted.salt.core.util.SparseArray.(SparseArray.scala:57) 在 software.uncharted.salt.core.generation.rdd.RDDSeriesWrapper.makeBins(RDDTileGenerator.scala:224) 在 software.uncharted.salt.core.generation.rdd.RDDTileGeneratorCombiner.createCombiner(RDDTileGenerator.scala:128) 在 software.uncharted.salt.core.generation.rdd.RDDTileGenerator$$anonfun$3.apply(RDDTileGenerator.scala:100) 在 software.uncharted.salt.core.generation.rdd.RDDTileGenerator$$anonfun$3.apply(RDDTileGenerator.scala:100) 在 org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:187) 在 org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:186) 在 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:148) 在 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) 在 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192) 在 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 在 org.apache.spark.scheduler.Task.run(Task.scala:89) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:745) 引起: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class 在 java.net.URLClassLoader$1.run(URLClassLoader.java:366) 在 java.net.URLClassLoader$1.run(URLClassLoader.java:355) 在 java.security.AccessController.doPrivileged(Native Method) 在 java.net.URLClassLoader.findClass(URLClassLoader.java:354) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:425) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:358)

这是因为 scala2.11 没有提到的类吗?

[最终更新]

将 scala2.10 添加到 spark-submit 就可以了。

spark-submit --class "BinExTest.Main" --jars "BinExTest.jar,lib/scala210.jar" "lib/salt.jar"

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    要运行 Spark 作业,它需要在组成 Spark 集群的不同节点上自我复制其代码。它通过将 jar 文件复制到其他节点来实现这一点。

    这意味着您需要确保您的类文件打包在 .jar 文件中。在我的典型解决方案中,我会构建一个 Uber jar,将类文件和依赖的 jar 文件打包在一个 .jar 文件中。为此,我使用Maven Shade plugin。这不一定是您的解决方案,但至少您应该从生成的类中构建一个 .jar 文件。

    要手动提供额外的 jar 文件 - 您需要使用 --jars 选项添加它们,该选项需要一个逗号分隔的列表。

    更新 1

    实际上,即使对我来说,对所有可用选项也有很多困惑,特别是 jar 文件及其分布方式,或者修改 spark 中的类路径。 See another topic I just posted.

    更新 2

    关于您问题的第二部分,another thread 已经回答。

    【讨论】:

    • 谢谢@YoYo。那行得通。你真棒。 spark-submit --class BinExTest.Main --jars "BinExTest.jar" "lib/salt.jar" 这会产生另一个错误,java.lang.NoSuchMethodError: scala. runtime.IntRef.create(I)Lscala/runtime/IntRef; 知道为什么会这样吗?您可以查看我的完整错误答案更新。谢谢。
    • 你的那部分问题已经回答了somewhere else
    • 谢谢YoYo。然而,这并没有多大帮助。它说要重新构建 scala,因为最新版本的 scala 只有 create 功能。我想这里的 scala 类是从 spark-assembly jar 文件中获取的。知道如何更新 spark-assembly jar 文件中的 scala 吗?再次感谢。
    • YoYo,正如我所提到的,问题出在 spark jar 上。我用 scala2.11 再次构建了 spark 并且成功了。然而得到了新的错误。我更新了问题。你能帮帮我吗?
    • 问题已解决 YoYo。谢谢你。你可以看到我有问题的更新。
    猜你喜欢
    • 2021-08-15
    • 2019-09-08
    • 2019-03-01
    • 2018-05-19
    • 2019-03-10
    • 2017-08-26
    • 2017-12-25
    • 2020-04-24
    • 1970-01-01
    相关资源
    最近更新 更多