[Posted]: 2019-11-29 19:19:02
[Problem description]:
I want to read data loaded from a .csv file into a Spark DataFrame, but I get an error message, most likely caused by malformed input. Is it possible to skip the bad rows programmatically?
Here is my Scala code:
val df = session.read
.option("header", "true")
.option("delimiter", delimiter)
.option("inferSchema", "true")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.csv(csvFilePath)
onData(df)
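Spark's CSV reader does have a built-in way to drop rows it cannot parse: setting `.option("mode", "DROPMALFORMED")` on the reader (the other modes are `PERMISSIVE` and `FAILFAST`). Note, though, that the stack trace above fails inside `CSVInferSchema`, i.e. during schema inference itself, so pre-filtering the raw lines before Spark sees them is another route. The following is a minimal plain-Scala sketch of that idea; `isWellFormed` and the fixed column count are hypothetical names chosen for illustration (the dataset in question has 17 columns), not part of any Spark API:

```scala
// Sketch: treat a raw CSV line as well-formed only if splitting on the
// delimiter yields exactly the expected number of columns. This is a
// deliberately naive check (it ignores quoted fields containing the
// delimiter) meant only to illustrate pre-filtering bad lines.
def isWellFormed(line: String, delimiter: String, expectedCols: Int): Boolean =
  line != null && line.split(delimiter, -1).length == expectedCols

// Example with a 3-column layout: the middle line is dropped.
val lines = Seq("a,b,c", "malformed", "d,e,f")
val good  = lines.filter(isWellFormed(_, ",", 3))
```

In a real job you would apply the same predicate to an `RDD[String]` (e.g. from `sparkContext.textFile`) before parsing, or simply rely on `DROPMALFORMED` once a schema is supplied explicitly instead of inferred.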
Here is the error log I get from Amazon EMR:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 61 in stage 1.0 failed 1 times, most recent failure: Lost task 61.0 in stage 1.0 (TID 62, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$.org$apache$spark$sql$execution$datasources$csv$CSVInferSchema$$inferRowType(CSVInferSchema.scala:64)
at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$$anonfun$2.apply(CSVInferSchema.scala:44)
at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$$anonfun$2.apply(CSVInferSchema.scala:44)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
at scala.collection.Iterator$class.foreach(Iterator.scala:750)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:142)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:199)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1202)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
(Would it be more helpful if I provided the entire stack trace?)
Thank you very much!
The data comes from a reddit dataset of comments. Here it is: https://www.kaggle.com/reddit/reddit-comments-may-2015 The data looks like this (sorry, it has 17 columns; I think this is the best way to show you the rows): https://imgur.com/a/vAE9ynF
[Tags]: csv dataframe apache-spark amazon-emr