【Title】: Convert Spark DataFrame to HashMap of HashMaps
【Posted】: 2019-02-17 11:08:16
【Question】:

I have a DataFrame that looks like this:

column1_ID column2 column3 column4
A_123      12      A       1
A_123      12      B       2
A_123      23      A       1 
B_456      56      DB      4 
B_456      56      BD      5
B_456      60      BD      3

I want to convert the above dataframe/RDD into the output below, keyed by column1_ID: HashMap(Long, HashMap(String, Long))

'A_123': {12 : {'A': 1, 'B': 2}, 23: {'A': 1} }, 
'B_456': {56 : {'DB': 4, 'BD': 5}, 60: {'BD': 3} }

I tried reduceByKey and groupByKey, but could not transform the output as expected.

【Comments】:

    Tags: scala apache-spark dataframe apache-spark-sql rdd


    【Solution 1】:

    This can be done by building a complex structure from the last three columns and then applying a UDF:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val data = List(
      ("A_123", 12, "A", 1),
      ("A_123", 12, "B", 2),
      ("A_123", 23, "A", 1),
      ("B_456", 56, "DB", 4),
      ("B_456", 56, "BD", 5),
      ("B_456", 60, "BD", 3))
    val df = data.toDF("column1_ID", "column2", "column3", "column4")

    // Pack the last two columns into a struct
    val twoLastCompacted = df.withColumn("lastTwo", struct($"column3", $"column4"))
    twoLastCompacted.show(false)

    // Group by the first two columns, collecting the structs into an array
    val groupedByTwoFirst = twoLastCompacted
      .groupBy("column1_ID", "column2")
      .agg(collect_list("lastTwo").alias("lastTwoArray"))
    groupedByTwoFirst.show(false)

    // Pack column2 together with the collected array
    val threeLastCompacted = groupedByTwoFirst
      .withColumn("lastThree", struct($"column2", $"lastTwoArray"))
    threeLastCompacted.show(false)

    // Group by the key column, collecting the nested structs
    val groupedByFirst = threeLastCompacted
      .groupBy("column1_ID")
      .agg(collect_list("lastThree").alias("lastThreeArray"))
    groupedByFirst.printSchema()
    groupedByFirst.show(false)

    // UDF that turns the nested struct array into a Map of Maps
    val structToMap = (value: Seq[Row]) =>
      value.map(v => v.getInt(0) ->
        v.getSeq(1).asInstanceOf[Seq[Row]].map(r => r.getString(0) -> r.getInt(1)).toMap)
        .toMap
    val structToMapUDF = udf(structToMap)
    groupedByFirst.select($"column1_ID", structToMapUDF($"lastThreeArray")).show(false)
    

    Output:

    +----------+-------+-------+-------+-------+
    |column1_ID|column2|column3|column4|lastTwo|
    +----------+-------+-------+-------+-------+
    |A_123     |12     |A      |1      |[A,1]  |
    |A_123     |12     |B      |2      |[B,2]  |
    |A_123     |23     |A      |1      |[A,1]  |
    |B_456     |56     |DB     |4      |[DB,4] |
    |B_456     |56     |BD     |5      |[BD,5] |
    |B_456     |60     |BD     |3      |[BD,3] |
    +----------+-------+-------+-------+-------+
    
    +----------+-------+----------------+
    |column1_ID|column2|lastTwoArray    |
    +----------+-------+----------------+
    |B_456     |60     |[[BD,3]]        |
    |A_123     |12     |[[A,1], [B,2]]  |
    |B_456     |56     |[[DB,4], [BD,5]]|
    |A_123     |23     |[[A,1]]         |
    +----------+-------+----------------+
    
    +----------+-------+----------------+---------------------------------+
    |column1_ID|column2|lastTwoArray    |lastThree                        |
    +----------+-------+----------------+---------------------------------+
    |B_456     |60     |[[BD,3]]        |[60,WrappedArray([BD,3])]        |
    |A_123     |12     |[[A,1], [B,2]]  |[12,WrappedArray([A,1], [B,2])]  |
    |B_456     |56     |[[DB,4], [BD,5]]|[56,WrappedArray([DB,4], [BD,5])]|
    |A_123     |23     |[[A,1]]         |[23,WrappedArray([A,1])]         |
    +----------+-------+----------------+---------------------------------+
    
    root
     |-- column1_ID: string (nullable = true)
     |-- lastThreeArray: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- column2: integer (nullable = false)
     |    |    |-- lastTwoArray: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- column3: string (nullable = true)
     |    |    |    |    |-- column4: integer (nullable = false)
    
    +----------+--------------------------------------------------------------+
    |column1_ID|lastThreeArray                                                |
    +----------+--------------------------------------------------------------+
    |B_456     |[[60,WrappedArray([BD,3])], [56,WrappedArray([DB,4], [BD,5])]]|
    |A_123     |[[12,WrappedArray([A,1], [B,2])], [23,WrappedArray([A,1])]]   |
    +----------+--------------------------------------------------------------+
    
    +----------+----------------------------------------------------+
    |column1_ID|UDF(lastThreeArray)                                 |
    +----------+----------------------------------------------------+
    |B_456     |Map(60 -> Map(BD -> 3), 56 -> Map(DB -> 4, BD -> 5))|
    |A_123     |Map(12 -> Map(A -> 1, B -> 2), 23 -> Map(A -> 1))   |
    +----------+----------------------------------------------------+
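
    As a side note, on Spark 2.4+ the UDF can be avoided entirely with the built-in map_from_entries function, which turns an array of two-field structs directly into a map column. Below is a minimal sketch against the same df as above; the names innerMaps, nested, innerMap and nestedMap are just illustrative:

    // UDF-free variant; assumes Spark 2.4+, where map_from_entries is available
    import org.apache.spark.sql.functions._

    // Inner map per (column1_ID, column2): Map(column3 -> column4)
    val innerMaps = df
      .groupBy($"column1_ID", $"column2")
      .agg(map_from_entries(collect_list(struct($"column3", $"column4"))).alias("innerMap"))

    // Outer map per column1_ID: Map(column2 -> innerMap)
    val nested = innerMaps
      .groupBy($"column1_ID")
      .agg(map_from_entries(collect_list(struct($"column2", $"innerMap"))).alias("nestedMap"))

    nested.show(false)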
    

    【Discussion】:

    • Thanks Pasha. I have a lookup scenario like this: 1) when a particular column_1 contains some "x", then column_2 and column_3 have to be looked up in a table; 2) that lookup table/db pulls the records that have mappings, i.e. the fields for column_2 and column_3, and converts the corresponding mapped fields... How can this be achieved?
    • Any suggestions on how to solve this in Spark... stackoverflow.com/questions/62933135/…
    • @BdEngineer The answer by Someshwar Kale on your question looks good.
    • Thanks pasha for the quick reply... A general question: how can I get alerted to any new Spark question posts on SOF?
    • @BdEngineer That is a harder question; maybe SOF has a dedicated help page for it.
    【Solution 2】:

    You can convert the DF to an RDD and apply operations like the following:

    scala> import scala.collection.immutable.HashMap
    import scala.collection.immutable.HashMap

    scala> case class Data(col1: String, col2: Int, col3: String, col4: Int)
    defined class Data
    
    scala> var x: Seq[Data] = List(Data("A_123",12,"A",1), Data("A_123",12,"B",2), Data("A_123",23,"A",1), Data("B_456",56,"DB",4), Data("B_456",56,"BD",5), Data("B_456",60,"BD",3))
    x: Seq[Data] = List(Data(A_123,12,A,1), Data(A_123,12,B,2), Data(A_123,23,A,1), Data(B_456,56,DB,4), Data(B_456,56,BD,5), Data(B_456,60,BD,3))
    
    scala> sc.parallelize(x).groupBy(_.col1).map{a => (a._1, HashMap(a._2.groupBy(_.col2).map{b => (b._1, HashMap(b._2.groupBy(_.col3).map{c => (c._1, c._2.map(_.col4).head)}.toArray: _*))}.toArray: _*))}.toDF()
    res26: org.apache.spark.sql.DataFrame = [_1: string, _2: map<int,map<string,int>>]
    

    I initialized an RDD with this data structure via sc.parallelize(x).
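
    If the end goal is a plain Scala collection on the driver rather than a DataFrame, toDF() (and the Dataset encoder it requires, see the discussion below) can be skipped by collecting instead. A minimal sketch reusing the Data case class and x from above; note that collect() pulls the whole result onto the driver:

    // Build the nested map on the driver, sidestepping Dataset encoders entirely
    val result: Map[String, Map[Int, Map[String, Int]]] =
      sc.parallelize(x)
        .groupBy(_.col1)
        .mapValues(rows =>
          rows.groupBy(_.col2).map { case (k, rs) =>
            k -> rs.map(r => r.col3 -> r.col4).toMap
          })
        .collect()
        .toMap
    // result: Map(A_123 -> Map(12 -> Map(A -> 1, B -> 2), 23 -> Map(A -> 1)), B_456 -> ...)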

    【Discussion】:

    • Thanks for sharing the code. It works fine after removing "toDF()". When converting it to a dataframe I got: "error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._. Support for serializing other types will be added in future releases." Any ideas?
    • You need to import spark.implicits._ above your code for it to work.
    • Already tried that... but it still gives the same error. Not sure if I am missing something.