【问题标题】:Recursively fetch file contents from subdirectories using sc.textFile使用 sc.textFile 从子目录递归获取文件内容
【发布时间】:2023-04-05 17:38:01
【问题描述】:

SparkContext textFile 似乎只希望文件出现在给定的目录位置 - 它也没有

  • (a) 递归或
  • (b) 甚至 support 目录(尝试将目录读取为文件)

关于如何构建递归的任何建议 - 可能比手动创建递归文件列表/下降逻辑更简单?

这是用例:

下的文件

/data/tables/my_table

我希望能够通过 hdfs 调用读取该父目录下所有目录级别的所有文件。

更新

sc.textFile() 通过(子类)TextInputFormat 调用 Hadoop FileInputFormat。内部确实存在执行递归目录读取的逻辑 - 即首先检测条目是否为目录,如果是则降序:

<!-- language: java -->
     for (FileStatus globStat: matches) {
218          if (globStat.isDir()) {
219            for(FileStatus stat: fs.listStatus(globStat.getPath(),
220                inputFilter)) {
221              result.add(stat);
222            }          
223          } else {
224            result.add(globStat);
225          }
226        }

但是,当调用 sc.textFile 时,目录条目出现错误:“不是文件”。这种行为令人困惑 - 鉴于似乎已经为处理目录提供了适当的支持。

【问题讨论】:

  • 一种可能的解决方案:stackoverflow.com/questions/27914145/…
  • @JustinPihony 谢谢我没有使用 S3 - 所以不清楚该答案是否适用。
  • 您能提供一个示例目录布局吗?通配符语法不够吗?例如textFile(/path/*/*)
  • @NickChammas 不,通配符不起作用:它不会下降,并且遇到的任何目录都会生成错误“不是文件”

标签: java apache-spark


【解决方案1】:

我正在查看旧版本的 FileInputFormat..

之前设置递归配置ma​​preduce.input.fileinputformat.input.dir.recursive

scala> sc.textFile("dev/*").count
     java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build

默认值为 null/未设置,评估为“false”:

scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res1: String = null

之后:

现在设置值:

sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")

现在重试递归操作:

scala>sc.textFile("dev/*/*").count

..
res5: Long = 3481

So it works.

更新添加了 / 以实现@Ben 的每条评论的完全递归

【讨论】:

  • 这非常有用。还值得补充的是,每个级别都需要一个星号。我的数据存储方式更像dev/&lt;year&gt;/&lt;month&gt;/&lt;day&gt;。要获得多个月的数据,我需要像 sc.textFile("dev/2015/*/*").count 这样的语法
  • 感谢您的补充评论。会更新我的帖子
  • 我使用的是 Spark 1.6.1,不需要为每个级别添加星号。
  • 使用 Spark 1.6.2(通过 pyspark)并收到错误:AttributeError: 'SparkContext' object has no attribute 'hadoopConfiguration'
  • 好吧,我的错。显然在 Spark 1.6.2 中您不必设置此参数。似乎是默认值。
【解决方案2】:

我发现这些参数必须按以下方式设置:

.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")

【讨论】:

    猜你喜欢
    • 2017-08-29
    • 1970-01-01
    • 2012-11-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-10
    • 2013-11-28
    相关资源
    最近更新 更多