【问题标题】:Spark 2.0.1:scala: temporary view query failsSpark 2.0.1:scala:临时视图查询失败
【发布时间】:2016-11-16 05:09:12
【问题描述】:

我运行 spark 2.0.1 并在仅查询时遇到问题:

  1. 从原始 rdd 创建 rdd 行
  2. 从 rdd 创建架构
  3. 创建数据框

我又测试了一遍:

case class Person(name: String, age: Long)

val peopleDF = sparkSession.sparkContext
  .textFile("/home/raja/scala_code/text2.dat")
  .map(_.split("|"))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()

peopleDF.createOrReplaceTempView("people")

val teenagersDF = sparkSession.sql("SELECT * FROM people")

teenagersDF.show()

当我解雇sparkSession.sql("SELECT name FROM emp") 时,它给出了以下错误。

    { java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 1
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) AS salary#2
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 2
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) AS birthplace#3
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 1
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) AS salary#2
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 2
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) AS birthplace#3
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 3
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:200)
    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:185)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:192)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
    ... 20 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 64 elided
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 1
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) AS salary#2
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 2
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) AS birthplace#3
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 3
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
  at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
  at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
  at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:200)
  at org.apache.spark.sql.Row$class.isNullAt(Row.scala:185)
  at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:192)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
  ... 20 more}

【问题讨论】:

  • 请发布创建 emp 的代码 - Spark 懒惰地评估视图,因此仅在查询时抛出异常并不意味着错误在查询中,它可能来自创建视图的代码。
  • 我在这里添加两部分,在下面添加另一个 val sparkSession = SparkSession.builder.master("local").appName("example").getOrCreate() val sc = sparkSession.sparkContext import sparkSession .implicits._ //val emprdd = sc.textFile("/home/raja/scala_code/text2.dat").map(_.split("|")).map(p => Row(p(0) ,p(1))) val schemaString = "name 出生地" val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields)跨度>
  • val semprdd=emprdd.map(value => Row(value)) val empDF = sparkSession.createDataFrame(semprdd, schema) empDF.createOrReplaceTempView("emp") val results = sparkSession.sql(" SELECT name FROM emp") results.show()
  • edit您的问题与此信息,代码在 cmets 中不可读。
  • 我再次测试:case class Person(name: String, age: Long) val peopleDF = sparkSession.sparkContext.textFile("/home/raja/scala_code/text2.dat").map(_ .split("|")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF() peopleDF.createOrReplaceTempView("people") valteensDF = sparkSession.sql( "SELECT * FROM people")teensDF.show() 给出错误粘贴为 "java.lang.NumberFormatException: For input string"

标签: scala apache-spark view temporary


【解决方案1】:

首先,split("|") 不会像您期望的那样被管道分割,因为 split(s: String) 需要一个 正则表达式 作为输入,而管道是正则表达式中的一个特殊字符。在此处查看更多详细信息和解决方案:https://stackoverflow.com/a/40359414/5344058

如果修复后问题仍然存在(您的问题没有提供示例输入数据,所以我不能确定),异常 (java.lang.ArrayIndexOutOfBoundsException: 1) 非常具有指示性 - 您的代码假定 split("|") 的结果开启每条记录生成一个包含至少 两个 项的数组:

.map(_.split("|"))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
//                                                  ^
//                                                  |
// this will throw exception if input isn't valid --/

如果任何记录不符合此条件,您将看到此异常。

为避免这种情况 - 您可以选择几条路线。如果您只想跳过无效行,您可以使用 collect 而不是 map 与仅针对具有至少两个项目的数组定义的部分函数:

.map(_.split("\\|"))
.collect { case Array(a1, a2, _*) => Person(a1, a2.trim.toInt) }

此代码将过滤掉所有由split 生成的数组少于两条记录的记录。

【讨论】:

  • 好吧,我建议的代码适用于简单的管道分隔数据(我使用了RAJA|123|Pynursla),所以除非你提供一些它失败的示例数据(通过再次编辑问题)我可以'没有进一步的帮助。
  • 是的,我用正确的类型更改了类定义,映射也应该是准确的类型并且它可以工作。似乎类型转换需要做很多工作:)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2023-03-16
  • 2019-05-15
  • 1970-01-01
  • 2022-11-12
  • 2021-09-18
  • 1970-01-01
  • 2020-02-08
相关资源
最近更新 更多