【Posted】:2017-02-28 00:45:48
【Question】:
I have a CSV file like this as input:
time,col1,col2,col3
0,5,8,9
1,6,65,3
2,5,8,465,4
3,85,45,8
The number of columns is not known in advance. I want the resulting RDD to have the format:
(constant,column,time,value)
That is: ((car1,col1,0,5),(car1,col2,1,8)..)
So far I have the header, an RDD of the rows, and an RDD of the time values:
class SimpleCSVHeader(header:Array[String]) extends Serializable {
val index = header.zipWithIndex.toMap
def apply(array:Array[String], key:String):String = array(index(key))
}
val constant = "car1"
val csv = sc.textFile("C:\\file.csv")
val data = csv.map(line => line.split(",").map(elem => elem.trim))
val header = new SimpleCSVHeader(data.first()) // we build our header with the first line
val rows = data.filter(line => header(line,"time") != "time") // filter the header out
val time = rows.map(row => header(row,"time"))
But I'm not sure how to build the result RDD from these.
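One possible way to get the (constant,column,time,value) shape is to emit one tuple per non-time column of each row and flatten the results. The sketch below shows that step on plain Scala collections so it runs without a cluster. The names `Unpivot`, `toRecords` and `headerNames` are hypothetical and not part of the code above, and the sketch assumes the header contains a column literally named "time".

```scala
// Hypothetical helper, not part of the original code: turns one parsed row
// into (constant, column, time, value) tuples, one per non-time column.
object Unpivot {
  def toRecords(constant: String,
                headerNames: Array[String],
                row: Array[String]): Seq[(String, String, String, String)] = {
    // Assumption: a column named "time" exists; otherwise row(-1) would throw.
    val timeIdx = headerNames.indexOf("time")
    val time = row(timeIdx)
    // zip stops at the shorter side, so fields beyond the header are ignored.
    headerNames.zip(row).zipWithIndex.collect {
      case ((column, value), i) if i != timeIdx => (constant, column, time, value)
    }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val headerNames = Array("time", "col1", "col2", "col3")
    val rows = Seq(Array("0", "5", "8", "9"), Array("1", "6", "65", "3"))
    // flatMap flattens the per-row sequences into a single collection.
    val result = rows.flatMap(row => toRecords("car1", headerNames, row))
    result.foreach(println) // first line printed: (car1,col1,0,5)
  }
}
```

With the RDDs from the question, the same function could presumably be applied as `rows.flatMap(row => Unpivot.toRecords(constant, headerNames, row))`, where `headerNames` would be the raw header array (for example `data.first()`), since `SimpleCSVHeader` does not expose it. Because the column names come from the header at run time, the number of columns does not have to be known up front.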
【Discussion】:
Tags: scala parsing apache-spark cassandra rdd