Title: org.apache.spark.rdd.RDD[((String, Double), (String, Double))] to DataFrame in Scala
Posted: 2018-02-24 21:23:39
Question:

I'm learning Scala/Spark. A few groupBy operations in Scala produced the RDD below. Now I'm trying to write it out as a SQL DataFrame and save it in Hadoop. However, when I convert it to a DataFrame, each tuple ends up as a single nested column, as shown below.

Sample RDD contents:

Array[((String, Double), (String, Double))] = Array(((Veterans Affairs Dept of,11669.0),(Veterans Affairs Dept of,101124.0)), ((Office Wisc Public Defender,40728.0),(Office Wisc Public Defender,40728.0)))

Calling .toDF directly gives:

 |                  _1|                  _2|
 +--------------------+--------------------+
 |[Veterans Affairs...|[Veterans Affairs...|
 |[Office Wisc Publ...|[Office Wisc Publ...| 
 |[Health Services,...|[Health Services,...|

What can I do to get the above into a format like this instead:

|                  _1|     _2|    _3|
+--------------------+-------+------+
|[Veterans Affairs...|11669.0|101124|
|[Office Wisc Publ...|  40728| 40728|

Comments:

    Tags: scala


    Solution 1:

    You have an RDD[((String, Double), (String, Double))] and, assuming the row._1._1 string equals the row._2._1 string, you want to convert it to an RDD[(String, Double, Double)].

    val input: Array[((String, Double), (String, Double))] =
        Array((("Veterans Affairs Dept of", 11669.0), ("Veterans Affairs Dept of", 101124.0)),
          (("Office Wisc Public Defender", 40728.0), ("Office Wisc Public Defender", 40728.0)))
    

    Create the input RDD[((String, Double), (String, Double))]:

    import org.apache.spark.rdd.RDD

    val myRDD: RDD[((String, Double), (String, Double))] = sc.parallelize(input)
    

    Use flatMap to convert it to RDD[(String, Double, Double)]:

    val resultRDD: RDD[(String, Double, Double)] =
        myRDD.flatMap(row => row._1._1 match {
          case firstString if firstString == row._2._1 =>
            Some((firstString, row._1._2, row._2._2))
          case _ => None
        })
    
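    The same filter-and-flatten step can also be written with collect and a partial function; this is just an equivalent sketch of the flatMap above, not a different approach:

    val resultRDD2: RDD[(String, Double, Double)] =
        myRDD.collect { case ((name1, v1), (name2, v2)) if name1 == name2 =>
          (name1, v1, v2)
        }
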

    Convert the RDD to a DataFrame:

    resultRDD.toDF().show()
    

    Result:

    +--------------------+-------+--------+
    |                  _1|     _2|      _3|
    +--------------------+-------+--------+
    |Veterans Affairs ...|11669.0|101124.0|
    |Office Wisc Publi...|40728.0| 40728.0|
    +--------------------+-------+--------+
    
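    The question also asks about saving the result in Hadoop. A minimal sketch, assuming spark is the active SparkSession (its implicits are what make .toDF available) and using placeholder column names and an example output path:

    import spark.implicits._

    val df = resultRDD.toDF("agency", "amount_1", "amount_2")

    // Persist to HDFS, e.g. as Parquet (or CSV).
    df.write.mode("overwrite").parquet("hdfs:///path/to/output")
    // df.write.option("header", "true").csv("hdfs:///path/to/output_csv")
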

    Comments:

      Solution 2:

      Since you got this RDD from a groupBy operation, I'll assume the two strings in each element of Array[((String, Double), (String, Double))] are identical. If so, you can try the following:

      // myRDD: RDD[((String, Double), (String, Double))] -- your grouped RDD

      val strings = myRDD.map(a => a._1._1)

      val values = myRDD.map(a => (a._1._2, a._2._2))

      val rows = strings.zip(values)

      val rowsDF = rows.map { case (a, b) => (a, b._1, b._2) }.toDF
      

      For example, consider the following dummy data:

      val myRDD=sc.parallelize(Array((("string1",1.0),("string1",2.0)),(("string2",3.0),("string2",4.0))))
      
      myRDD: org.apache.spark.rdd.RDD[((String, Double), (String, Double))] = ParallelCollectionRDD[33] at parallelize at <console>:27
      

      The output will be:

      scala> rowsDF: org.apache.spark.sql.DataFrame = [_1: string, _2: double, _3: double]
      scala> rowsDF.collect()
      res49: Array[org.apache.spark.sql.Row] = Array([string1,1.0,2.0], [string2,3.0,4.0])
      
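      As a side note, the three intermediate RDDs can be collapsed into a single map. A minimal sketch, assuming spark is the active SparkSession (sqlContext on Spark 1.x), whose implicits are needed for .toDF:

      import spark.implicits._

      val rowsDF = myRDD
        .map { case ((name, v1), (_, v2)) => (name, v1, v2) }
        .toDF()   // default column names _1, _2, _3, as in the output above
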

      Comments:
