【问题标题】:Load a file from SFTP server into spark RDD将文件从 SFTP 服务器加载到 spark RDD
【发布时间】:2018-02-05 21:27:18
【问题描述】:

如何将文件从 SFTP 服务器加载到 spark RDD。加载此文件后,我需要对数据执行一些过滤。该文件也是 csv 文件,所以请你帮我决定我应该使用 Dataframes 还是 RDDs。

【问题讨论】:

    标签: scala apache-spark spark-dataframe


    【解决方案1】:

    您可以通过以下方式在程序中使用spark-sftp 库:

    适用于 Spark 2.x

    Maven 依赖

    <dependency>
        <groupId>com.springml</groupId>
        <artifactId>spark-sftp_2.11</artifactId>
        <version>1.1.0</version>
    </dependency>
    

    SBT 依赖

    libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.0"
    

    与 Spark shell 一起使用

    可以使用 --packages 命令行选项将此包添加到 Spark。例如,在启动 spark shell 时包含它:

    $ bin/spark-shell --packages com.springml:spark-sftp_2.11:1.1.0
    

    Scala API

    // Construct Spark dataframe using file in FTP server
    val df = spark.read.
                format("com.springml.spark.sftp").
                option("host", "SFTP_HOST").
                option("username", "SFTP_USER").
                option("password", "****").
                option("fileType", "csv").
                option("inferSchema", "true").
                load("/ftp/files/sample.csv")
    
    // Write dataframe as CSV file to FTP server
    df.write.
          format("com.springml.spark.sftp").
          option("host", "SFTP_HOST").
          option("username", "SFTP_USER").
          option("password", "****").
          option("fileType", "csv").
          save("/ftp/files/sample.csv")
    

    适用于 Spark 1.x (1.5+)

    Maven 依赖

    <dependency>
        <groupId>com.springml</groupId>
        <artifactId>spark-sftp_2.10</artifactId>
        <version>1.0.2</version>
    </dependency>
    

    SBT 依赖

    libraryDependencies += "com.springml" % "spark-sftp_2.10" % "1.0.2"
    

    与 Spark shell 一起使用

    可以使用--packages 命令行选项将此包添加到 Spark。例如,在启动 spark shell 时包含它:

    $ bin/spark-shell --packages com.springml:spark-sftp_2.10:1.0.2
    

    Scala API

    import org.apache.spark.sql.SQLContext
    
    // Construct Spark dataframe using file in FTP server
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.
                        format("com.springml.spark.sftp").
                        option("host", "SFTP_HOST").
                        option("username", "SFTP_USER").
                        option("password", "****").
                        option("fileType", "csv").
                        option("inferSchema", "true").
                        load("/ftp/files/sample.csv")
    
    // Write dataframe as CSV file to FTP server
    df.write().
          format("com.springml.spark.sftp").
          option("host", "SFTP_HOST").
          option("username", "SFTP_USER").
          option("password", "****").
          option("fileType", "csv").
          save("/ftp/files/sample.csv")
    

    有关spark-sftp的更多信息,您可以访问github页面springml/spark-sftp

    【讨论】:

    • spark.read 中,spark 是SparkSession,它是在 Spark 2.0 中引入的。如果您使用的是 Spark 版本 spark 代替 sqlContext
    • 可以读取数据吗?如果否,您能否提供完整的错误堆栈跟踪?
    【解决方案2】:

    使用 sftp 连接器直接从 SFTP 加载。

    https://github.com/springml/spark-sftp

    请记住,它是单线程应用程序,即使您没有指定它,也会将数据放入 hdfs。它将数据流式传输到 hdfs,然后在其上创建一个 DataFrame

    在加载时,我们需要指定更多参数。

    通常不指定位置也可以在您的用户 sudo 用户的 hdfs 时工作。它将在 hdfs 的 / 中创建临时文件,并在该过程完成后将其删除。

     val data = sparkSession.read.format("com.springml.spark.sftp").
          option("host", "host").
          option("username", "user").
          option("password", "password").
          option("fileType", "json").
          option("createDF", "true").
          option("hdfsTempLocation","/user/currentuser/").
          load("/Home/test_mapping.json");
    

    所有可用选项如下,源码

    https://github.com/springml/spark-sftp/blob/master/src/main/scala/com/springml/spark/sftp/DefaultSource.scala

    override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType) = {
        val username = parameters.get("username")
        val password = parameters.get("password")
        val pemFileLocation = parameters.get("pem")
        val pemPassphrase = parameters.get("pemPassphrase")
        val host = parameters.getOrElse("host", sys.error("SFTP Host has to be provided using 'host' option"))
        val port = parameters.get("port")
        val path = parameters.getOrElse("path", sys.error("'path' must be specified"))
        val fileType = parameters.getOrElse("fileType", sys.error("File type has to be provided using 'fileType' option"))
        val inferSchema = parameters.get("inferSchema")
        val header = parameters.getOrElse("header", "true")
        val delimiter = parameters.getOrElse("delimiter", ",")
        val createDF = parameters.getOrElse("createDF", "true")
        val copyLatest = parameters.getOrElse("copyLatest", "false")
        //System.setProperty("java.io.tmpdir","hdfs://devnameservice1/../")
        val tempFolder = parameters.getOrElse("tempLocation", System.getProperty("java.io.tmpdir"))
        val hdfsTemp = parameters.getOrElse("hdfsTempLocation", tempFolder)
        val cryptoKey = parameters.getOrElse("cryptoKey", null)
        val cryptoAlgorithm = parameters.getOrElse("cryptoAlgorithm", "AES")
    
        val supportedFileTypes = List("csv", "json", "avro", "parquet")
        if (!supportedFileTypes.contains(fileType)) {
          sys.error("fileType " + fileType + " not supported. Supported file types are " + supportedFileTypes)
        }
    

    【讨论】:

    • 如果您使用 Databricks,请注意设置选项(“tempLocation”,“/tmp”)
    • 有没有办法指定代理?如果没有,有什么解决方法吗?
    猜你喜欢
    • 2022-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-28
    • 2018-03-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多