【发布时间】:2020-07-04 03:21:57
【问题描述】:
我正在使用 spark-submit 运行一个胖 jar 文件(使用 sbt 程序集生成)。 这是 scala 文件的主要内容:
package antarctic
import antarctic.comparison.TablesComparison
import antarctic.utils.Utils.withSpark
import antarctic.inputs.Inputs
import antarctic.outputs.{ValOutput, ComparisonOutput}
import antarctic.parser.Parser
import antarctic.profiling.Profiling
import com.amazon.deequ.VerificationSuite
import org.apache.spark.sql.SparkSession
object DataQuality {

  /**
   * Entry point. Expects exactly four positional arguments:
   *   args(0) — path to the JSON configuration file parsed by [[Parser]]
   *   args(1) — process type: "comparison" or "validations"
   *   args(2) — process ID
   *   args(3) — path to the application.conf file
   *
   * Note for spark-submit users: the application jar must be passed as the
   * first positional argument, NOT via --jars, otherwise spark-submit shifts
   * the program arguments (args(0) becomes the process type, etc.).
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an opaque ArrayIndexOutOfBoundsException.
    require(args.length >= 4,
      "Usage: DataQuality <configFilePath> <processType: comparison|validations> <processID> <confFilePath>")
    val filename = args(0)
    val parsedConfigs = new Parser(filename)
    val processType = args(1)
    val processID = args(2)
    val confFilePath = args(3)
    withSpark { spark =>
      // Data profiling runs for both process types; any other processType
      // value is silently ignored (no work is done inside the Spark session).
      if (processType == "comparison") {
        runProfiling(parsedConfigs, spark, processID, confFilePath, filename)
        runTablesComparison(parsedConfigs, spark, processID, confFilePath)
      } else if (processType == "validations") {
        runProfiling(parsedConfigs, spark, processID, confFilePath, filename)
        runValidations(parsedConfigs, spark, processID, confFilePath)
      }
    }
  }

  /**
   * Runs Deequ validations against the JDBC input referenced by the parsed
   * configuration, then stores and prints the verification result.
   * Skips silently when any mandatory JDBC parameter is missing.
   */
  def runValidations(parsedConfigs: Parser, spark: SparkSession, processID: String, confFilePath: String): Unit = {
    val inputParams = parsedConfigs.input(parsedConfigs.getValidationInputId)
    // All JDBC connection fields must be populated before attempting to read.
    if (inputParams.url != "" && inputParams.driver != ""
      && inputParams.query != "" && inputParams.user != "" && inputParams.fetchSize > 0) {
      val inputData = new Inputs(spark).readJDBC(inputParams)
      val parsedJSON = parsedConfigs.validations(processID, inputData)
      val verificationResult = VerificationSuite()
        .onData(inputData)
        .addChecks(parsedJSON.checks)
        .run()
      ValOutput.store(spark, verificationResult, confFilePath)
      ValOutput.print(verificationResult)
    }
  }

  /** Exports a profiling report to the database for every configured input. */
  def runProfiling(parsedConfigs: Parser, spark: SparkSession, processID: String, confFilePath: String, filename: String): Unit = {
    parsedConfigs.inputsList.foreach { inputID =>
      new Profiling(filename, inputID, processID, spark, confFilePath).exportToDB()
    }
  }

  /**
   * Compares two JDBC tables as described by the parsed configuration and
   * stores whichever result sets ("areEquals", "countDistinct", "count")
   * the comparison produced. Skips silently when any input id or column
   * name is empty.
   */
  def runTablesComparison(parsedConfigs: Parser, spark: SparkSession, processID: String, confFilePath: String): Unit = {
    // Destructure the config tuples instead of accessing ._1/._2 manually.
    val (comparisonType, table1Conf, table2Conf) = parsedConfigs.tablesComparison()
    val (table1InputId, table1Column) = table1Conf
    // FIX: the second column previously read table1Conf._2 (copy-paste bug),
    // so table 2 was always compared using table 1's column name.
    val (table2InputId, table2Column) = table2Conf
    if (table1InputId != "" && table2InputId != "" && table1Column != "" && table2Column != "") {
      val inputConfig1 = parsedConfigs.input(table1InputId)
      val inputConfig2 = parsedConfigs.input(table2InputId)
      val input = new Inputs(spark)
      val tablesComparison = new TablesComparison(
        (table1InputId, table1Column, input.readJDBC(inputConfig1)),
        (table2InputId, table2Column, input.readJDBC(inputConfig2)),
        spark // was `spark: SparkSession` — a redundant type ascription, not a named argument
      )
      val comparisonResults = tablesComparison.run(comparisonType)
      // Store each result only when present; get(...).foreach avoids the
      // contains-then-apply double lookup.
      comparisonResults.get("areEquals").foreach(ComparisonOutput.storeAreEquals(_, confFilePath))
      comparisonResults.get("countDistinct").foreach(ComparisonOutput.storeCountDistinct(_, confFilePath))
      comparisonResults.get("count").foreach(ComparisonOutput.storeCount(_, confFilePath))
    }
  }
}
这是我运行的命令:
spark-submit --class antarctic.DataQuality --master local[*] --deploy-mode client --jars "path\to\jar\DataQuality.jar" "path\to\json\mongoDocs.json" "validations" "process-id-1" "path\to\conf\application.conf"
这是我得到的跟踪:
Exception in thread "main" java.io.FileNotFoundException: validations (El sistema no puede encontrar el archivo especificado) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.<init>(FileInputStream.java:138) at scala.io.Source$.fromFile(Source.scala:94) at scala.io.Source$.fromFile(Source.scala:79) at scala.io.Source$.fromFile(Source.scala:57) at antarctic.parser.Parser.fileToText(Parser.scala:23) at antarctic.parser.Parser.<init>(Parser.scala:13) at antarctic.DataQuality$.main(DataQuality.scala:22) at antarctic.DataQuality.main(DataQuality.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 20/07/02 11:23:00 INFO ShutdownHookManager: Shutdown hook called 20/07/02 11:23:00 INFO ShutdownHookManager: Deleting directory C:\Users\AKAINIX分析\AppData\Local\Temp\spark-7da12975-bd26-4728-a1e0-941252d9ab86
文件的路径本身没有问题。从堆栈跟踪看，程序把 "validations" 当成了 args(0)（配置文件路径）去打开——这是因为应用 jar 是通过 --jars 传入的，而 spark-submit 把第一个位置参数（JSON 路径）当成了应用 jar，导致后续参数整体前移了一位。应用 jar 应作为第一个位置参数直接传入，而不是放在 --jars 里。
【问题讨论】:
-
我认为双引号不是必需的。尝试不使用它们。
标签: scala apache-spark spark-submit