【问题标题】:How to create a Spark SQL Dataframe with list of Map objects如何使用 Map 对象列表创建 Spark SQL 数据框
【发布时间】:2019-07-11 04:07:13
【问题描述】:

我在 List (Scala) 中有多个 Map[String, String]。例如:

map1 = Map("EMP_NAME" -> “Ahmad”, "DOB" -> “01-10-1991”, "CITY" -> “Dubai”)
map2 = Map("EMP_NAME" -> “Rahul”, "DOB" -> “06-12-1991”, "CITY" -> “Mumbai”)
map3 = Map("EMP_NAME" -> “John”, "DOB" -> “11-04-1996”, "CITY" -> “Toronto”)
list = List(map1, map2, map3)

现在我想用这样的东西创建一个数据框:

EMP_NAME    DOB             CITY
Ahmad       01-10-1991      Dubai
Rahul       06-12-1991      Mumbai
John        11-04-1996      Toronto

我如何做到这一点?

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql bigdata


    【解决方案1】:

    你可以这样做:

    import spark.implicits._
    
    val df = list
      .map( m => (m.get("EMP_NAME"),m.get("DOB"),m.get("CITY")))
      .toDF("EMP_NAME","DOB","CITY")
    
    df.show()
    
    +--------+----------+-------+
    |EMP_NAME|       DOB|   CITY|
    +--------+----------+-------+
    |   Ahmad|01-10-1991|  Dubai|
    |   Rahul|06-12-1991| Mumbai|
    |    John|11-04-1996|Toronto|
    +--------+----------+-------+
    

    【讨论】:

      【解决方案2】:

      稍微不太具体的方法,例如:

      val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
      val map2 = Map("EMP_NAME" -> "John",  "DOB" -> "01-10-1992", "CITY" -> "Mumbai")
      ///...
      val list = List(map1, map2) // map3, ...
      val RDDmap = sc.parallelize(list)
      
      // Get cols dynamically
      val cols = RDDmap.take(1).flatMap(x=> x.keys)
      
      // Map is K,V like per Map entry
      val df = RDDmap.map{ value=>
                           val list=value.values.toList
                           (list(0), list(1), list(2))
             }.toDF(cols:_*) // dynamic column names assigned
      
      df.show(false)
      

      返回:

      +--------+----------+------+
      |EMP_NAME|DOB       |CITY  |
      +--------+----------+------+
      |Ahmad   |01-10-1991|Dubai |
      |John    |01-10-1992|Mumbai|
      +--------+----------+------+
      

      或回答您的子问题,如下所示 - 至少我认为这是您要问的,但可能不是:

      val RDDmap = sc.parallelize(List(
         Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai"),
         Map("EMP_NAME" -> "John",  "DOB" -> "01-10-1992", "CITY" -> "Mumbai")))
         ...
      
      // Get cols dynamically
      val cols = RDDmap.take(1).flatMap(x=> x.keys)
      
      // Map is K,V like per Map entry
      val df = RDDmap.map{ value=>
                       val list=value.values.toList
                       (list(0), list(1), list(2))
             }.toDF(cols:_*) // dynamic column names assigned
      

      您当然可以动态构建列表,但您仍然需要分配 Map 元素。见Appending Data to List or any other collection Dynamically in scala。我会从文件中读取并完成它。

      【讨论】:

      • 谢谢 man。还有一点:如何动态循环这个 (list(0), list(1), list(2)) ?我的意思是不是硬编码 1,2 和 3,而是可以从 list(i) 之类的东西中获取?
      • 更新的答案实际上是另一个答案。否则请接受。
      • @thebluephantom,我不认为Map.keys.values 的结果将始终保持KV 对顺序。
      • @LeoC 请详细说明
      • 如果m = Map(1->a, 2->b, ...),我认为假设m.keysm.values 的元素肯定会分别像1, 2, ...a, b, ... 这样排序,因为Map 都不是Set 也不保留秩序。
      【解决方案3】:
      import org.apache.spark.SparkContext
      import org.apache.spark.sql._
      import org.apache.spark.sql.types.{StringType, StructField, StructType}
      
      object DataFrameTest2 extends Serializable {
        var sparkSession: SparkSession = _
        var sparkContext: SparkContext = _
        var sqlContext: SQLContext = _
      
        def main(args: Array[String]): Unit = {
          sparkSession = SparkSession.builder().appName("TestMaster").master("local").getOrCreate()
          sparkContext = sparkSession.sparkContext
      
          val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
      
          val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
          val map2 = Map("EMP_NAME" -> "Rahul", "DOB" -> "06-12-1991", "CITY" -> "Mumbai")
          val map3 = Map("EMP_NAME" -> "John", "DOB" -> "11-04-1996", "CITY" -> "Toronto")
          val list = List(map1, map2, map3)
      
          //create your rows
          val rows = list.map(m => Row(m.values.toSeq:_*))
      
          //create the schema from the header
          val header = list.head.keys.toList
          val schema = StructType(header.map(fieldName => StructField(fieldName, StringType, true)))
      
          //create your rdd
          val rdd = sparkContext.parallelize(rows)
      
          //create your dataframe using rdd
          val df = sparkSession.createDataFrame(rdd, schema)
          df.show()
        }
      }
      

      【讨论】:

      • 协议是您选择其他答案之一作为正确答案,除非没有其他人提供或您认为它们不合适。
      • 我认为在这种情况下所有答案都是正确的。你的和第一个也是。不知道如何标记多个正确答案。另外,我只是在寻找最通用的解决方案。实际上,我将为大约 40 多个列动态创建和填充数据集。顺便说一句,我非常感谢您提供的解决方案:)
      • 那么请接受它,因为它更灵活,对于 40 列的方法,。但是你可以投票。选择权在你。
      • 我多次赞成您的回答。但这是我得到的信息:感谢您的反馈!声望低于 15 人的投票将被记录,但不会更改公开显示的帖子得分。 :( 看来我需要先建立自己的声誉 :)
      • 对,就是这样。然后,您可以只接受答案,如果您认为它是最好的,那就这样做,否则选择另一个。
      猜你喜欢
      • 2017-09-12
      • 1970-01-01
      • 1970-01-01
      • 2021-03-03
      • 1970-01-01
      • 1970-01-01
      • 2020-12-20
      • 2019-06-12
      • 1970-01-01
      相关资源
      最近更新 更多