【问题标题】:How to list all files in a directory and its subdirectories in hadoop hdfs如何在hadoop hdfs中列出目录及其子目录中的所有文件
【发布时间】:2018-07-20 21:10:14
【问题描述】:

我在 hdfs 中有一个文件夹,其中有两个子文件夹,每个子文件夹大约有 30 个子文件夹,最后,每个子文件夹都包含 xml 文件。 我想列出所有 xml 文件,只给出主文件夹的路径。 在本地,我可以使用apache commons-io's FileUtils.listFiles() 执行此操作。 这个我试过了

FileStatus[] status = fs.listStatus( new Path( args[ 0 ] ) );

但它只列出了前两个子文件夹,并没有更进一步。 有没有办法在hadoop中做到这一点?

【问题讨论】:

  • 我知道这是一个面向 java 的问题,但如果其他阅读者可以选择使用操作系统命令,hadoop fs -ls -R /user/your_directory 应该递归列出目录

标签: java hadoop hdfs


【解决方案1】:

如果您使用 hadoop 2.* API,还有更优雅的解决方案:

    Configuration conf = getConf();
    Job job = Job.getInstance(conf);
    FileSystem fs = FileSystem.get(conf);

    //the second boolean parameter here sets the recursion to true
    RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
            new Path("path/to/lib"), true);
    while(fileStatusListIterator.hasNext()){
        LocatedFileStatus fileStatus = fileStatusListIterator.next();
        //do stuff with the file like ...
        job.addFileToClassPath(fileStatus.getPath());
    }

【讨论】:

  • getConf() 方法是什么?
  • getConf()Configured 类中的一个方法。理想情况下,您的课程会扩展它。
【解决方案2】:

您需要使用FileSystem 对象并对生成的 FileStatus 对象执行一些逻辑,以手动递归到子目录中。

您还可以应用 PathFilter 以使用 listStatus(Path, PathFilter) 方法仅返回 xml 文件

hadoop FsShell 类有关于 hadoop fs -lsr 命令的示例,这是一个递归 ls - 请参阅 the source,在第 590 行附近(递归步骤在第 635 行触发)

【讨论】:

  • 最后我做了一个比你建议的更简单的实现,但你给了我这个想法。谢谢!!!
  • 参考链接失效
  • @nik686 能否提供您申请的解决方案
【解决方案3】:
/**
 * @param filePath
 * @param fs
 * @return list of absolute file path present in given path
 * @throws FileNotFoundException
 * @throws IOException
 */
public static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> fileList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
        if (fileStat.isDirectory()) {
            fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
        } else {
            fileList.add(fileStat.getPath().toString());
        }
    }
    return fileList;
}

快速示例:假设您具有以下文件结构:

a  ->  b
   ->  c  -> d
          -> e 
   ->  d  -> f

使用上面的代码,你会得到:

a/b
a/c/d
a/c/e
a/d/f

如果您只想要叶子(即文件名),请在 else 块中使用以下代码:

 ...
    } else {
        String fileName = fileStat.getPath().toString(); 
        fileList.add(fileName.substring(fileName.lastIndexOf("/") + 1));
    }

这将给出:

b
d
e
f

【讨论】:

  • fileStat.getPath().toString() 结果格式 hdfs://nameservice1/knime/1/2e4a7753-ce62-4ab1-85a1-7dc4e7aae3d5.xlsx 以 hdfs://开头
【解决方案4】:

你试过了吗:

import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class cat{
    public static void main (String [] args) throws Exception{
        try{
            FileSystem fs = FileSystem.get(new Configuration());
            FileStatus[] status = fs.listStatus(new Path("hdfs://test.com:9000/user/test/in"));  // you need to pass in your hdfs path

            for (int i=0;i<status.length;i++){
                BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
                String line;
                line=br.readLine();
                while (line != null){
                    System.out.println(line);
                    line=br.readLine();
                }
            }
        }catch(Exception e){
            System.out.println("File not found");
        }
    }
}

【讨论】:

  • 是的,我看到了同样的例子,我在上面提到过。但是它列出了深入的子目录 1.我想从主文件夹中获取最终文件
【解决方案5】:

现在,人们可以使用 Spark 来做同样的事情,而且速度比其他方法(例如 Hadoop MR)更快。这是代码sn-p。

