【发布时间】:2018-02-05 21:27:18
【问题描述】:
如何将文件从 SFTP 服务器加载到 spark RDD。加载此文件后,我需要对数据执行一些过滤。该文件也是 csv 文件,所以请你帮我决定我应该使用 Dataframes 还是 RDDs。
【问题讨论】:
标签: scala apache-spark spark-dataframe
如何将文件从 SFTP 服务器加载到 spark RDD。加载此文件后,我需要对数据执行一些过滤。该文件也是 csv 文件,所以请你帮我决定我应该使用 Dataframes 还是 RDDs。
【问题讨论】:
标签: scala apache-spark spark-dataframe
您可以通过以下方式在程序中使用spark-sftp 库:
适用于 Spark 2.x
Maven 依赖
<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_2.11</artifactId>
<version>1.1.0</version>
</dependency>
SBT 依赖
libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.0"
与 Spark shell 一起使用
可以使用 --packages 命令行选项将此包添加到 Spark。例如,在启动 spark shell 时包含它:
$ bin/spark-shell --packages com.springml:spark-sftp_2.11:1.1.0
Scala API
// Construct Spark dataframe using file in FTP server
val df = spark.read.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
option("inferSchema", "true").
load("/ftp/files/sample.csv")
// Write dataframe as CSV file to FTP server
df.write.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
save("/ftp/files/sample.csv")
适用于 Spark 1.x (1.5+)
Maven 依赖
<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_2.10</artifactId>
<version>1.0.2</version>
</dependency>
SBT 依赖
libraryDependencies += "com.springml" % "spark-sftp_2.10" % "1.0.2"
与 Spark shell 一起使用
可以使用--packages 命令行选项将此包添加到 Spark。例如,在启动 spark shell 时包含它:
$ bin/spark-shell --packages com.springml:spark-sftp_2.10:1.0.2
Scala API
import org.apache.spark.sql.SQLContext
// Construct Spark dataframe using file in FTP server
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
option("inferSchema", "true").
load("/ftp/files/sample.csv")
// Write dataframe as CSV file to FTP server
df.write().
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
save("/ftp/files/sample.csv")
有关spark-sftp的更多信息,您可以访问github页面springml/spark-sftp
【讨论】:
spark.read 中,spark 是SparkSession,它是在 Spark 2.0 中引入的。如果您使用的是 Spark 版本 spark 代替 sqlContext。
使用 sftp 连接器直接从 SFTP 加载。
https://github.com/springml/spark-sftp
请记住,它是单线程应用程序,即使您没有指定它,也会将数据放入 hdfs。它将数据流式传输到 hdfs,然后在其上创建一个 DataFrame
在加载时,我们需要指定更多参数。
通常不指定位置也可以在您的用户 sudo 用户的 hdfs 时工作。它将在 hdfs 的 / 中创建临时文件,并在该过程完成后将其删除。
val data = sparkSession.read.format("com.springml.spark.sftp").
option("host", "host").
option("username", "user").
option("password", "password").
option("fileType", "json").
option("createDF", "true").
option("hdfsTempLocation","/user/currentuser/").
load("/Home/test_mapping.json");
所有可用选项如下,源码
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType) = {
val username = parameters.get("username")
val password = parameters.get("password")
val pemFileLocation = parameters.get("pem")
val pemPassphrase = parameters.get("pemPassphrase")
val host = parameters.getOrElse("host", sys.error("SFTP Host has to be provided using 'host' option"))
val port = parameters.get("port")
val path = parameters.getOrElse("path", sys.error("'path' must be specified"))
val fileType = parameters.getOrElse("fileType", sys.error("File type has to be provided using 'fileType' option"))
val inferSchema = parameters.get("inferSchema")
val header = parameters.getOrElse("header", "true")
val delimiter = parameters.getOrElse("delimiter", ",")
val createDF = parameters.getOrElse("createDF", "true")
val copyLatest = parameters.getOrElse("copyLatest", "false")
//System.setProperty("java.io.tmpdir","hdfs://devnameservice1/../")
val tempFolder = parameters.getOrElse("tempLocation", System.getProperty("java.io.tmpdir"))
val hdfsTemp = parameters.getOrElse("hdfsTempLocation", tempFolder)
val cryptoKey = parameters.getOrElse("cryptoKey", null)
val cryptoAlgorithm = parameters.getOrElse("cryptoAlgorithm", "AES")
val supportedFileTypes = List("csv", "json", "avro", "parquet")
if (!supportedFileTypes.contains(fileType)) {
sys.error("fileType " + fileType + " not supported. Supported file types are " + supportedFileTypes)
}
【讨论】: