【问题标题】:How do you submit a job to Spark from code?如何从代码向 Spark 提交作业?
【发布时间】:2017-06-07 19:46:24
【问题描述】:

我已经启动了一个单节点独立 Spark 集群,并确认我的构建适用于 ./bin/run-example SparkPi 10。然后我在scala中写了一个非常简单的测试项目;

import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext("spark://UbuntuVM:7077", "Simple Application")

    val count = sc.parallelize(1 to 100).map{i =>
      val x = Math.random()
      val y = Math.random()
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / 100)
  }
}

我在我的 IDE (IntelliJ) 中运行它。成功连接集群,我看到它提交作业,但都抛出错误;

INFO TaskSetManager: Lost task 1.3 in stage 0.0 (TID 7) on executor 192.168.1.233: java.lang.ClassNotFoundException (Main$$anonfun$1) [duplicate 7]

如果我对 spark 的理解正确,这是因为集群找不到代码。那么我如何提供代码来激发火花呢?在这个测试中我没有运行 HDFS 或任何东西,但它都在一个盒子上,所以我希望 SparkContext 将当前目录传递给 spark,但它显然没有。

谁能指出正确的设置方法?

【问题讨论】:

  • 我实际上尝试了 sc.addJar(SparkContext.jarOfClass(this.getClass).get) ——但这当然失败了,因为没有 JAR .. IntelliJ 没有创建一个..跨度>
  • sbt-assembly 确实可以生成一个我可以提交的 JAR,但我仍然找不到让 intelliJ 中的“运行”功能运行的方法..
  • 在 IntelliJ 中为您的项目创建 jar 工件配置,然后打开您的运行配置并将“构建工件”添加到“启动前”部分。 sc.addJar 应该包含您的工件的完整路径。显然这是开发设置,不适合生产使用。
  • 所以也许我只是个盲人,但我看不到创建真正遵循 sbt-assembly 设置的构建工件的方法(例如,不包括 spark 的东西)?

标签: scala intellij-idea apache-spark


【解决方案1】:

如果您想在本地测试您的 Spark 程序,您甚至不需要启动单节点独立 Spark。只需像这样将您的主网址设置为local[*]

val sc = new SparkContext("local[*]", "Simple Application", sparkConf)

然后在 sbt 中,输入&gt; run 来运行你的程序(这应该和从 IntelliJ 运行一样,但我以前是使用 sbt 从终端运行程序)。

由于您可能不想在 local[*]spark://... 之间多次更改代码中的主 URL,因此您可以将它们留空

val sc = new SparkContext(new SparkConf())

并在运行时设置你的java属性,例如build.sbt,可以添加

javaOptions := Seq("-Dspark.master=local[*]", "-Dspark.app.name=my-app")

并在 sbt 中使用 run 运行它。


为了获得更全面的本地模式体验,您可能需要在 build.sbt 中添加以下行

run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
runMain in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
fork := true
javaOptions := Seq("-Dspark.master=local[*]", s"-Dspark.app.name=my-app")
outputStrategy := Some(StdoutOutput)

我们创建了一个 sbt 插件,它可以为您添加这些设置,它还可以帮助您在 aws ec2 等云系统上部署独立的 Spark 集群,如果您有兴趣,请查看spark-deployer

【讨论】:

    【解决方案2】:

    你错过了一个关键步骤:

    org.apache.spark.deploy.SparkSubmit

    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

    它实际上将作业提交到集群。不幸的是,除了spark-submit 之外,目前还没有一个可靠的工作包装器。因此,目前还没有一种可靠的方式来以编程方式提交 Spark 作业。有一个 jira 已在 2015 年 2 月部分解决:但它缺乏文档。

     https://github.com/apache/spark/pull/3916/files
    

    困难在于spark-submit 提供的环境机制的复杂性。还没有发现可以仅在 scala/java 代码中复制它们。

    【讨论】:

      【解决方案3】:

      这是类。

      https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/launcher/package-summary.html

          import org.apache.spark.launcher.SparkLauncher;
      
         public class MyLauncher {
           public static void main(String[] args) throws Exception {
             Process spark = new SparkLauncher()
               .setAppResource("/my/app.jar")
               .setMainClass("my.spark.app.Main")
               .setMaster("local")
               .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
               .launch();
             spark.waitFor();
           }
         }
      

      【讨论】:

        猜你喜欢
        • 2016-04-13
        • 1970-01-01
        • 2019-05-30
        • 2015-09-25
        • 2018-11-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-11-19
        相关资源
        最近更新 更多