【问题标题】:Spark Launcher waiting for job completion infinitelySpark Launcher 无限等待作业完成
【发布时间】:2015-10-23 14:44:41
【问题描述】:

我正在尝试将带有 Spark 作业的 JAR 从 Java 代码提交到 YARN 集群。我正在使用 SparkLauncher 提交 SparkPi 示例:

Process spark = new SparkLauncher()
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
    .setMainClass("org.apache.spark.examples.SparkPi")
    .setMaster("yarn-cluster")
    .launch();
System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);

有两个问题:

  1. 在“yarn-cluster”模式下提交时,应用程序成功提交到 YARN 并成功执行(在 YARN UI 中可见,报告为 SUCCESS,输出中打印 pi)。但是,提交应用程序永远不会被通知处理完成 - 在打印“Waiting to finish...”后它会无限挂起容器的日志可以找到here
  2. 在“yarn-client”模式下提交时,应用程序不会出现在 YARN UI 中,并且提交应用程序挂起在“Waiting to finish...” 当挂起代码被杀死时,应用程序显示在 YARN UI 中并且它报告为SUCCESS,但输出为空(pi没有打印出来)。容器的日志可以找到here

我尝试使用 Oracle Java 7 和 8 执行提交应用程序。

【问题讨论】:

    标签: java apache-spark hadoop-yarn spark-launcher


    【解决方案1】:

    我在 Spark 邮件列表中获得了帮助。关键是在Process上读取/清除getInputStream和getErrorStream()。子进程可能会填满缓冲区并导致死锁 - 请参阅Oracle docs regarding Process。流应该在单独的线程中读取:

    Process spark = new SparkLauncher()
        .setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6")
        .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
        .setMainClass("org.apache.spark.examples.SparkPi").setMaster("yarn-cluster").launch();
    
    InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input");
    Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
    inputThread.start();
    
    InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error");
    Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
    errorThread.start();
    
    System.out.println("Waiting for finish...");
    int exitCode = spark.waitFor();
    System.out.println("Finished! Exit code:" + exitCode);
    

    InputStreamReaderRunnable 类在哪里:

    public class InputStreamReaderRunnable implements Runnable {
    
        private BufferedReader reader;
    
        private String name;
    
        public InputStreamReaderRunnable(InputStream is, String name) {
            this.reader = new BufferedReader(new InputStreamReader(is));
            this.name = name;
        }
    
        public void run() {
            System.out.println("InputStream " + name + ":");
            try {
                String line = reader.readLine();
                while (line != null) {
                    System.out.println(line);
                    line = reader.readLine();
                }
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    【讨论】:

    • 就我而言,我遇到了类路径问题,因此火花立即退出。因此,如果在其他人看来它根本没有调用您的 spark 应用程序,那么这个答案也可以。
    【解决方案2】:

    由于这是一篇旧帖子,我想添加一个更新,以帮助以后阅读此帖子的人。在 Spark 1.6.0 中,SparkLauncher 类中添加了一些功能。即:

    def startApplication(listeners: <repeated...>[Listener]): SparkAppHandle
    

    http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.launcher.SparkLauncher

    您可以运行应用程序而无需额外的线程来处理 stdout 和 stderr 处理毛绒绒有一个很好的应用程序运行状态报告。使用此代码:

      val env = Map(
          "HADOOP_CONF_DIR" -> hadoopConfDir,
          "YARN_CONF_DIR" -> yarnConfDir
        )
      val handler = new SparkLauncher(env.asJava)
          .setSparkHome(sparkHome)
          .setAppResource("Jar/location/.jar")
          .setMainClass("path.to.the.main.class")
          .setMaster("yarn-client")
          .setConf("spark.app.id", "AppID if you have one")
          .setConf("spark.driver.memory", "8g")
          .setConf("spark.akka.frameSize", "200")
          .setConf("spark.executor.memory", "2g")
          .setConf("spark.executor.instances", "32")
          .setConf("spark.executor.cores", "32")
          .setConf("spark.default.parallelism", "100")
          .setConf("spark.driver.allowMultipleContexts","true")
          .setVerbose(true)
          .startApplication()
    println(handle.getAppId)
    println(handle.getState)
    

    如果 spark 应用程序成功,您可以继续查询状态。 有关 Spark Launcher 服务器在 1.6.0 中如何工作的信息。看到这个链接: https://github.com/apache/spark/blob/v1.6.0/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java

    【讨论】:

    • 我想强调这仅在客户端模式下有效。
    • @msemelman 非常感谢您的澄清,我被困在这上面了。你是怎么知道这个事实的?
    • 也可以在集群模式下工作。我正在使用 Spark-1.6.1
    • 我错了,但是根据 SparkLauncher 中的隐藏文档:github.com/apache/spark/blob/master/launcher/src/main/java/org/…(如答案所示)“启动器服务器仅在本地主机上侦听...” i> 因此,只有当驱动程序恰好与您所在的主机位于同一主机时,您才能启动。
    • 有时您可能还需要 Thread.sleep(10000L) 来查看您的工作状态。由于此代码不等待 spark-submit 完成。我有下面的代码让它工作:while(!spark.getState.isFinal){ println(spark.getAppId) Thread.sleep(10000L) }
    【解决方案3】:

    我使用 CountDownLatch 实现,它按预期工作。 这适用于 SparkLauncher 版本 2.0.1,它也适用于 Yarn-cluster 模式。

        ...
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
    SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
    Thread sparkAppListenerThread = new Thread(sparkAppListener);
    sparkAppListenerThread.start();
    long timeout = 120;
    countDownLatch.await(timeout, TimeUnit.SECONDS);    
        ...
    
    private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {
        private static final Log log = LogFactory.getLog(SparkAppListener.class);
        private final CountDownLatch countDownLatch;
        public SparkAppListener(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void stateChanged(SparkAppHandle handle) {
            String sparkAppId = handle.getAppId();
            State appState = handle.getState();
            if (sparkAppId != null) {
                log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - "
                        + SPARK_STATE_MSG.get(appState));
            } else {
                log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
            }
            if (appState != null && appState.isFinal()) {
                countDownLatch.countDown();
            }
        }
        @Override
        public void infoChanged(SparkAppHandle handle) {}
        @Override
        public void run() {}
    }
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-09-02
    • 2015-04-17
    • 2020-01-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-02
    • 1970-01-01
    相关资源
    最近更新 更多