【问题标题】:GroupBy with condition on aggregate Spark/ScalaGroupBy 与聚合 Spark/Scala 的条件
【发布时间】:2019-10-02 20:03:20
【问题描述】:

我有一个这样的数据框:

|   ID_VISITE_CALCULE|       TAG_TS_TO_TS|EXTERNAL_PERSON_ID|EXTERNAL_ORGANISATION_ID| RK|
+--------------------+-------------------+------------------+------------------------+---+
|GA1.2.1023040287....|2019-04-23 11:24:19|            dupont|                    null|  1|
|GA1.2.1023040287....|2019-04-23 11:24:19|            durand|                    null|  2|
|GA1.2.105243141.1...|2019-04-23 11:21:01|              null|                    null|  1|
|GA1.2.1061963529....|2019-04-23 11:12:19|              null|                    null|  1|
|GA1.2.1065635192....|2019-04-23 11:07:14|            antoni|                    null|  1|
|GA1.2.1074357108....|2019-04-23 11:11:34|              lang|                    null|  1|
|GA1.2.1074357108....|2019-04-23 11:12:37|              lang|                    null|  2|
|GA1.2.1075803022....|2019-04-23 11:28:38|            cavail|                    null|  1|
|GA1.2.1080137035....|2019-04-23 11:20:00|              null|                    null|  1|
|GA1.2.1081805479....|2019-04-23 11:10:49|              null|                    null|  1|
|GA1.2.1081805479....|2019-04-23 11:10:49|            linare|                    null|  2|
|GA1.2.1111218536....|2019-04-23 11:28:43|              null|                    null|  1|
|GA1.2.1111218536....|2019-04-23 11:32:26|              null|                    null|  2|
|GA1.2.1111570355....|2019-04-23 11:07:00|              null|                    null|  1|
+--------------------+-------------------+------------------+------------------------+---+

我正在尝试应用规则来按 ID_VISITE_CALCULE 进行聚合,并为一个 ID 保留一行。

对于一个ID(一个组),我希望:

  • 获取组的第一个时间戳并将其存储在 START 列中

  • 获取组的最后一个时间戳并将其存储在 END 列中

  • 测试整个组的 EXTERNAL_PERSON_ID 是否相同。 如果是这种情况并且它是NULL,那么我写NULL,如果它是并且它是一个名字,那么我写这个名字。最后,如果组中有不同的值,那么我注册 UNDEFINED

  • 对列 EXTERNAL_ORGANIZATION_ID 应用完全相同的规则
RESULT :
+--------------------+------------------+------------------------+-------------------+-------------------+
|   ID_VISITE_CALCULE|EXTERNAL_PERSON_ID|EXTERNAL_ORGANISATION_ID|              START|                END|
+--------------------+------------------+------------------------+-------------------+-------------------+
|GA1.2.1023040287....|         undefined|                    null|2019-04-23 11:24:19|2019-04-23 11:24:19|
|GA1.2.105243141.1...|              null|                    null|2019-04-23 11:21:01|2019-04-23 11:21:01|
|GA1.2.1061963529....|              null|                    null|2019-04-23 11:12:19|2019-04-23 11:12:19|
|GA1.2.1065635192....|            antoni|                    null|2019-04-23 11:07:14|2019-04-23 11:07:14|
|GA1.2.1074357108....|              lang|                    null|2019-04-23 11:11:34|2019-04-23 11:12:37|
|GA1.2.1075803022....|            cavail|                    null|2019-04-23 11:28:38|2019-04-23 11:28:38|
|GA1.2.1080137035....|              null|                    null|2019-04-23 11:20:00|2019-04-23 11:20:00|
|GA1.2.1081805479....|         undefined|                    null|2019-04-23 11:10:49|2019-04-23 11:10:49|
|GA1.2.1111218536....|              null|                    null|2019-04-23 11:28:43|2019-04-23 11:32:26|
|GA1.2.1111570355....|              null|                    null|2019-04-23 11:07:00|2019-04-23 11:07:00|
+--------------------+------------------+------------------------+-------------------+-------------------+

在我的示例中,一个组最多只有 2 行,但在实际数据集中,一个组可以有数百行。

