【Question Title】: Spark write dataframe to vertica giving error
【Posted】: 2020-04-26 20:16:01
【Question】:

I tried writing a dataframe to Vertica following the documentation provided by Vertica: https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingtoVerticaUsingDefaultSource.htm?tocpath=Integrating%20with%20Apache%20Spark%7CSaving%20an%20Apache%20Spark%20DataFrame%20to%20a%20Vertica%20Table%7C_____1 and it works: after loading the required libraries in spark-shell, the dataframe is written to the table.


Now, when I try to run the same code in IntelliJ, or anywhere other than directly in the spark-shell, I get the errors below.

The code is:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types._

    // sc: the SparkContext (provided automatically in spark-shell)
    val rows: RDD[Row] = sc.parallelize(Array(
      Row(1,"hello", true),
      Row(2,"goodbye", false)
    ))

    val schema = StructType(Array(
      StructField("id",IntegerType, false),
      StructField("sings",StringType,true),
      StructField("still_here",BooleanType,true)
    ))

    // conf is the application's SparkConf (not shown)
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val df = spark.createDataFrame(rows, schema) // Spark 2.0

    // View the sample data and schema
    df.show
    df.schema

    // Set up the user options; defaults are shown where applicable for optional values.


    // Replace the placeholder values with the settings for your Vertica instance.
    val opts: Map[String, String] = Map(
      "table" -> "signs",
      "db" -> "dbadmin",
      "user" -> "dbadmin",
      "password" -> "password",
      "host" -> "localhost",
      "hdfs_url" -> "hdfs://localhost:9000/user",
      "web_hdfs_url" -> "webhdfs://localhost:9870/user",
      // "failed_rows_percent_tolerance"-> "0.00"   // OPTIONAL (default val shown)
      "dbschema" -> "public"                     // OPTIONAL (default val shown)
      // "port" -> "5433"                           // OPTIONAL (default val shown)
      // "strlen" -> "1024"                         // OPTIONAL (default val shown)
      // "fileformat" -> "orc"                      // OPTIONAL (default val shown)
    )

    // SaveMode can be either Overwrite, Append, ErrorIfExists, Ignore

    val mode = SaveMode.Append
    df
      .write
      .format("com.vertica.spark.datasource.DefaultSource")
      .options(opts)
      .mode(mode)
      .save()

This is the same as what is provided in the documentation, yet this error comes up. I have already set up my HDFS and Vertica.
The question is: if it works as expected from the spark-shell, why does it not work outside of it?

20/04/27 01:55:50 INFO S2V: Load by name. Column list: ("name","miles_per_gallon","cylinders","displacement","horsepower","weight_in_lbs","acceleration","year","origin")
20/04/27 01:55:50 INFO S2V: Writing intermediate data files to path: hdfs://localhost:9000/user/S2V_job2509086937642333836
20/04/27 01:55:50 ERROR S2VUtils: Unable to delete the HDFS path: hdfs://localhost:9000/user/S2V_job2509086937642333836
20/04/27 01:55:50 ERROR S2V: Failed to save DataFrame to Vertica table: second0.car with SaveMode: Append
20/04/27 01:55:50 ERROR JobScheduler: Error running job streaming job 1587932740000 ms.2
java.lang.Exception: S2V: FATAL ERROR for job S2V_job2509086937642333836. Job status information is available in the Vertica table second0.S2V_JOB_STATUS_USER_DBADMIN. Unable to create/insert into target table: second0.car with SaveMode: Append.  ERROR MESSAGE:  ERROR: java.lang.Exception: S2V: FATAL ERROR for job S2V_job2509086937642333836. Unable to save intermediate orc files to HDFS path: hdfs://localhost:9000/user/S2V_job2509086937642333836. Error message: The ORC data source must be used with Hive support enabled; 
    at com.vertica.spark.s2v.S2V.do2Stage(S2V.scala:446)
    at com.vertica.spark.s2v.S2V.save(S2V.scala:496)
    at com.vertica.spark.datasource.DefaultSource.createRelation(VerticaSource.scala:100)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:469)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at replica_nimble_spark.SparkVerticaHelper$$anonfun$applyPipeline$1$$anonfun$apply$3.apply(SparkVerticaHelper.scala:85)
    at replica_nimble_spark.SparkVerticaHelper$$anonfun$applyPipeline$1$$anonfun$apply$3.apply(SparkVerticaHelper.scala:76)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

【Discussion】:

    Tags: scala apache-spark vertica connector


    【Solution 1】:

    The question is: if it works as expected from the spark-shell, why does it not work outside of it?



    The answer is right there in your error message:

    Error message: The ORC data source must be used with Hive support enabled; 
        at com.vertica.spark.s2v.S2V.do2Stage(S2V.scala:446)
    

    which means you have to enable Hive support, as in this example, to fix the error:

    import org.apache.spark.sql.SparkSession

    // warehouseLocation is a path to the Spark SQL warehouse directory, e.g. "spark-warehouse"
    val spark = SparkSession
      .builder()
      .appName("Mr potterpod wants to test spark hive support")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport() // this is what I was talking about
      .getOrCreate()
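
    Applied to the code in the question, the fix is just a matter of adding enableHiveSupport() to the existing builder. A minimal sketch (assuming conf is the SparkConf the question already builds, and that the spark-hive dependency is on the application's classpath):

    import org.apache.spark.sql.SparkSession

    // Same builder as in the question, with Hive support switched on so the
    // Vertica connector can stage its intermediate ORC files.
    val spark = SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()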
    

    Why does it work from spark-shell?

    Answer: spark-shell enables Hive support by default in Spark 2.0 and later.

    Proof: to test the default behaviour, open spark-shell without any options and run this...

    scala> spark.sparkContext.getConf.get("spark.sql.catalogImplementation")
    res3: String = hive
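
    The same check can be run from application code (for example from IntelliJ) before writing to Vertica. A minimal sketch, assuming a local Spark 2.x session with the Hive classes on the classpath:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Prints "hive" when Hive support is enabled, "in-memory" otherwise.
    println(spark.conf.get("spark.sql.catalogImplementation"))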
    

    If you want to test this by disabling Hive support in spark-shell using spark.sql.catalogImplementation,

    the valid values for this property are in-memory and hive:

    spark-shell --conf spark.sql.catalogImplementation=in-memory
    

    then you will get the same error in spark-shell as well.
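
    The same toggle can be reproduced from application code as well; a minimal sketch, assuming no other SparkSession has already been created in the JVM:

    import org.apache.spark.sql.SparkSession

    // With the in-memory catalog (the default when enableHiveSupport() is not
    // called), the connector's intermediate ORC write fails with the same
    // "Hive support" error as in the question.
    val sparkNoHive = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.catalogImplementation", "in-memory")
      .getOrCreate()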

    Further reading: How to enable or disable Hive support in spark-shell through Spark property (Spark 1.6)?

    【Discussion】:
