【问题标题】:Spark Skip Bad Records while reading CSVSpark 在读取 CSV 时跳过不良记录
【发布时间】:2019-11-29 19:19:02
【问题描述】:

我想将 .csv 文件中的加载数据读取到 Spark Dataframe 中,但我收到一条错误消息,很可能是由于输入错误。是否有可能以编程方式跳过坏行?

这是我的 scala 代码。

val df = session.read
      .option("header", "true")
      .option("delimiter", delimiter)
      .option("inferSchema", "true")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .csv(csvFilePath)
    onData(df)

这是我从 Amazon EMR 获得的错误日志:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 61 in stage 1.0 failed 1 times, most recent failure: Lost task 61.0 in stage 1.0 (TID 62, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$.org$apache$spark$sql$execution$datasources$csv$CSVInferSchema$$inferRowType(CSVInferSchema.scala:64)
    at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$$anonfun$2.apply(CSVInferSchema.scala:44)
    at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$$anonfun$2.apply(CSVInferSchema.scala:44)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:142)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:199)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1202)
    at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
    at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

(提供整个堆栈跟踪会更有帮助吗?)

非常感谢!

数据来自带有 cmets 的 reddit 数据集。这是:https://www.kaggle.com/reddit/reddit-comments-may-2015 数据看起来是这样的(抱歉,它有 17 列,我认为这是向您展示这些行的最佳方式)https://imgur.com/a/vAE9ynF

【问题讨论】:

    标签: csv dataframe apache-spark amazon-emr


    【解决方案1】:

    使用 option("mode", "DROPMALFORMED") 跳过坏行。

    val df = session.read
              .option("header", "true")
              .option("delimiter", delimiter)
              .option("inferSchema", "true")
              .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
              .option("mode", "DROPMALFORMED")
              .csv(csvFilePath)
            onData(df)
    

    【讨论】:

    • 您的堆栈跟踪显示 InferSchema 中的 Nullpointer 异常。请检查标题是否存在并共享文件的示例记录。
    • 我做到了,更新了我的答案。确实,没有标题。但是当我将选项“header”设置为“false”时,会发生完全相同的错误
    • 请尝试使用示例小 CSV 文件。而不是一个大文件。此外,我可以看到行中的列数超过 17。请上传一个示例 CSV 文件,以便我们进行测试。检查它可能是一个多行问题,因为它包含一个大字符串。
    • 整个数据集为 17GB 的 CSV 文件。我将其采样到 100 万和 300 万行(300MB 和 900MB),完全没有错误。
    【解决方案2】:

    当您查看第 64 行的源代码 InferSchema.scala 时,它会尝试在 next 上调用 length,这就是您得到 Nullpointer 异常的地方。

    https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala

    我怀疑即使在删除格式错误的记录后数据也存在问题。也许您可以通过禁用推断模式来尝试(或者可能创建您自己的模式并传递给它)

    val df = session.read
          .option("header", "true")
          .option("delimiter", delimiter)
          .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
          .option("mode", "DROPMALFORMED")
          .csv(csvFilePath) 
    

    看了源码后的想法。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-02-26
      • 2020-07-20
      • 2023-03-23
      • 1970-01-01
      • 1970-01-01
      • 2014-06-30
      • 1970-01-01
      • 2016-10-06
      相关资源
      最近更新 更多