感谢您的热心帮助。

【问题讨论】:

    标签: scala apache-spark aggregate


    【解决方案1】:

    所有都可以在单个 groupby 调用中完成,但是我建议将(轻微的)性能优势和代码的可读性拆分为 2 个调用:

    import org.apache.spark.sql.functions.{col, size, collect_set, max, min, when, lit}
    
    val res1DF = df.groupBy(col("ID_VISITE_CALCULE")).agg(
     min(col("START")).alias("START"),
     max(col("END")).alias("END"),
     collect_set(col("EXTERNAL_PERSON_ID")).alias("EXTERNAL_PERSON_ID"),
     collect_set(col("EXTERNAL_ORGANIZATION_ID")).alias("EXTERNAL_ORGANIZATION_ID")
    )
    
    val res2DF = res1DF.withColumn("EXTERNAL_PERSON_ID",
     when(
      size(col("EXTERNAL_PERSON_ID")) > 1, 
      lit("UNDEFINED")).otherwise(col("EXTERNAL_PERSON_ID").getItem(0)
     )
    ).withColumn("EXTERNAL_ORGANIZATION_ID",
     when(
      size(col("EXTERNAL_ORGANIZATION_ID")) > 1, 
      lit("UNDEFINED")).otherwise(col("EXTERNAL_ORGANIZATION_ID").getItem(0)
     )
    )
    

    getItem 方法在后台处理大部分条件。如果值集为空,则返回null,如果只有1个值,则返回该值。

    【讨论】:

    • 嗨,理查德,非常感谢您的回答。它完美地工作。如果我想添加我的起始数据框中的列,但我之前给出的示例中没有。例如,我有一个名为“CANAL_LD”的列,它始终包含一个组的唯一值。我无法使用 .withColumn 将其添加到聚合中。有没有“优雅”的解决方案?感谢您的回复
    • 当然,如果您没有关于如何在聚合组中选择特定值的特定逻辑,您可以随时使用first 函数,即df.groupBy("ID_VISITE_CALCULE").agg(first("CANAL_LD"), ...),其中... 是其余的聚合。
    • 嗨,理查德,抱歉打扰了。我想我已经找到了解决办法。我使用“第一”功能。谢谢
    • 感谢所有理查德。祝你有美好的一天:-)
    【解决方案2】:

    /如果您显示一些代码/示例数据来自构建数据框的位置会很好。

    假设您的数据框为tableDf

    ** Spark Sql 解决方案 **

    tableDf.createOrReplaceTempView("input_table")
    val sqlStr ="""
        select ID_VISITE_CALCULE,
               (case when count(distinct person_id_calculation) > 1 then "undefined"
                    when count(distinct person_id_calculation) = 1 and 
                         max(person_id_calculation) = "noNull" then ""
                    else max(person_id_calculation)) as EXTERNAL_PERSON_ID,
             -- do the same for EXTERNAL_ORGANISATION_ID
            max(start_v) as start_v, max(last_v) as last_v
    from
    (select ID_VISITE_CALCULE,
               ( case
                   when nvl(EXTERNAL_PERSON_ID,"noNull") = 
                        lag(EXTERNAL_PERSON_ID,1,"noNull")over(partition by 
                            ID_VISITE_CALCULE order by TAG_TS_TO_TS) then 
                            EXTERNAL_PERSON_ID
                   else "undefined" end ) AS person_id_calculation,
                -- Same calculation for EXTERNAL_ORGANISATION_ID
                first(TAG_TS_TO_TS) over(partition by ID_VISITE_CALCULE order by 
                                  TAG_TS_TO_TS) as START_V,
               last(TAG_TS_TO_TS) over(partition by ID_VISITE_CALCULE order by 
                                  TAG_TS_TO_TS) as last_V 
               from input_table ) a
       group by 1
    """
    val resultDf = spark.sql(sqlStr)
    

    【讨论】:

      猜你喜欢
      • 2019-02-27
      • 2019-08-10
      • 2021-04-07
      • 1970-01-01
      • 2018-09-29
      • 1970-01-01
      • 2016-07-30
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多