[Posted]: 2020-05-24 12:20:51
[Question]:
I have a jar that reads a Typesafe config:

    import com.typesafe.config.{Config, ConfigFactory}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, desc, lower, not}

    object ConfigWordCount {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
          .appName("Word Count")
          .getOrCreate()

        // Load configuration into the Settings class
        val conf: Config = ConfigFactory.load()
        val settings: Settings = new Settings(conf)

        import spark.implicits._

        // Business logic
        val document = spark.read.textFile(settings.inputFile)
        val result = document.flatMap(_.split(" "))
          .filter(not(lower(col("value")).isin(settings.stopWords: _*)))
          .groupBy("value")
          .count()
          .filter(col("count") >= settings.minCount)
          .orderBy(desc("count"))
        result.coalesce(1).write.csv(settings.outputFile)

        spark.stop()
      }
    }
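For reference, the pipeline above is a standard word count with a case-insensitive stop-word filter and a minimum-count threshold. The same logic on plain Scala collections (no Spark, no external config; the stop words and threshold here are made-up example values) is a useful sanity check of the transformation itself:

```scala
object WordCountSketch {
  // Count words across lines, dropping stop words (matched on the lowercased
  // form, as lower(col("value")).isin(...) does) and words rarer than minCount,
  // most frequent first.
  def wordCount(lines: Seq[String],
                stopWords: Set[String],
                minCount: Long): Seq[(String, Long)] =
    lines
      .flatMap(_.split(" "))
      .filter(w => !stopWords.contains(w.toLowerCase))
      .groupBy(identity)
      .map { case (w, occs) => (w, occs.size.toLong) }
      .toSeq
      .filter { case (_, n) => n >= minCount }
      .sortBy { case (_, n) => -n }
}
```

For example, `WordCountSketch.wordCount(Seq("the cat the dog", "the cat"), Set("the"), 2L)` keeps only `("cat", 2)`: "the" is filtered as a stop word and "dog" falls below the threshold.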
I want to run the job on a Spark standalone cluster and pass application.conf via spark-submit. So far I have tried both client and cluster deploy modes.

Client:

    spark-submit --master spark://xx.xx.xx.xx:6066 --deploy-mode client [...] --files application.conf --driver-java-options -Dconfig.file=application.conf myApplication.jar

Cluster:

    spark-submit --master spark://xx.xx.xx.xx:6066 --deploy-mode cluster [...] --files application.conf --conf spark.driver.extraJavaOptions=-Dconfig.file=application.conf --conf spark.executor.extraJavaOptions=-Dconfig.file=application.conf s3a://bucket/myApplication.jar

The code works fine in client mode and the configuration is loaded, but in cluster mode the driver is launched on one of the workers and fails with:
Caused by: java.io.FileNotFoundException: File file:/home/centos/application.conf does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1544)
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1508)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:462)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:462)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:462)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
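The trace shows the failure happening inside SparkContext.addFile during SparkContext construction: the relative path application.conf is resolved against the driver process's local working directory (file:/home/centos/ on the worker that hosts the driver), where no such file exists. A minimal JVM-only sketch of that resolution, useful for checking what a driver process actually sees (this is plain java.io, not a Spark API):

```scala
import java.io.File

object DriverFsCheck {
  // Report the current working directory, whether a relative path exists
  // there, and what absolute path it resolves to - the same resolution a
  // relative --files / -Dconfig.file argument goes through on the driver.
  def describe(relativePath: String): String = {
    val cwd = new File(".").getCanonicalPath
    val f = new File(relativePath)
    s"cwd=$cwd exists=${f.exists()} resolved=${f.getCanonicalPath}"
  }
}
```

Logging `DriverFsCheck.describe("application.conf")` from the driver (or just before spark-submit launches the jar) would confirm which directory the relative path is being resolved against on the worker machine.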
[Comments]:
-
Can you post your code, and how and where you load the config file? Can you also post the full exception?
-
@Srinivas updated
Tags: scala apache-spark