def traverseDirectory(filePath:String,recursiveTraverse:Boolean,filePaths:ListBuffer[String]) {
    val files = FileSystem.get( sparkContext.hadoopConfiguration ).listStatus(new Path(filePath))
            files.foreach { fileStatus => {
                if(!fileStatus.isDirectory() && fileStatus.getPath().getName().endsWith(".xml")) {                
                    filePaths+=fileStatus.getPath().toString()      
                }
                else if(fileStatus.isDirectory()) {
                    traverseDirectory(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
                }
            }
    }   
}

【讨论】:

    【解决方案6】:

    递归和非递归方法的代码 sn-p:

    //helper method to get the list of files from the HDFS path
    public static List<String>
        listFilesFromHDFSPath(Configuration hadoopConfiguration,
                              String hdfsPath,
                              boolean recursive) throws IOException,
                                            IllegalArgumentException
    {
        //resulting list of files
        List<String> filePaths = new ArrayList<String>();
    
        //get path from string and then the filesystem
        Path path = new Path(hdfsPath);  //throws IllegalArgumentException
        FileSystem fs = path.getFileSystem(hadoopConfiguration);
    
        //if recursive approach is requested
        if(recursive)
        {
            //(heap issues with recursive approach) => using a queue
            Queue<Path> fileQueue = new LinkedList<Path>();
    
            //add the obtained path to the queue
            fileQueue.add(path);
    
            //while the fileQueue is not empty
            while (!fileQueue.isEmpty())
            {
                //get the file path from queue
                Path filePath = fileQueue.remove();
    
                //filePath refers to a file
                if (fs.isFile(filePath))
                {
                    filePaths.add(filePath.toString());
                }
                else   //else filePath refers to a directory
                {
                    //list paths in the directory and add to the queue
                    FileStatus[] fileStatuses = fs.listStatus(filePath);
                    for (FileStatus fileStatus : fileStatuses)
                    {
                        fileQueue.add(fileStatus.getPath());
                    } // for
                } // else
    
            } // while
    
        } // if
        else        //non-recursive approach => no heap overhead
        {
            //if the given hdfsPath is actually directory
            if(fs.isDirectory(path))
            {
                FileStatus[] fileStatuses = fs.listStatus(path);
    
                //loop all file statuses
                for(FileStatus fileStatus : fileStatuses)
                {
                    //if the given status is a file, then update the resulting list
                    if(fileStatus.isFile())
                        filePaths.add(fileStatus.getPath().toString());
                } // for
            } // if
            else        //it is a file then
            {
                //return the one and only file path to the resulting list
                filePaths.add(path.toString());
            } // else
    
        } // else
    
        //close filesystem; no more operations
        fs.close();
    
        //return the resulting list
        return filePaths;
    } // listFilesFromHDFSPath
    

    【讨论】:

      【解决方案7】:

      这是一个代码 sn-p,它计算特定 HDFS 目录中的文件数(我用它来确定在特定 ETL 代码中使用多少个 reducer)。您可以轻松地对其进行修改以满足您的需求。

      private int calculateNumberOfReducers(String input) throws IOException {
          int numberOfReducers = 0;
          Path inputPath = new Path(input);
          FileSystem fs = inputPath.getFileSystem(getConf());
          FileStatus[] statuses = fs.globStatus(inputPath);
          for(FileStatus status: statuses) {
              if(status.isDirectory()) {
                  numberOfReducers += getNumberOfInputFiles(status, fs);
              } else if(status.isFile()) {
                  numberOfReducers ++;
              }
          }
          return numberOfReducers;
      }
      
      /**
       * Recursively determines number of input files in an HDFS directory
       *
       * @param status instance of FileStatus
       * @param fs instance of FileSystem
       * @return number of input files within particular HDFS directory
       * @throws IOException
       */
      private int getNumberOfInputFiles(FileStatus status, FileSystem fs) throws IOException  {
          int inputFileCount = 0;
          if(status.isDirectory()) {
              FileStatus[] files = fs.listStatus(status.getPath());
              for(FileStatus file: files) {
                  inputFileCount += getNumberOfInputFiles(file, fs);
              }
          } else {
              inputFileCount ++;
          }
      
          return inputFileCount;
      }
      

      【讨论】:

        【解决方案8】:

        不要使用递归方法(堆问题):) 使用队列

        queue.add(param_dir)
        while (queue is not empty){
        
          directory=  queue.pop
         - get items from current directory
         - if item is file add to a list (final list)
         - if item is directory => queue.push
        }
        

        这很简单,享受吧!

        【讨论】:

          【解决方案9】:

          感谢 Radu Adrian Moldovan 的建议。

          这是一个使用队列的实现:

          private static List<String> listAllFilePath(Path hdfsFilePath, FileSystem fs)
          throws FileNotFoundException, IOException {
            List<String> filePathList = new ArrayList<String>();
            Queue<Path> fileQueue = new LinkedList<Path>();
            fileQueue.add(hdfsFilePath);
            while (!fileQueue.isEmpty()) {
              Path filePath = fileQueue.remove();
              if (fs.isFile(filePath)) {
                filePathList.add(filePath.toString());
              } else {
                FileStatus[] fileStatus = fs.listStatus(filePath);
                for (FileStatus fileStat : fileStatus) {
                  fileQueue.add(fileStat.getPath());
                }
              }
            }
            return filePathList;
          }
          

          【讨论】:

            猜你喜欢
            • 2014-03-08
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2017-03-15
            • 1970-01-01
            • 1970-01-01
            • 2012-07-22
            相关资源
            最近更新 更多