【问题标题】:How to aggregate on two columns in Spark SQL如何在 Spark SQL 中聚合两列
【发布时间】:2023-04-02 06:52:01
【问题描述】:

现在我有一张桌子,上面有以下任务:

  1. 按 DepartmentID 和 EmployeeID 上的函数分组
  2. 在每个组中,我需要按 (ArrivalDate, ArrivalTime) 排序并选择第一个。因此,如果两个日期不同,请选择较新的日期。如果两个日期相同,请选择较新的时间。

我正在尝试这种方法:

input.select("DepartmenId","EmolyeeID", "ArrivalDate", "ArrivalTime", "Word")
  .agg(here will be the function that handles logic from 2)
  .show()

这里聚合的语法是什么?

提前谢谢你。

// +-----------+---------+-----------+-----------+--------+
// |DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|   Word |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E1    |  20170101 |    0730   |  "YES" |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E1    |  20170102 |    1530   |  "NO"  |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E2    |  20170101 |    0730   |  "ZOO" |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E2    |  20170102 |    0330   |  "BOO" |
// +-----------+---------+-----------+-----------+--------+
// |     D2    |   E1    |  20170101 |    0730   |  "LOL" |
// +-----------+---------+-----------+-----------+--------+
// |     D2    |   E1    |  20170101 |    1830   |  "ATT" |
// +-----------+---------+-----------+-----------+--------+
// |     D2    |   E2    |  20170105 |    1430   |  "UNI" |
// +-----------+---------+-----------+-----------+--------+


// output should be

// +-----------+---------+-----------+-----------+--------+
// |DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|   Word |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E1    |  20170102 |    1530   |  "NO"  |
// +-----------+---------+-----------+-----------+--------+
// |     D1    |   E2    |  20170102 |    0330   |  "BOO" |
// +-----------+---------+-----------+-----------+--------+
// |     D2    |   E1    |  20170101 |    1830   |  "ATT" |
// +-----------+---------+-----------+-----------+--------+
// |     D2    |   E2    |  20170105 |    1430   |  "UNI" |
// +-----------+---------+-----------+-----------+--------+

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您可以在包含所有非分组列的新结构列上使用max,首先使用ArrivalData,然后使用ArrivalTime:该新列的排序将符合您的要求(最新日期优先;最新相似日期之间的第一个小时),因此获得最大值将产生您所追求的记录。

    然后,您可以使用select 操作将结构“拆分”回单独的列。

    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    df.groupBy($"DepartmentID", $"EmployeeID")
      .agg(max(struct("ArrivalDate", "ArrivalTime", "Word")) as "struct")
      .select($"DepartmentID", $"EmployeeID",
        $"struct.ArrivalDate" as "ArrivalDate",
        $"struct.ArrivalTime" as "ArrivalTime",
        $"struct.Word" as "Word"
      )
    

    【讨论】:

    • 这行得通!但是有没有一种方法可以避免 struct() 中出现“Word”?因为在我的真实示例中,“Word”不止一列。所以图像有“Word1”,“Word2”,“Word3”......,有没有办法让我可以在 .select 语句中添加这些词?
    • 此技术仅在您将所有感兴趣的列“组合”到一个结构中时才有效 - 否则,仅在 一些 列上使用 max 不会获取其他列的正确值。如果您有很多这样的列,您可以尝试@leo-c 建议的替代 Window 方法,或者对df.columns 使用一些过滤来动态生成结构中包含的列列表,这样您就不必列出明确的。
    【解决方案2】:

    一种方法是使用 Spark Window 功能:

    val df = Seq(
      ("D1", "E1", "20170101", "0730", "YES"),
      ("D1", "E1", "20170102", "1530", "NO"),
      ("D1", "E2", "20170101", "0730", "ZOO"),
      ("D1", "E2", "20170102", "0330", "BOO"),
      ("D2", "E1", "20170101", "0730", "LOL"),
      ("D2", "E1", "20170101", "1830", "ATT"),
      ("D2", "E2", "20170105", "1430", "UNI")
    ).toDF(
      "DepartmenId", "EmolyeeID", "ArrivalDate", "ArrivalTime", "Word"
    )
    
    import org.apache.spark.sql.expressions.Window
    
    val df2 = df.withColumn("rowNum", row_number().over(
        Window.partitionBy("DepartmenId", "EmolyeeID").
          orderBy($"ArrivalDate".desc, $"ArrivalTime".desc)
      )).
      select("DepartmenId", "EmolyeeID", "ArrivalDate", "ArrivalTime","Word").
      where($"rowNum" === 1).
      orderBy("DepartmenId", "EmolyeeID")
    
    df2.show
    +-----------+---------+-----------+-----------+----+
    |DepartmenId|EmolyeeID|ArrivalDate|ArrivalTime|Word|
    +-----------+---------+-----------+-----------+----+
    |         D1|       E1|   20170102|       1530|  NO|
    |         D1|       E2|   20170102|       0330| BOO|
    |         D2|       E1|   20170101|       1830| ATT|
    |         D2|       E2|   20170105|       1430| UNI|
    +-----------+---------+-----------+-----------+----+
    

    【讨论】:

    • 谢谢!使用 Window.Partition 是否比使用 GroupBy 更好?我想知道速度和性能是否存在权衡
    • @vincwng,我还没有对 Window 函数和 groupBy 进行任何基准测试,但我发现 Window 函数通常有助于降低必要的嵌套查询(例如自联接)的级别,在这种情况下它们会提供更好的性能。
    猜你喜欢
    • 2018-07-24
    • 2018-01-11
    • 1970-01-01
    • 2020-07-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-13
    相关资源
    最近更新 更多