【问题标题】:Key corresponding to max value in a spark map column与火花映射列中的最大值对应的键
【发布时间】:2020-01-09 03:18:37
【问题描述】:

如果我有一个从字符串到双精度的火花映射列,是否可以轻松生成一个新列,其键对应于最大值?

我能够使用如下所示的集合函数来实现它:

import org.apache.spark.sql.functions._

val mockedDf = Seq(1, 2, 3)
  .toDF("id")
  .withColumn("optimized_probabilities_map", typedLit(Map("foo"->0.34333337, "bar"->0.23)))
val df = mockedDf
  .withColumn("optimizer_probabilities", map_values($"optimized_probabilities_map"))
  .withColumn("max_probability", array_max($"optimizer_probabilities"))
  .withColumn("max_position", array_position($"optimizer_probabilities", $"max_probability"))
  .withColumn("optimizer_ruler_names", map_keys($"optimized_probabilities_map"))
  .withColumn("optimizer_ruler_name", $"optimizer_ruler_names"( $"max_position"))

但是,此解决方案不必要地冗长且效率不高。还有一个可能的精度问题,因为我在使用array_position 时比较双精度。我想知道是否有更好的方法可以在没有 UDF 的情况下执行此操作,也许使用表达式字符串。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您可以使用 Spark 2.4+,一种方法是使用 Spark-SQL 内置函数 aggregate,我们遍历所有 map_keys,然后将相应的 map_values 与缓冲值 acc.val 进行比较,然后相应地更新 acc.name

    mockedDf.withColumn("optimizer_ruler_name", expr("""
        aggregate(
          map_keys(optimized_probabilities_map), 
          (string(NULL) as name, double(NULL) as val),
          (acc, y) ->
            IF(acc.val is NULL OR acc.val < optimized_probabilities_map[y]
            , (y as name, optimized_probabilities_map[y] as val)
            , acc
            ),
          acc -> acc.name
        )
    """)).show(false)
    +---+--------------------------------+--------------------+
    |id |optimized_probabilities_map     |optimizer_ruler_name|
    +---+--------------------------------+--------------------+
    |1  |[foo -> 0.34333337, bar -> 0.23]|foo                 |
    |2  |[foo -> 0.34333337, bar -> 0.23]|foo                 |
    |3  |[foo -> 0.34333337, bar -> 0.23]|foo                 |
    +---+--------------------------------+--------------------+
    

    【讨论】:

      【解决方案2】:

      另一种解决方案是分解地图列,然后使用窗口函数来获取最大值,如下所示:

      import org.apache.spark.sql.expressions.Window
      
      val w = Window.partitionBy($"id")
      
      val df = mockedDf.select($"id", $"optimized_probabilities_map", explode($"optimized_probabilities_map"))
                       .withColumn("max_value", max($"value").over(w))
                       .where($"max_value" === $"value")
                       .drop("value", "max_value")
      

      【讨论】:

        猜你喜欢
        • 2021-10-17
        • 2019-09-17
        • 2017-06-17
        • 1970-01-01
        • 1970-01-01
        • 2017-02-04
        • 2015-10-05
        • 2017-12-09
        • 1970-01-01
        相关资源
        最近更新 更多