【问题标题】:Grouping records based on continuous dates根据连续日期对记录进行分组
【发布时间】:2021-12-05 19:28:12
【问题描述】:

我是一名 Spark SQL 查询开发人员。我有一个复杂的要求,即根据连续/连续的日期对记录进行分组。

首先,我们需要根据strt_dtdschrg_dt 对同一患者(X5624)按ASC 顺序排列记录。下单后——

第一步
我们需要检查这些记录是否连续/连续 - 即它们是否有连续的日期。也就是说,上一条记录的dschrg_dt是否等于当前记录的strt_dt。如果是,则所有此类记录都应归入同一组。
第 2 步
如果上一条记录的dschrg_dt 和当前记录的strt_dt 之间有任何间隔 1 天或更多天,则该组结束,然后是组应该开始,之后出现的所有连续记录都将属于该新组,直到出现另一个间隙。
步骤 3: 对于每个这样的组(在上面的步骤 1 中标识),找到在 strt_dtdschrg_dt 之间具有最大跨度(以天为单位)的主记录(在该组内),然后将其 group-id 复制到所有剩余的该组中的记录。

输入:

GroupId | pat_id | clm_num  | clm_line |  strt_dt    | dschrg_dt
123-1   |  X5624 | 123      |   1      | 05-Jan-2019 | 07-Jan-2019 
629-3   |  X5624 | 629      |   3      | 07-Jan-2019 | 14-Jan-2019
918-2   |  X5624 | 918      |   2      | 14-Jan-2019 | 15-Jan-2019
307-1   |  X5624 | 307      |   1      | 08-May-2019 | 11-May-2019
478-3   |  X5624 | 478      |   3      | 11-May-2019 | 12-Jan 2019

预期输出:

Group Id | pat_id | clm_num | clm_line |  strt_dt    | dschrg_dt   | main_rec_ind
629-3    |  X5624 | 123     |   1      | 05-Jan-2019 | 07-Jan-2019 | N
629-3    |  X5624 | 629     |   3      | 07-Jan-2019 | 14-Jan-2019 | Y
629-3    |  X5624 | 918     |   2      | 14-Jan-2019 | 15-Jan-2019 | N
307-1    |  X5624 | 307     |   1      | 08-May-2019 | 11-May-2019 | Y
307-1    |  X5624 | 478     |   3      | 11-May-2019 | 12-Jan 2019 | N

请注意以下几点:

  • 所有记录都属于患者 X5624,整个记录集是 排序在strt_dt & dschrg_dt
  • 输入中的前 3 条记录是连续的,即前一条记录的 dschrg_dt 等于当前记录的strt_dt。因此,这 3 记录属于同一组。
  • 该组中的主要记录是2nd 记录,因为它的跨度为 strt_dtdschrg_dt 之间的天数是 14 天数,大于同一组中的所有其他记录。因此,它的组 ID '629-3' 被复制到另一个 它的组中的记录。同样,第二组的主记录是第 4 记录
  • 现在第 4 条记录与之前的记录(第 3 条记录)之间存在差距,因此与第 3 条记录不连续,因此将落入新的组中
  • 但是,第 5 条记录连续到第 4 条记录,因此也将落入第 2 组,并从第 4 条记录中获取其 GroupId (307-1)

谁能帮我在 Spark SQL 查询中实现这个?到目前为止,我只能识别连续记录,但无法将主记录的GroupId复制到该组中的其他记录。

