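【Posted】: 2019-06-08 09:13:44
【Question】:
I need to read a text file into a Dataset[T] in Spark. The file is poorly formatted: some fields are blank, and it is hard to choose a delimiter for splitting each line. I have been trying to read the data into an RDD and then convert it to a case class type, but not all fields are parsed correctly, and I get this error: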
java.lang.NumberFormatException: empty String
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at scala.collection.immutable.StringLike.toDouble(StringLike.scala:321)
at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:321)
at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
How do I process this file correctly? My .txt file looks like this (anonymized random data, but the same format):
NEW50752085 84.0485 -76.3851 85.1 THE NAME OF AN OBJECT
DEM00752631 51.9581 -85.3315 98.5 THE NAME OF AN OBJECT
KI004867205 40.8518 15.9351 276.5 THE NAME OF AN OBJECT FHG 41196
I tried to process it like this:
import spark.implicits._ // needed for .toDF()

val dataRdd = spark.sparkContext
  .textFile("file.txt")
// Split every line on a single space
val dataArray = dataRdd
  .map(_.split(" "))
case class CaseClass(
  c1: String,
  c2: Double,
  c3: Double,
  c4: Double,
  c5: String,
  c6: String,
  c7: String
)
// .toDouble throws NumberFormatException when a field is an empty string
val df = dataArray
  .map(record => CaseClass(record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4), record(5), record(6)))
  .toDF()
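The stack trace points at `toDouble` being called on an empty string. As a minimal sketch of one way to avoid that, the snippet below parses a line without throwing on blank fields. It assumes the fields are tab-separated, which the sample above does not confirm; `ParsedRecord`, `SafeParse`, `toDoubleOpt`, and `parseLine` are hypothetical names introduced only for illustration.

```scala
import scala.util.Try

// Hypothetical record type: every field that may be blank is an Option.
final case class ParsedRecord(
  c1: String,
  c2: Option[Double],
  c3: Option[Double],
  c4: Option[Double],
  c5: Option[String],
  c6: Option[String],
  c7: Option[String]
)

object SafeParse {
  // None for blank or non-numeric text, instead of a NumberFormatException.
  def toDoubleOpt(s: String): Option[Double] = Try(s.trim.toDouble).toOption

  // ASSUMPTION: fields are separated by a single tab character.
  // The -1 limit keeps trailing empty fields that split would otherwise drop.
  def parseLine(line: String): ParsedRecord = {
    val fields = line.split("\t", -1)

    // Trimmed, non-empty text at index i, or None if missing or blank.
    def text(i: Int): Option[String] =
      if (i < fields.length) Some(fields(i).trim).filter(_.nonEmpty) else None

    def number(i: Int): Option[Double] = text(i).flatMap(toDoubleOpt)

    ParsedRecord(text(0).getOrElse(""), number(1), number(2), number(3), text(4), text(5), text(6))
  }
}
```

In Spark, this could be applied with `spark.read.textFile("file.txt").map(SafeParse.parseLine)` after `import spark.implicits._`, which gives a `Dataset[ParsedRecord]`. If the real delimiter is something other than a tab, only the `split` argument would need to change.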
【Discussion】:
-
I think your file uses tabs as the delimiter. Try using this instead:
spark.read.format("csv").option("delimiter", "\t")
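To make that suggestion concrete, here is a rough sketch of reading the file through the CSV reader with a tab delimiter and an explicit schema, then converting it to a typed Dataset. It assumes the file really is tab-separated and has at most seven columns; `TabbedRecord` and `ReadTabbed` are hypothetical names, and `local[*]` is used only so the sketch can run standalone.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Hypothetical record type; Option fields hold null (blank or missing) values.
final case class TabbedRecord(
  c1: String,
  c2: Option[Double],
  c3: Option[Double],
  c4: Option[Double],
  c5: Option[String],
  c6: Option[String],
  c7: Option[String]
)

object ReadTabbed {
  // Explicit schema, so numeric columns are typed without schema inference.
  val schema: StructType = StructType(Seq(
    StructField("c1", StringType, nullable = true),
    StructField("c2", DoubleType, nullable = true),
    StructField("c3", DoubleType, nullable = true),
    StructField("c4", DoubleType, nullable = true),
    StructField("c5", StringType, nullable = true),
    StructField("c6", StringType, nullable = true),
    StructField("c7", StringType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-tabbed")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()
    import spark.implicits._

    // ASSUMPTION: the delimiter is a tab. In the default PERMISSIVE mode,
    // rows with fewer fields than the schema get nulls for the missing ones.
    val ds: Dataset[TabbedRecord] = spark.read
      .format("csv")
      .option("delimiter", "\t")
      .schema(schema)
      .load("file.txt")
      .as[TabbedRecord]

    ds.show(truncate = false)
    spark.stop()
  }
}
```

With this approach the CSV reader handles blank numeric fields by producing nulls, so no manual `toDouble` call is needed. Whether it fits depends on the actual delimiter in the file, which is worth checking in a hex viewer or with `cat -A` first.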
Tags: scala apache-spark text-files apache-spark-dataset