【Question Title】: spark-submit not exiting even after process finishes successfully
【Posted】: 2018-04-18 19:29:53
【Question Description】:

I wrote a simple application that runs fine, but when I submit the code through spark-submit, the spark-submit session does not finish even after close() is called, and I have to kill the PID.

Below is the code snippet:

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.sql.{DataFrame, SparkSession}

object FaultApp {

  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .enableHiveSupport()
      .config("spark.scheduler.mode", "FAIR")
      .appName("parjobs")
      .getOrCreate()

    import spark.implicits._

    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc = ExecutionContext.fromExecutorService(pool)

    import Function._

    val caseClass = Seq(Person("X", 32),
                        Person("Y", 37),
                        Person("Z", 37),
                        Person("A", 6))

    val caseClassDS = caseClass.toDF()

    val taskA = write_first(caseClassDS)

    Await.result(Future.sequence(Seq(taskA)), Duration(1, MINUTES))

    spark.stop()

    println("After Spark Stop command")
  }
}

object Function {
  def write_first(ds: DataFrame)(implicit xc: ExecutionContext): Future[Unit] = Future {
    Thread.sleep(10000)
    ds.write.format("orc").mode("overwrite")
      .option("compression", "zlib")
      .saveAsTable("save_1")
  }
}

I am submitting the job with the following command:

spark-submit --master yarn --deploy-mode client fault_application-assembly-1.0-SNAPSHOT.jar --executor-memory 1G --executor-cores 2 --driver-memory 1G

Below are the last few lines of the log:

18/04/18 15:15:20 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false)
18/04/18 15:15:20 INFO YarnClientSchedulerBackend: Stopped
18/04/18 15:15:20 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/04/18 15:15:20 INFO MemoryStore: MemoryStore cleared
18/04/18 15:15:20 INFO BlockManager: BlockManager stopped
18/04/18 15:15:20 INFO BlockManagerMaster: BlockManagerMaster stopped
18/04/18 15:15:20 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/04/18 15:15:20 INFO SparkContext: Successfully stopped SparkContext
After Spark Stop command

Any help or suggestions would be greatly appreciated.

【Comments】:

  • What are you trying to do here with Await.result(Future.sequence(Seq(taskA)), Duration(1, MINUTES))?

Tags: scala apache-spark spark-dataframe


【Solution 1】:

That's because you are creating your execution context from a thread pool, so your program will not exit until that thread pool has been shut down.

After spark.stop(), add:

xc.shutdown()
println("After shutdown.")
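The effect is easy to reproduce without Spark at all. Below is a minimal, self-contained sketch (the object name `PoolShutdownDemo` is illustrative, not from the question): `Executors.newFixedThreadPool` creates non-daemon worker threads, so the JVM stays alive after `main` returns unless the pool is explicitly shut down.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolShutdownDemo {
  def run(): Int = {
    // Like in the question: an ExecutionContext backed by a fixed thread pool.
    // Its worker threads are NON-daemon, so they keep the JVM alive.
    val pool = Executors.newFixedThreadPool(2)
    implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    val result = Await.result(Future(21 * 2), 10.seconds)

    // Without this call the process would linger after main() returns --
    // exactly the symptom seen with spark-submit in the question.
    pool.shutdown()
    result
  }

  def main(args: Array[String]): Unit =
    println(s"result = ${run()}")
}
```

Running this with the `pool.shutdown()` line commented out demonstrates the hang: the future completes, `main` returns, but the process does not exit.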

Alternatively, instead of creating a new execution context for your futures, you can simply use the global one:

implicit val executor = scala.concurrent.ExecutionContext.global
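For contrast, here is a sketch of the global-context approach (the object name `GlobalContextDemo` is illustrative): `scala.concurrent.ExecutionContext.global` is backed by daemon threads, so it never blocks JVM shutdown and needs no explicit cleanup.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object GlobalContextDemo {
  def run(): Int = {
    // The global execution context uses daemon threads, so the JVM can
    // exit as soon as main() returns; no shutdown() call is required.
    Await.result(Future(1 + 1), 10.seconds)
  }

  def main(args: Array[String]): Unit =
    println(s"res = ${run()}")
}
```

The trade-off is that the global pool is shared process-wide and sized to the number of CPU cores, so it is less suitable when you deliberately want a dedicated, bounded pool as in the question.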

【Discussion】:
