【问题标题】:Spark dataframe to nested mapSpark数据框到嵌套地图
【发布时间】:2019-07-10 01:27:50
【问题描述】:

如何将 spark 中相当小的数据帧(最大 300 MB)转换为嵌套映射以改进 spark 的 DAG。我相信此操作将比稍后的连接 (Spark dynamic DAG is a lot slower and different from hard coded DAG) 更快,因为转换后的值是在自定义估计器的训练步骤中创建的。现在我只想在管道的预测步骤中快速应用它们。

val inputSmall = Seq(
    ("A", 0.3, "B", 0.25),
    ("A", 0.3, "g", 0.4),
    ("d", 0.0, "f", 0.1),
    ("d", 0.0, "d", 0.7),
    ("A", 0.3, "d", 0.7),
    ("d", 0.0, "g", 0.4),
    ("c", 0.2, "B", 0.25)).toDF("column1", "transformedCol1", "column2", "transformedCol2")

这给出了错误的地图类型

val inputToMap = inputSmall.collect.map(r => Map(inputSmall.columns.zip(r.toSeq):_*))

我更想要类似的东西:

Map[String, Map[String, Double]]("column1" -> Map("A" -> 0.3, "d" -> 0.0, ...), "column2" -> Map("B" -> 0.25), "g" -> 0.4, ...)

【问题讨论】:

    标签: scala apache-spark dataframe hashmap apache-spark-sql


    【解决方案1】:

    编辑:从最终地图中移除收集操作

    如果您使用的是 Spark 2+,这里有一个建议:

    val inputToMap = inputSmall.select(
      map($"column1", $"transformedCol1").as("column1"),
      map($"column2", $"transformedCol2").as("column2")
    )
    
    val cols = inputToMap.columns
    val localData = inputToMap.collect
    
    cols.map { colName => 
      colName -> localData.flatMap(_.getAs[Map[String, Double]](colName)).toMap
    }.toMap
    

    【讨论】:

      【解决方案2】:

      我不确定我是否遵循了动机,但我认为这是可以为您带来想要的结果的转变:

      // collect from DF (by your assumption - it is small enough)
      val data: Array[Row] = inputSmall.collect()
      
      // Create the "column pairs" -
      // can be replaced with hard-coded value: List(("column1", "transformedCol1"), ("column2", "transformedCol2"))
      val columnPairs: List[(String, String)] = inputSmall.columns
        .grouped(2)
        .collect { case Array(k, v) => (k, v) }
        .toList
      
      // for each pair, get data and group it by left-column's value, choosing first match
      val result: Map[String, Map[String, Double]] = columnPairs
        .map { case (k, v) => k -> data.map(r => (r.getAs[String](k), r.getAs[Double](v))) }
        .toMap
        .mapValues(l => l.groupBy(_._1).map { case (c, l2) => l2.head })
      
      result.foreach(println)
      // prints: 
      // (column1,Map(A -> 0.3, d -> 0.0, c -> 0.2))
      // (column2,Map(d -> 0.7, g -> 0.4, f -> 0.1, B -> 0.25))
      

      【讨论】:

        猜你喜欢
        • 2020-08-12
        • 2020-12-30
        • 1970-01-01
        • 2023-03-03
        • 2019-04-11
        • 2020-03-13
        • 1970-01-01
        • 2019-11-27
        • 2019-09-09
        相关资源
        最近更新 更多