【发布时间】:2025-12-01 09:05:01
【问题描述】:
我正在使用 Spark Dataframe API 从 NFS 共享加载/读取文件,然后将该文件的数据保存/写入 HDFS。
我有一个包含一个主节点和两个工作节点的三节点 Spark 集群。我的 Spark 集群使用 YARN 作为集群管理器,因此两个 Worker 节点是 YARN NodeManager 节点,主节点是 Yarn ResourceManager 节点。
我有一个远程位置,例如 /data/files,它安装到所有三个 YARN/SPARK 节点,因为它是 [/data/files],其中存在我想要读取的所有 csv 文件 [多个]从并最终写入 HDFS。
我在我的主节点上运行以下代码
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
object TestMoreThan1CSV2DF {
private val source: String = "file:///data/files/"
private val destination = "hdfs://<myHostIP>:8020/raw/"
private val fileFormat : String = "com.databricks.spark.csv"
def main(args:Array[String]):Unit={
val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))
for(file<-fileArray){
// reading csv file from shared location and taking whole data in a dataframe
var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)
// variable for holding destination location : HDFS Location
var finalDestination: String = destination+file.getName
// saving data into HDFS
writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
}
}
def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = {
try{
sqlContext.read.format(fileFormat)
.option("header", header) // Use first line of all files as header
.option("inferSchema", inferSchema) // Automatically infer data types
.load(source)
}
catch{
case ex: OnboardingException => {
throw ex;
}
}
}
def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1){
try{
df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
}
catch{
Case ez : OnboardingException => {
throw ez;
}
}
}
}
此代码读取共享位置的所有 csv 文件 /data/files/ 并将它们中的每一个写入HDFS。前任: /data/files/f1.csv 将作为 /raw/f1.csv/part-xxxxx 加载到 HDFS 文件
运行此代码时,我无法辨认:
1) 整个代码在哪里运行?它在驱动程序上运行吗?或使用 两个工人?
2) load() 和 save() API 是否在工作节点上运行? 它并行工作?如果是,那么两名工人如何跟踪 它读过或写过的while部分?
3) 现在我是 在“for”循环中顺序读取每个文件并处理每个文件 它们按顺序排列,是否有可能使其成为多线程 应用程序,其中每个文件分配给一个线程执行 端到端并行读写。磁盘 IO 是否会受到任何约束 这样做的时候?
任何快速响应/参考/指针将不胜感激。
问候, 布佩什
【问题讨论】:
标签: hadoop apache-spark apache-spark-sql spark-dataframe hadoop2