select
groupid, pat_id, clm_num, clm_line, strt_dt, dschrg_dt,
case 
   when lag(dschrg_dt is null
   then 'true'
   when lag(dschrg_dt) over (partition by pat_id order by strt_dt asc, dschrg_dt desc) = strt_dt
   then 'true'
   else 'false'
end as tmp_grp
from pat_clms;  

我无法从这里继续。任何人都可以帮我在 Spark SQL 查询中实现这一点吗?感谢任何输入。如果需要,很乐意提供更多信息。非常感谢。

【问题讨论】:

  • 请添加您创建数据报表。

标签: sql apache-spark apache-spark-sql


【解决方案1】:

除了将truefalse 分配给tmp_grp 之外,您还可以在相邻时分配0,否则分配1。在此新列上创建累积总和将分配不同的组号。然后,您可以使用新实现的组确定每个组中具有最大跨度的记录(使用datediff 和一个案例表达式或when),然后使用max 为组内的所有行复制此组ID 作为窗口函数。

例如

SELECT
    *,
    CASE 
        WHEN DATEDIFF(dschrg_dt,strt_dt) = MAX(DATEDIFF(dschrg_dt,strt_dt)) OVER (
                 PARTITION BY pat_id,gn
             ) THEN GroupId
    END as main_rec_ind
FROM (
    SELECT 
       *,
       SUM(contiguous) OVER (
           PARTITION BY pat_id
           ORDER BY strt_dt asc, dschrg_dt desc
        ) as gn
    FROM (
        SELECT
            *,
            CASE
                WHEN LAG(dschrg_dt,1,strt_dt) OVER (
                         PARTITION BY pat_id
                         ORDER BY strt_dt asc, dschrg_dt desc
                     ) = strt_dt THEN 0
               ELSE 1
            END contiguous
        FROM
            pat_clms
    ) t1
) t2

输出:

+-------+------+-------+--------+----------+----------+----------+---+------------+
|GroupId|pat_id|clm_num|clm_line|strt_dt   |dschrg_dt |contiguous|gn |main_rec_ind|
+-------+------+-------+--------+----------+----------+----------+---+------------+
|123-1  |X5624 |123    |1       |2019-01-05|2019-01-07|0         |0  |null        |
|629-3  |X5624 |629    |3       |2019-01-07|2019-01-14|0         |0  |629-3       |
|918-2  |X5624 |918    |2       |2019-01-14|2019-01-15|0         |0  |null        |
|307-1  |X5624 |307    |1       |2019-05-08|2019-05-11|1         |1  |307-1       |
|478-3  |X5624 |478    |3       |2019-05-11|2019-01-12|0         |1  |null        |
+-------+------+-------+--------+----------+----------+----------+---+------------+

您想要的查询可能如下所示

SELECT
    GroupId, 
    pat_id, 
    clm_num, 
    clm_line, 
    strt_dt, 
    dschrg_dt,
    MAX(main_rec_ind) OVER (PARTITION BY pat_id,gn) main_rec_ind
FROM (
    SELECT
        *,
        CASE 
            WHEN DATEDIFF(dschrg_dt,strt_dt) = MAX(DATEDIFF(dschrg_dt,strt_dt)) OVER (
                     PARTITION BY pat_id,gn
                 ) THEN GroupId
        END as main_rec_ind
    FROM (
        SELECT 
            *,
            SUM(contiguous) OVER (
                PARTITION BY pat_id
                ORDER BY strt_dt asc, dschrg_dt desc
            ) as gn
        FROM (
            SELECT
                *,
                CASE
                    WHEN LAG(dschrg_dt,1,strt_dt) OVER (
                             PARTITION BY pat_id
                             ORDER BY strt_dt asc, dschrg_dt desc
                         ) = strt_dt THEN 0
                   ELSE 1
                END contiguous
            FROM
                pat_clms
        ) t1
    ) t2
) t3

输出:

+-------+------+-------+--------+----------+----------+------------+
|GroupId|pat_id|clm_num|clm_line|   strt_dt| dschrg_dt|main_rec_ind|
+-------+------+-------+--------+----------+----------+------------+
|  123-1| X5624|    123|       1|2019-01-05|2019-01-07|       629-3|
|  629-3| X5624|    629|       3|2019-01-07|2019-01-14|       629-3|
|  918-2| X5624|    918|       2|2019-01-14|2019-01-15|       629-3|
|  307-1| X5624|    307|       1|2019-05-08|2019-05-11|       307-1|
|  478-3| X5624|    478|       3|2019-05-11|2019-01-12|       307-1|
+-------+------+-------+--------+----------+----------+------------+

使用 pyspark api:

from pyspark.sql import functions as F

patient_window = Window.partitionBy("pat_id").orderBy("strt_dt",F.col("dschrg_dt").desc())
patient_group_window = Window.partitionBy("pat_id","gn")

output_df = (
    df.withColumn(
        "contiguous",
        F.when(
            F.lag("dschrg_dt",1,"strt_dt").over(patient_window)==F.col("strt_dt"),0
        ).otherwise(1)
    )
    .withColumn(
        "gn",
        F.sum("contiguous").over(patient_window)
    )
    .withColumn(
        "main_rec_ind",
        F.when(
            F.datediff("dschrg_dt","strt_dt")== F.max(F.datediff("dschrg_dt","strt_dt")).over(patient_group_window), 
            F.col("GroupId")
        )
    )
    .select(
        "GroupId", 
        "pat_id", 
        "clm_num", 
        "clm_line", 
        "strt_dt", 
        "dschrg_dt",
        F.max("main_rec_ind").over(patient_group_window).alias("main_rec_ind")
    )
)

使用 scala api:

patientWindow = Window.partitionBy("pat_id").orderBy("strt_dt",col("dschrg_dt").desc())
patientGroupWindow = Window.partitionBy("pat_id","gn")

output_df = 
    df.withColumn(
        "contiguous",
        when(
            lag("dschrg_dt",1,"strt_dt").over(patientWindow)==col("strt_dt"),0
        ).otherwise(1)
    )
    .withColumn(
        "gn",
        sum("contiguous").over(patientWindow)
    )
    .withColumn(
        "main_rec_ind",
        when(
            datediff("dschrg_dt","strt_dt")== max(datediff("dschrg_dt","strt_dt")).over(patientGroupWindow), 
            col("GroupId")
        )
    )
    .select(
        "GroupId", 
        "pat_id", 
        "clm_num", 
        "clm_line", 
        "strt_dt", 
        "dschrg_dt",
        max("main_rec_ind").over(patientGroupWindow).alias("main_rec_ind")
    )

让我知道这是否适合你。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-05-23
    • 1970-01-01
    • 1970-01-01
    • 2022-07-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多