【Question Title】: Spark transform RDD
【Posted】: 2017-02-28 00:45:48
【Question】:

I have a CSV input file like this:

time,col1,col2,col3  
0,5,8,9 
1,6,65,3 
2,5,8,465,4 
3,85,45,8

The number of columns is unknown. I want the resulting RDD to have the format:

(constant,column,time,value) 

That is: ((car1,col1,0,5),(car1,col2,0,8)..)

I already have the time and rows RDDs, plus a header helper:

class SimpleCSVHeader(header: Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array: Array[String], key: String): String = array(index(key))
}
val constant = "car1"

val csv = sc.textFile("C:\\file.csv")

val data = csv.map(line => line.split(",").map(elem => elem.trim))

val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line, "time") != "time") // filter the header out
val time = rows.map(row => header(row, "time"))

But I'm not sure how to build the resulting RDD from these.
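One possible way to finish the code above, sketched under a few assumptions: the SimpleCSVHeader class is copied unchanged from the question, every column other than "time" becomes its own record, and columns are emitted in header order. The per-row logic is a plain function (ToRecords.toRecords is a hypothetical name) so it can be tried without Spark; with the question's rows RDD it would be applied as rows.flatMap(row => ToRecords.toRecords(constant, header, row)).

```scala
// Header helper, same as in the question
class SimpleCSVHeader(header: Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array: Array[String], key: String): String = array(index(key))
}

object ToRecords {
  // Hypothetical helper: turns one parsed CSV row into
  // (constant, column, time, value) tuples, one per non-"time" column.
  def toRecords(constant: String,
                header: SimpleCSVHeader,
                row: Array[String]): Seq[(String, String, String, String)] = {
    val time = header(row, "time")
    // Visit columns in file order; skip "time" and any column this row lacks
    header.index.toSeq
      .sortBy(_._2)
      .collect { case (name, i) if name != "time" && i < row.length =>
        (constant, name, time, row(i))
      }
  }

  def main(args: Array[String]): Unit = {
    val header = new SimpleCSVHeader(Array("time", "col1", "col2", "col3"))
    val rows = Seq("0,5,8,9", "1,6,65,3", "2,5,8,465,4").map(_.split(",").map(_.trim))
    // Local stand-in for rows.flatMap(...) on the RDD
    rows.flatMap(row => toRecords("car1", header, row)).foreach(println)
  }
}
```

Because the lookup goes through the header's index, fields beyond the last header column (such as the trailing 4 in 2,5,8,465,4) are simply ignored.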

【Question discussion】:

    Tags: scala parsing apache-spark cassandra rdd


    【Solution 1】:

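    My suggestion is to use DataFrames instead of RDDs in your scenario. Still, I've tried to give you a working solution based on your sample data; note that it hard-codes the three value columns of that sample.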

        import org.apache.spark.{SparkConf, SparkContext}

        val lines = Array("time,col1,col2,col3", "0,5,8,9", "1,6,65,3", "2,5,8,465,4")

        // Local SparkContext; stands in for the original prepareConfig() helper
        val sc = new SparkContext(new SparkConf().setAppName("transform-rdd").setMaster("local[*]"))
        val baseRDD = sc.parallelize(lines)

        // Column names come from the first line (this step can be avoided if you use DataFrames)
        val headerLine = baseRDD.first()
        val columns = headerLine.split(",") // Array(time, col1, col2, col3)

        val mapRDD = baseRDD
          .filter(line => line != headerLine) // skip the header row itself
          .flatMap { line =>
            val splits = line.split(",")
            // Replace the tuples with your case classes; exactly three value columns are assumed
            Array(
              ("car1", columns(1), splits(0), splits(1)),
              ("car1", columns(2), splits(0), splits(2)),
              ("car1", columns(3), splits(0), splits(3)))
          }

        mapRDD.collect().foreach(println)
    

    Result:

    (car1,col1,0,5) (car1,col2,0,8) (car1,col3,0,9) (car1,col1,1,6) (car1,col2,1,65) (car1,col3,1,3) (car1,col1,2,5) (car1,col2,2,8) (car1,col3,2,465)
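The answer above emits exactly three tuples per line, while the question says the number of columns is unknown. A sketch that generalizes it, assuming the header is the first line and the time is always in column 0, and following the answer's hint to replace tuples with a case class; Record and Reshape.reshape are hypothetical names. Zipping the header with each row also drops surplus fields such as the trailing 4 in 2,5,8,465,4, which matches the result shown above.

```scala
// Hypothetical record type replacing the answer's tuples
case class Record(constant: String, column: String, time: String, value: String)

object Reshape {
  // Turns every data line into one Record per value column.
  // Assumes the header is the first line and the time is in column 0.
  def reshape(constant: String, lines: Seq[String]): Seq[Record] = {
    val header = lines.head.split(",").map(_.trim)
    val valueColumns = header.tail // col1, col2, ... however many there are
    lines.tail.flatMap { line =>
      val fields = line.split(",").map(_.trim)
      // zip stops at the shorter side, so extra or missing fields are ignored
      valueColumns.zip(fields.tail).map { case (column, value) =>
        Record(constant, column, fields(0), value)
      }.toSeq
    }
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("time,col1,col2,col3", "0,5,8,9", "1,6,65,3", "2,5,8,465,4")
    reshape("car1", lines).foreach(println)
  }
}
```

On an RDD, the same per-line body can run inside flatMap after the header line has been filtered out, with the header array taken from baseRDD.first(); Record is defined at top level so Spark can serialize it.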

    【Discussion】:
