【问题标题】:Spark read file from S3 using sc.textFile ("s3n://...)Spark 使用 sc.textFile ("s3n://...) 从 S3 读取文件
【发布时间】:2015-08-31 07:22:32
【问题描述】:

尝试使用 spark-shell 读取位于 S3 中的文件:

scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12

scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    ... etc ...

IOException: No FileSystem for scheme: s3n 发生错误:

  • 开发机器上的 Spark 1.31 或 1.40(无 Hadoop 库)
  • 从集成了 Spark 1.2.1 开箱即用的 Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.60) 运行
  • 使用 s3:// 或 s3n:// 方案

这个错误的原因是什么?缺少依赖、缺少配置或误用sc.textFile()

或者这可能是由于一个影响特定于 Hadoop 2.60 的 Spark 构建的错误,正如 post 似乎暗示的那样。我将尝试使用 Spark for Hadoop 2.40 看看这是否能解决问题。

【问题讨论】:

    标签: java scala apache-spark rdd hortonworks-data-platform


    【解决方案1】:

    确认这与针对 Hadoop 2.60 的 Spark 构建有关。刚刚安装了Spark 1.4.0 "Pre built for Hadoop 2.4 and later"(而不是 Hadoop 2.6)。现在代码可以正常工作了。

    sc.textFile("s3n://bucketname/Filename") 现在引发另一个错误:

    java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    

    以下代码使用 S3 URL 格式显示 Spark 可以读取 S3 文件。使用开发机器(无 Hadoop 库)。

    scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt")
    lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
    
    scala> lyrics.count
    res1: Long = 9
    

    甚至更好:如果 AWS 密钥具有前向“/”,则上述代码在 S3N URI 中内嵌 AWS 凭证时会中断。在 SparkContext 中配置 AWS 凭证将修复它。无论 S3 文件是公共文件还是私有文件,代码都有效。

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/"
    val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
    myRDD.count
    

    【讨论】:

    • Spark 1.6.0 和 Hadoop 2.4 为我工作。带有 Hadoop 2.6 的 Spark 1.6.0 没有。
    • @PriyankDesai 对于其他有同样问题的人,请参阅issues.apache.org/jira/browse/SPARK-7442 和评论部分中的链接。
    • 请参阅下面的答案,了解它不适用于 Hadoop 2.6 版本的原因。
    • 在我的 SparkContext 中添加以下内容解决了我的问题 code sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") sc.hadoopConfiguration .set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")code
    • 请注意,您不应使用您的密钥和访问代码库的密钥签入代码。理想的方法是让您的集群环境假设您的 IAMRole 可以访问 S3。我从程序中删除了访问和密钥代码,但在 Amazon EMR sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem" 上运行时忘记删除以下代码),然后程序再次开始失败并出现上述错误。
    【解决方案2】:

    尽管这个问题已经得到了公认的答案,但我认为仍然缺少关于为什么会发生这种情况的确切细节。所以我认为可能还有其他答案。

    如果您添加所需的 hadoop-aws 依赖项,您的代码应该可以工作。

    从 Hadoop 2.6.0 开始,s3 FS 连接器已移至名为 hadoop-aws 的单独库。 还有一个 Jira: Move s3-related FS connector code to hadoop-aws.

    这意味着针对 Hadoop 2.6.0 或更高版本构建的任何 spark 版本都必须使用另一个外部依赖项才能连接到 S3 文件系统。
    这是我尝试过的一个 sbt 示例,它使用基于 Hadoop 2.6.0 构建的 Apache Spark 1.6.2 按预期工作:

    libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0"

    在我的情况下,我遇到了一些依赖问题,所以我通过添加排除来解决:

    libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0" exclude("tomcat", "jasper-compiler") excludeAll ExclusionRule(organization = "javax.servlet")

    关于其他相关说明,我还没有尝试过,但建议使用“s3a”而不是“s3n”文件系统启动 Hadoop 2.6.0。

    第三代,s3a:文件系统。该文件系统绑定旨在替代 s3n:,支持更大的文件并承诺更高的性能。

    【讨论】:

      【解决方案3】:

      您可以使用适当的 jar 添加 --packages 参数: 对您的提交:

      bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py
      

      【讨论】:

      • 看起来很有希望,但是当我使用 spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.11.83,org.apache.hadoop:hadoop-aws:2.7.3 merge.py 执行此操作时,我无法下载 file:/home/jcomeau/.m2/repository/asm/asm/3.2/asm-3.2.jar。有什么想法吗?
      【解决方案4】:

      我必须将 jar 文件从 hadoop 下载复制到 $SPARK_HOME/jars 目录。对 spark-submit 使用 --jars 标志或 --packages 标志不起作用。

      详情:

      • Spark 2.3.0
      • 下载的 Hadoop 是 2.7.6
      • 复制的两个 jar 文件来自 (hadoop dir)/share/hadoop/tools/lib/
        • aws-java-sdk-1.7.4.jar
        • hadoop-aws-2.7.6.jar

      【讨论】:

        【解决方案5】:

        这是一个示例 spark 代码,可以读取 s3 上的文件

        val hadoopConf = sparkContext.hadoopConfiguration
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
        hadoopConf.set("fs.s3.awsAccessKeyId", s3Key)
        hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret)
        var jobInput = sparkContext.textFile("s3://" + s3_location)
        

        【讨论】:

          【解决方案6】:

          在 Spark 2.0.2 中遇到了同样的问题。通过喂它罐子解决它。这是我跑的:

          $ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar
          
          scala> val hadoopConf = sc.hadoopConfiguration
          scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
          scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
          scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey)
          scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          scala> sqlContext.read.parquet("s3://your-s3-bucket/")
          

          显然,您需要在运行 spark-shell 的路径中放置 jars

          【讨论】:

          • 我在使用 Spark 2.1.0 时也遇到了这个问题,并将最新的 aws 要求 (spark.jars.packages org.apache.hadoop:hadoop-aws:2.7.3) 添加到“spark-defaults .conf”,成功了。
          【解决方案7】:

          有一个 Spark JIRA,SPARK-7481,于今天(2016 年 10 月 20 日)开放,以添加一个 spark-cloud 模块,其中包括对所有 s3a 和 azure wasb 的传递依赖:需要以及测试。

          还有一个 Spark PR 来匹配。这就是我在 Spark 构建中获得 s3a 支持的方式

          如果您手动执行此操作,您必须获得与您的 hadoop JARS 的其余部分具有的确切版本相同的 hadoop-aws JAR,以及与 Hadoop aws 编译所针对的 100% 同步的 AWS JAR 版本。对于 Hadoop 2.7。{1、2、3、...}

          hadoop-aws-2.7.x.jar 
          aws-java-sdk-1.7.4.jar
          joda-time-2.9.3.jar
          + jackson-*-2.6.5.jar
          

          将所有这些都粘贴到SPARK_HOME/jars。使用您在 Env vars 或 spark-default.conf 中设置的凭据运行 spark

          最简单的测试是你能做一个CSV文件的行数

          val landsatCSV = "s3a://landsat-pds/scene_list.gz"
          val lines = sc.textFile(landsatCSV)
          val lineCount = lines.count()
          

          得到一个数字:一切都很好。获取堆栈跟踪。坏消息。

          【讨论】:

          【解决方案8】:

          对于 Spark 1.4.x“为 Hadoop 2.6 及更高版本预构建”:

          我刚刚将所需的 S3、S3native 包从 hadoop-aws-2.6.0.jar 复制到 spark-assembly-1.4.1-hadoop2.6.0.jar.

          之后我重新启动了 spark 集群,它可以工作了。 不要忘记检查程序集 jar 的所有者和模式。

          【讨论】:

            【解决方案9】:

            我遇到了同样的问题。在设置 fs.s3n.impl 的值并添加 hadoop-aws 依赖项后,它工作正常。

            sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
            sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
            sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
            

            【讨论】:

            • 在python中:AttributeError:'SparkContext'对象没有属性'hadoopConfiguration'
            • @UriGoren 在 Python 中,hadoopConfiguration 可以通过 java 实现访问:sc._jsc.hadoopConfiguration
            【解决方案10】:

            S3N 不是默认文件格式。您需要使用具有用于 AWS 兼容性的附加库的 Hadoop 版本构建您的 Spark 版本。我在这里找到的其他信息,https://www.hakkalabs.co/articles/making-your-local-hadoop-more-like-aws-elastic-mapreduce

            【讨论】:

              【解决方案11】:

              您可能必须使用 s3a:/ 方案而不是 s3:/ 或 s3n:/ 但是,对于火花壳来说,它不是开箱即用的(对我来说)。我看到以下堆栈跟踪:

              java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
                      at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
                      at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
                      at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
                      at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
                      at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
                      at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
                      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
                      at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
                      at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
                      at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
                      at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
                      at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
                      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
                      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
                      at scala.Option.getOrElse(Option.scala:120)
                      at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
                      at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
                      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
                      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
                      at scala.Option.getOrElse(Option.scala:120)
                      at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
                      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
                      at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
                      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
                      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
                      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
                      at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
                      at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
                      at $iwC$$iwC$$iwC.<init>(<console>:37)
                      at $iwC$$iwC.<init>(<console>:39)
                      at $iwC.<init>(<console>:41)
                      at <init>(<console>:43)
                      at .<init>(<console>:47)
                      at .<clinit>(<console>)
                      at .<init>(<console>:7)
                      at .<clinit>(<console>)
                      at $print(<console>)
                      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                      at java.lang.reflect.Method.invoke(Method.java:497)
                      at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
                      at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
                      at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
                      at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
                      at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
                      at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
                      at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
                      at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
                      at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
                      at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
                      at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
                      at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
                      at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
                      at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
                      at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
                      at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
                      at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
                      at org.apache.spark.repl.Main$.main(Main.scala:31)
                      at org.apache.spark.repl.Main.main(Main.scala)
                      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                      at java.lang.reflect.Method.invoke(Method.java:497)
                      at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
                      at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
                      at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
                      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
                      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
              Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
                      at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
                      at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
                      ... 68 more
              

              我的想法 - 您必须手动添加 hadoop-aws 依赖项 http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar 但我不知道如何正确将其添加到 spark-shell。

              【讨论】:

              • 使用--jars 参数将jar 的路径添加到spark-shell,逗号分隔。您还需要添加aws-java-sdk-*-jar
              【解决方案12】:
              1. maven repository 下载与您的hadoop 版本匹配的hadoop-aws jar。
              2. 将 jar 复制到$SPARK_HOME/jars 位置。

              现在在您的 Pyspark 脚本中,设置 AWS 访问密钥和秘密访问密钥。

              spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "ACCESS_KEY")
              spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_ACCESSS_KEY")
              
              // where spark is SparkSession instance
              

              对于 Spark scala:

              spark.sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "ACCESS_KEY")
              spark.sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_ACCESSS_KEY")
              

              【讨论】:

                【解决方案13】:

                使用 s3a 而不是 s3n。我在 Hadoop 工作中遇到了类似的问题。从 s3n 切换到 s3a 后,它工作了。

                例如

                s3a://myBucket/myFile1.log

                【讨论】:

                  猜你喜欢
                  • 2023-03-19
                  • 2017-08-29
                  • 1970-01-01
                  • 1970-01-01
                  • 2015-12-04
                  • 1970-01-01
                  • 1970-01-01
                  相关资源
                  最近更新 更多