【问题标题】:Spark: GroupBy and collect_list while filtering by another columnSpark:GroupBy和collect_list同时按另一列过滤
【发布时间】:2021-03-31 03:48:12
【问题描述】:

我有以下数据框

+-----+-----+------+
|group|label|active|
+-----+-----+------+
|    a|    1|     y|
|    a|    2|     y|
|    a|    1|     n|
|    b|    1|     y|
|    b|    1|     n|
+-----+-----+------+

我想按“组”列分组并按“标签”列收集,同时过滤活动列中的值。

预期的结果是

+-----+---------+---------+----------+
|group| labelyes| labelno |difference|
+-----+---------+---------+----------+
|a    | [1,2]   | [1]     | [2]      |
|b    | [1]     | [1]     | []       |
+-----+---------+---------+----------+

我可以通过

轻松过滤“y”标签
val dfyes = df.filter($"active" === "y").groupBy("group").agg(collect_set("label"))

对于“n”值也是如此

val dfno = df.filter($"active" === "n").groupBy("group").agg(collect_set("label"))

但我不明白是否可以在过滤时同时聚合以及如何获得两组的差异。

【问题讨论】:

    标签: arrays scala apache-spark group-by apache-spark-sql


    【解决方案1】:

    你可以做一个pivot,并使用一些数组函数来获得差异:

    val df2 = df.groupBy("group").pivot("active").agg(collect_list("label")).withColumn(
        "difference", 
        array_union(
            array_except(col("n"), col("y")), 
            array_except(col("y"), col("n"))
        )
    )
    
    df2.show
    +-----+---+------+----------+
    |group|  n|     y|difference|
    +-----+---+------+----------+
    |    b|[1]|   [1]|        []|
    |    a|[1]|[1, 2]|       [2]|
    +-----+---+------+----------+
    

    【讨论】:

      【解决方案2】:

      感谢@mck 的帮助。我找到了解决问题的另一种方法,即在聚合期间使用when 进行过滤:

      df
         .groupBy("group")
         .agg(
              collect_set(when($"active" === "y", $"label")).as("labelyes"), 
              collect_set(when($"active" === "n", $"label")).as("labelno")
             )
      .withColumn("diff", array_except($"labelyes", $"labelno"))
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-03-22
        • 1970-01-01
        • 2016-02-05
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多