【问题标题】:Spark S3 CSV read returns org.apache.hadoop.mapred.InvalidInputExceptionSpark S3 CSV 读取返回 org.apache.hadoop.mapred.InvalidInputException
【发布时间】:2017-06-21 07:07:12
【问题描述】:

我在这里看到了几篇帖子,并在 Google 搜索 org.apache.hadoop.mapred.InvalidInputException 但大多数处理 HDFS 文件或捕获错误。我的问题是,虽然我可以从 spark-shell 读取 CSV 文件,但从已编译的 JAR 运行它会不断返回 org.apache.hadoop.mapred.InvalidInputException 错误。

罐子的大致流程:
1. 在 S3 中读取 JSON 文档(可行)
2. 从 S3 中的 parquet 文件中读取(这也成功)
3. 将针对 #1 和 #2 的查询结果写入 S3 中的 parquet 文件(也成功)
4. 从写入 #3 的同一存储桶中读取配置 csv 文件。 (这失败了)

这些是我在代码中尝试过的各种方法:

1. val osRDD = spark.read.option("header","true").csv("s3://bucket/path/")
2. val osRDD = spark.read.format("com.databricks.spark.csv").option("header", "true").load("s3://bucket/path/")

上面两个带有 s3、s3a 和 s3n 前缀的所有变体在 REPL 中都可以正常工作,但在 JAR 中它们会返回以下内容: org.apache.hadoop.mapred.InvalidInputException:输入路径不存在:s3://bucket/path/eventsByOS.csv 所以,它找到了文件但无法读取它。

认为这是权限问题,我尝试过:

a. export AWS_ACCESS_KEY_ID=<access key> and export AWS_SECRET_ACCESS_KEY=<secret> from the Linux prompt.  With Spark 2 this has been sufficient to provide us access to the S3 folders up until now.
b. .config("fs.s3.access.key", <access>)
.config("fs.s3.secret.key", <secret>)
.config("fs.s3n.access.key", <access>)
.config("fs.s3n.secret.key", <secret>)
.config("fs.s3a.access.key", <access>)
.config("fs.s3a.secret.key", <secret>)

在此失败之前,代码会读取位于同一存储桶中的 parquet 文件并将 parquet 文件写入同一存储桶。 CSV 文件大小仅为 4.8 KB。

任何想法为什么会失败?

谢谢!

添加堆栈跟踪:

org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:253)
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:281)
org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1332)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
org.apache.spark.rdd.RDD.take(RDD.scala:1326)
org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1367)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
org.apache.spark.rdd.RDD.first(RDD.scala:1366)
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.findFirstLine(CSVFileFormat.scala:206)
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:60)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184)
scala.Option.orElse(Option.scala:289)
org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:183)
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415)
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:352)

【问题讨论】:

  • 完整的堆栈跟踪是什么?
  • 感谢@SteveLoughran 的提醒 :-)

标签: csv apache-spark amazon-s3


【解决方案1】:

当我将该堆栈粘贴到 IDE 中时,什么也没有出现,但我正在查看更高版本的 Hadoop,目前无法切换到旧版本。

  1. 看看these instructions
  2. 那个 landsat gz 文件实际上是一个 CSV 文件,您可以尝试读入;它是我们通常用于测试的那个,因为它在那里并且可以免费使用。首先看看你是否可以使用它。
  3. 如果使用 spark 2.0,请使用 spark 自己的 CSV 包。
  4. 请使用 S3a,而不是其他。

【讨论】:

  • 谢谢史蒂夫。这是一组很棒的说明。虽然他们没有帮助阅读 CSV,但他们确实帮助我更快地写入文件。仅 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 就带来了巨大的改进。
  • 是的,重命名一个杀手。有一些正在进行的工作来解决这个问题,HADOOP-13786;尚未准备好使用,但欢迎您在它接近发布时帮助测试它。
【解决方案2】:

我通过为适当的方法(此处示例中的 s3)添加特定的 Hadoop 配置解决了这个问题。奇怪的是,上述安全性适用于 Spark 2.0 中的所有内容,除了读取 CSV。

这段代码使用 S3 解决了我的问题。

spark.sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", p.aws_accessKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey",p.aws_secretKey)

【讨论】:

    猜你喜欢
    • 2022-01-13
    • 1970-01-01
    • 2015-12-04
    • 2021-08-15
    • 2020-02-04
    • 2021-05-13
    • 1970-01-01
    • 1970-01-01
    • 2016-11-16
    相关资源
    最近更新 更多