【问题标题】:How do I find a max of a column for each unique value in another column in Spark?如何在 Spark 的另一列中找到每个唯一值的最大值?
【发布时间】:2021-02-03 17:16:01
【问题描述】:

假设我有一个类似这样的数据集:

我的最终产品需要在一周中的每一天与当天活动最多的地方排成一行。即 Mon Place A 56、Wed Place C 64 等。我尝试过使用 Window 功能并使用 max 和 groupie,但我让自己感到困惑。

【问题讨论】:

    标签: scala apache-spark pyspark apache-spark-sql


    【解决方案1】:

    出于您的目的,您需要写window function

    val df = Seq(
      ("Mon", "Place A", 10),
      ("Mon", "Place B", 42),
      ("Wed", "Place C", 41),
      ("Thurs", "Place D", 45),
      ("Fri", "Place E", 64),
      ("Fri", "Place A", 12),
      ("Wed", "Place F", 54),
      ("Wed", "Place A", 1)
    ).toDF("day", "place", "number")
    df.show()
    df.withColumn("orderedNumberForDay",
      row_number()
        .over(
          Window.orderBy(col("number").desc)
            .partitionBy("day")
        )
    ).filter(col("orderedNumberForDay") === lit(1))
     .select("day", "place", "number")
     .show()
    /*                            
    +-----+-------+------+        +-----+-------+------+
    |  day|  place|number|        |  day|  place|number|
    +-----+-------+------+        +-----+-------+------+
    |  Mon|Place A|    10|        |  Mon|Place B|    42|
    |  Mon|Place B|    42|  ===>> |  Wed|Place F|    54|
    |  Wed|Place C|    41|        |  Fri|Place E|    64|
    |Thurs|Place D|    45|        |Thurs|Place D|    45|
    |  Fri|Place E|    64|        +-----+-------+------+
    |  Fri|Place A|    12|   
    |  Wed|Place F|    54|   
    |  Wed|Place A|     1|   
    +-----+-------+------+
    */
    

    简单解释一下它是如何工作的

    首先你需要添加带有window function结果的列,这里是:

    df.withColumn("orderedNumberForDay",
      row_number()
        .over(
          Window.orderBy(col("number").desc)
          .partitionBy("day")
        )
    )
    

    row_number() - 是partition 内的行计数器。 Partition 就像group by 中的组。 partitionBy("day") 只是将具有相同 day 列值的窗口分组。最后,我们必须在desc 订单中通过number 订购window,所以我们的window function 中有orderBy(col("number").descover 就像一座桥梁,从 windowswindows 内部的一些有用计算,它只是绑定 row_numberwindow function

    执行完这个阶段我们会有数据:

    +-----+-------+------+-------------------+
    |  day|  place|number|orderedNumberForDay|
    +-----+-------+------+-------------------+
    |  Mon|Place B|    42|                  1|
    |  Mon|Place A|    10|                  2|
    |  Wed|Place F|    54|                  1|
    |  Wed|Place C|    41|                  2|
    |  Wed|Place A|     1|                  3|
    |  Fri|Place E|    64|                  1|
    |  Fri|Place A|    12|                  2|
    |Thurs|Place D|    45|                  1|
    +-----+-------+------+-------------------+
    

    所以,我们只需要filter 行与orderedNumberForDay 等于1 - 它将与max number 并选择开始列:day, place, number。最终结果将是:

    +-----+-------+------+
    |  day|  place|number|
    +-----+-------+------+
    |  Mon|Place B|    42|
    |  Wed|Place F|    54|
    |  Fri|Place E|    64|
    |Thurs|Place D|    45|
    +-----+-------+------+
    

    【讨论】:

      【解决方案2】:

      Spark 3.0 引入了聚合函数 max_by,它可以满足您的需求:

      df.groupBy("day")
         .agg(expr("max_by(place, number)"), max('number))
         .show()
      

      结果:

      +-----+---------------------+-----------+
      |  day|max_by(place, number)|max(number)|
      +-----+---------------------+-----------+
      |  Mon|              Place B|         42|
      |  Wed|              Place F|         54|
      |  Fri|              Place E|         64|
      |Thurs|              Place D|         45|
      +-----+---------------------+-----------+
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2011-04-16
        • 2020-11-17
        • 1970-01-01
        • 1970-01-01
        • 2016-05-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多