【Question title】: pyspark and HDFS commands
【Posted】: 2016-03-04 22:12:40
【Question】:

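I want to do some cleanup at the start of my Spark program (PySpark). For example, I want to delete the data that a previous run left on HDFS. In Pig, this can be done with commands such as

fs -copyFromLocal ....

rmf /path/to-/hdfs

or locally with the sh command.

I would like to know how to do the same in PySpark.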

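【Comments】:

  • You cannot do this kind of thing with Spark. Perhaps the best option is an Oozie workflow, where you can place both HDFS commands and Spark jobs and combine them with whatever logic you like.

Tags: python apache-spark hdfs pyspark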


【Solution 1】:

You can delete an HDFS path from PySpark without any third-party dependencies, as follows:

from pyspark.sql import SparkSession
# example of preparing a spark session
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext
# Prepare a FileSystem manager
fs = (sc._jvm.org
      .apache.hadoop
      .fs.FileSystem
      .get(sc._jsc.hadoopConfiguration())
      )
path = "Your/hdfs/path"
# use the FileSystem manager to remove the path
fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

To take this one step further, you can wrap the idea above in a helper function that you can reuse across jobs and packages:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

def delete_path(spark, path):
    sc = spark.sparkContext
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

delete_path(spark, "Your/hdfs/path")
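
The helper above discards the boolean that Hadoop's FileSystem.delete returns, so the caller cannot tell whether anything was removed. Below is a minimal sketch of a variant that checks for the path first and reports the outcome. The name delete_if_exists is hypothetical, and the code assumes the same private PySpark handles (sc._jvm and sc._jsc) used above, which may change between Spark versions.

```python
def delete_if_exists(sc, path, recursive=True):
    """Delete `path` on the default Hadoop filesystem if it exists.

    Returns True if the path existed and was deleted, False otherwise.
    `sc` is assumed to be an active SparkContext; `_jvm` and `_jsc` are
    private PySpark attributes, so this may break across Spark versions.
    """
    hadoop_fs = sc._jvm.org.apache.hadoop.fs
    fs = hadoop_fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    target = hadoop_fs.Path(path)
    if not fs.exists(target):
        return False  # nothing to clean up
    # FileSystem.delete(Path, boolean) returns a Java boolean
    return bool(fs.delete(target, recursive))
```

With a running session it would be called as delete_if_exists(spark.sparkContext, "Your/hdfs/path").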

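【Comments】:

  • I tried this and it worked. Thanks. How can I use this API to list all the partitions of a directory in HDFS?
【Solution 2】:

You can run arbitrary shell commands using, for example, subprocess.call or the sh library, so something like this should work fine: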

import subprocess

some_path = ...
subprocess.call(["hadoop", "fs", "-rm", "-f", some_path])
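
Note that -rm with only -f removes files, not directories, and that subprocess.call returns an exit status, which the snippet above ignores. Below is a minimal sketch, assuming the hadoop executable is on the PATH of the driver machine, that adds -r for directories and reports success. The function names build_rm_command and hdfs_rm are hypothetical.

```python
import subprocess


def build_rm_command(path, recursive=True, skip_trash=False):
    """Build the argument list for `hadoop fs -rm` (no shell involved)."""
    cmd = ["hadoop", "fs", "-rm", "-f"]  # -f: no error if the path is missing
    if recursive:
        cmd.append("-r")  # -r: required to remove a directory and its contents
    if skip_trash:
        cmd.append("-skipTrash")  # bypass the HDFS trash, if it is enabled
    cmd.append(path)
    return cmd


def hdfs_rm(path, **options):
    """Run the command and return True when `hadoop` exits with status 0.

    Assumes the `hadoop` executable is on the PATH of the driver machine.
    """
    result = subprocess.run(build_rm_command(path, **options))
    return result.returncode == 0
```

Passing the arguments as a list rather than as a single shell string avoids quoting problems when the path contains spaces.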

If you are on Python 2.x, you can try spotify/snakebite:

from snakebite.client import Client

host = ...
port = ...
client = Client(host, port)
client.delete(some_path, recurse=True)

hdfs3 is another library that can be used to do the same thing:

from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host=host, port=port)
hdfs.rm(some_path)

The Apache Arrow Python bindings are the most recent option (and are often already available on a Spark cluster, since pandas_udf requires them):

from pyarrow import hdfs

fs = hdfs.connect(host, port)
fs.delete(some_path, recursive=True)

【Comments】:

    【Solution 3】:

    From https://diogoalexandrefranco.github.io/interacting-with-hdfs-from-pyspark/, using only PySpark:

    ######
    # Get fs handler from java gateway
    ######
    URI = sc._gateway.jvm.java.net.URI
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    fs = FileSystem.get(URI("hdfs://somehost:8020"), sc._jsc.hadoopConfiguration())
    
    # We can now use the Hadoop FileSystem API (https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html)
    fs.listStatus(Path('/user/hive/warehouse'))
    # or
    fs.delete(Path('some_path'))
    

    The other solutions did not work in my case, but this blog post helped :)
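
The listStatus call shown above returns a Java array of FileStatus objects, which py4j exposes as an iterable. Below is a minimal sketch of turning it into a plain Python list of paths, optionally keeping only subdirectories (for example, the partition directories of a table). The name list_dir is hypothetical; fs and Path are assumed to be the handles obtained from the gateway in the snippet above.

```python
def list_dir(fs, path_cls, directory, dirs_only=False):
    """Return the full paths of the direct children of `directory`.

    `fs` is a Hadoop FileSystem handle and `path_cls` the Hadoop Path class,
    both assumed to come from the py4j gateway of an active SparkContext.
    """
    statuses = fs.listStatus(path_cls(directory))  # Java FileStatus[]
    return [
        status.getPath().toString()
        for status in statuses
        if not dirs_only or status.isDirectory()
    ]
```

It would be called as list_dir(fs, Path, '/user/hive/warehouse', dirs_only=True).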

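    【Comments】:

    • Is somehost the host node that runs a particular service, or does it have to meet some other requirement? I get a java.net.ConnectException: Connection refused; error when I try the node that hosts httpfs.
    【Solution 4】:

    Approach 1 - subprocess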

    
    def copy_from_local(local_file, hdfs_file, logger):
        import subprocess
        proc = subprocess.Popen(["hdfs", "dfs", "-copyFromLocal", "-f", local_file, hdfs_file])
        proc.communicate()
    
        if proc.returncode != 0:
            logger.info("copyFromLocal {} to {} error".format(local_file, hdfs_file))
            return False
        else:
            logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))
            return True
    

    Approach 2 - py4j

    
    def copy_from_local_file(sc, logger, local_file, hdfs_file, delSrc=True, overwrite=True):
        # copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        # Note: delSrc=True deletes the local source file after the copy.
        Path = sc._jvm.org.apache.hadoop.fs.Path
        try:
            getFileSystem(sc).copyFromLocalFile(delSrc, overwrite, Path(local_file), Path(hdfs_file))
            logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))
        except Exception as e:
            logger.error(e)
            logger.info("copyFromLocal {} to {} error".format(local_file, hdfs_file))
    
    
    def getFileSystem(sc):
        # Prepare a FileSystem manager
        FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem
        fs = FileSystem.get(sc._jsc.hadoopConfiguration())
        return fs
    
    

    You can get the py4j JVM FileSystem object and perform file operations on it:

    getFileSystem(sc) = {JavaObject} DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_15601292_17, ugi=jrsyb@HADOOP.COM (auth:KERBEROS)]]
     access = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e7d0>
     addCacheDirective = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432610>
     addCachePool = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435610>
     addDelegationTokens = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44323d0>
     allowSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432990>
     append = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435c90>
     areSymlinksEnabled = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cf10>
     cancelDeleteOnExit = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432850>
     clearStatistics = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e410>
     close = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442eb90>
     closeAll = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432c10>
     closeAllForUGI = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432150>
     completeLocalOutput = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435190>
     concat = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e450>
     copyFromLocalFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e650>
     copyToLocalFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e990>
     create = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432790>
     createEncryptionZone = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432310>
     createNewFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44321d0>
     createNonRecursive = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432f90>
     createSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e290>
     createSymlink = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432a50>
     delete = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435390>
     deleteOnExit = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ec10>
     deleteSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e110>
     disallowSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ee50>
     enableSymlinks = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ed90>
     equals = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ef90>
     exists = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432050>
     finalizeUpgrade = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432890>
     get = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ea10>
     getAclStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432a90>
     getAllStatistics = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e8d0>
     getBlockSize = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e1d0>
     getCanonicalServiceName = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cdd0>
     getChildFileSystems = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435410>
     getClass = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432290>
     getClient = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44358d0>
     getConf = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435750>
     getContentSummary = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44357d0>
     getCorruptBlocksCount = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432550>
     getDataNodeStats = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432950>
     getDefaultBlockSize = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432d50>
     getDefaultReplication = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e210>
     getDefaultUri = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435490>
     getDelegationToken = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435c50>
     getDiskStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ee10>
     getEZForPath = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435710>
     getFileBlockLocations = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e2d0>
     getFileBlockStorageLocations = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e890>
     getFileChecksum = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432d10>
     getFileLinkStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435a90>
     getFileStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e250>
     getFileSystemClass = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e850>
     getHomeDirectory = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44326d0>
     getInotifyEventStream = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432690>
     getLength = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435b90>
     getLinkTarget = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cfd0>
     getLocal = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ef10>
     getMissingBlocksCount = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435450>
     getName = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432350>
     getNamed = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ea90>
     getRawCapacity = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435950>
     getRawUsed = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e690>
     getReplication = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e150>
     getScheme = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ec90>
     getServerDefaults = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432650>
     getSnapshotDiffReport = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e0d0>
     getSnapshottableDirListing = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435a50>
     getStatistics = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432490>
     getStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cf50>
     getStoragePolicies = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432e90>
     getUnderReplicatedBlocksCount = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ced0>
     getUri = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44350d0>
     getUsed = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432f10>
     getWorkingDirectory = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44329d0>
     getXAttr = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432410>
     getXAttrs = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432b10>
     globStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435810>
     hashCode = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435510>
     initialize = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44320d0>
     isDirectory = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435150>
     isFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ed10>
     isFileClosed = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435650>
     isInSafeMode = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e4d0>
     listCacheDirectives = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442edd0>
     listCachePools = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435210>
     listCorruptFileBlocks = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432ad0>
     listEncryptionZones = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432750>
     listFiles = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435690>
     listLocatedStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442eb50>
     listStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432590>
     listXAttrs = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e550>
     makeQualified = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435b50>
     metaSave = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cd50>
     mkdir = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e310>
     mkdirs = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e5d0>
     modifyAclEntries = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435b10>
     modifyCacheDirective = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435590>
     modifyCachePool = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435350>
     moveFromLocalFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432510>
     moveToLocalFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e510>
     newInstance = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432bd0>
     newInstanceLocal = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432710>
     notify = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e910>
     notifyAll = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435250>
     open = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432210>
     printStatistics = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e190>
     recoverLease = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432fd0>
     refreshNodes = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44355d0>
     removeAcl = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ce50>
     removeAclEntries = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e390>
     removeCacheDirective = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432a10>
     removeCachePool = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e710>
     removeDefaultAcl = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ce10>
     removeXAttr = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ec50>
     rename = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ee90>
     renameSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432d90>
     resolvePath = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e9d0>
     restoreFailedStorage = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432810>
     rollEdits = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432dd0>
     rollingUpgrade = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44324d0>
     saveNamespace = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435c10>
     setAcl = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44352d0>
     setBalancerBandwidth = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e3d0>
     setConf = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44359d0>
     setDefaultUri = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432ed0>
     setOwner = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e050>
     setPermission = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442eb10>
     setQuota = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432c90>
     setReplication = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435990>
     setSafeMode = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435890>
     setStoragePolicy = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44354d0>
     setTimes = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435050>
     setVerifyChecksum = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432b90>
     setWorkingDirectory = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432e50>
     setWriteChecksum = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44351d0>
     setXAttr = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44328d0>
     startLocalOutput = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432190>
     supportsSymlinks = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435ad0>
     toString = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4dbcbdc10>
     wait = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e750>
    

    【Comments】:
