【问题标题】:How can I get the file-name list of a directory from hdfs in pyspark? [closed]如何从 pyspark 中的 hdfs 获取目录的文件名列表? [关闭]
【发布时间】:2026-01-25 13:30:01
【问题描述】:

我在 hdfs 中有一个目录,其中包含许多文件。我知道目录的路径,我正在尝试获取目录包含的那些文件名的列表。我该怎么办?

如果我有如下目录:

+dir/
    +f1
    +f2
    +fN

我想得到一个列表如下:

[f1, f2, fN]

【问题讨论】:

  • 所提供的答案都不让您满意?

标签: file path directory pyspark hdfs


【解决方案1】:

对于 python 3 使用子进程库:

from subprocess import Popen, PIPE
hdfs_path = '/path/to/the/designated/folder'
process = Popen(f'hdfs dfs -ls -h {hdfs_path}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
list_of_file_names = [fn.split(' ')[-1].split('/')[-1] for fn in std_out.decode().readlines()[1:]][:-1]
list_of_file_names_with_full_address = [fn.split(' ')[-1] for fn in std_out.decode().readlines()[1:]][:-1]

【讨论】:

    【解决方案2】:

    Pyspark 中没有此功能(编辑:请参阅 Mariusz 的回答和最后的更新)- Python 包 pywebhdfs 中提供了此功能(只需由 pip install pywebhdfs 安装):

    from pywebhdfs.webhdfs import PyWebHdfsClient
    from pprint import pprint
    
    hdfs = PyWebHdfsClient(host='192.10.10.73',port='50070', user_name='ctsats')  # your Namenode IP & username here
    my_dir = 'user/ctsats'
    pprint(hdfs.list_dir(my_dir))
    

    结果是一个(相当长的)Python 字典(未显示)- 做一些实验来感受一下。您可以解析它以获取名称和类型(文件/目录),如下所示:

    data = hdfs.list_dir(my_dir)
    dd = [[x["pathSuffix"], x["type"]] for x in data["FileStatuses"]["FileStatus"]]
    dd
    # [[u'.Trash', u'DIRECTORY'], [u'.sparkStaging', u'DIRECTORY'], [u'checkpoint', u'DIRECTORY'], [u'datathon', u'DIRECTORY'], [u'ms-spark', u'DIRECTORY'], [u'projects', u'DIRECTORY'], [u'recsys', u'DIRECTORY'], [u'sparklyr', u'DIRECTORY'], [u'test.data', u'FILE'], [u'word2vec', u'DIRECTORY']]
    

    从这里开始,一个简单的列表理解应该可以完成这项工作 - 例如,在我同时存在文件和目录的情况下,我可以只保留目录:

    sub_dirs = [x[0] for x in dd if x[1]=='DIRECTORY']
    sub_dirs
    # [u'.Trash', u'.sparkStaging', u'checkpoint', u'datathon', u'ms-spark', u'projects', u'recsys', u'sparklyr', u'word2vec']
    

    为了比较,这里是同一目录的实际列表:

    [ctsats@dev-hd-01 ~]$ hadoop fs -ls
    Found 10 items
    drwx------   - ctsats supergroup          0 2016-06-08 13:31 .Trash
    drwxr-xr-x   - ctsats supergroup          0 2016-12-15 20:18 .sparkStaging
    drwxr-xr-x   - ctsats supergroup          0 2016-06-23 13:23 checkpoint
    drwxr-xr-x   - ctsats supergroup          0 2016-02-03 15:40 datathon
    drwxr-xr-x   - ctsats supergroup          0 2016-04-25 10:56 ms-spark
    drwxr-xr-x   - ctsats supergroup          0 2016-06-30 15:51 projects
    drwxr-xr-x   - ctsats supergroup          0 2016-04-14 18:55 recsys
    drwxr-xr-x   - ctsats supergroup          0 2016-11-07 12:46 sparklyr
    -rw-r--r--   3 ctsats supergroup         90 2016-02-03 16:55 test.data
    drwxr-xr-x   - ctsats supergroup          0 2016-12-15 20:18 word2vec
    

    必须启用 Hadoop 集群中的 WebHDFS 服务,即您的 hdfs-site.xml 文件必须包含以下条目:

    <property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>
    </property>
    

    更新(在 Mariusz 的回答之后):在我的示例中,这是 Mariusz 对 Spark 1.6 的回答的改编(您需要将 spark 替换为 sc):

    path="/user/ctsats"
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    list_status = fs.listStatus(sc._jvm.org.apache.hadoop.fs.Path(path))
    result = [file.getPath().getName() for file in list_status]
    result
    # [u'.Trash', u'.sparkStaging', u'checkpoint', u'datathon', u'ms-spark', u'projects', u'recsys', u'sparklyr', u'test.data', u'word2vec']
    

    这里的问题是文件和子文件夹都被返回了,没有任何方法可以区分它们。正如所展示的,pywebhdfs 解决方案不会受此影响...

    我想有办法克服这个问题,但您必须深入研究 py4j API - 尽管有欺骗性的外观,list_status 不是 Python 列表:

    list_status
    # JavaObject id=o40
    

    【讨论】:

      【解决方案3】:

      您可以在 pyspark 中使用 HDFS(或任何其他兼容的 Hadoop 文件系统)API,并带有一点 py4j 魔法。要列出特定目录中的文件,请使用:

      path = "/here/is/my/dir/"
      fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
      list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))
      result = [file.getPath().getName() for file in list_status]
      

      list_status 集合的元素是 FileSystem 类型。使用此 API,您可以获取文件元数据,例如目录、模式、所有者、组、acls 等信息,并使用这些信息过滤掉不需要的文件。

      【讨论】: