【Question Title】: Spark Join based on nearest Date
【Posted】: 2021-12-07 04:08:13
【Question】:

I have 2 Parquet tables. The simplified schemas are as follows:

case class Product(SerialNumber:Integer,
                   UniqueKey:String,
                   ValidityDate1:Date
                   )
                   
case class ExceptionEvents(SerialNumber:Integer,
                      ExceptionId:String,
                      ValidityDate2:Date
                     )
                

The Product DataFrame can contain entries like the following:

Product:

-----------------------------------------
SerialNumber    UniqueKey   ValidityDate1
-----------------------------------------
10001           Key_1       01/10/2021
10001           Key_2       05/10/2021
10001           Key_3       10/10/2021
10002           Key_4       02/10/2021
10003           Key_5       07/10/2021
-----------------------------------------

ExceptionEvents:

-----------------------------------------
SerialNumber    ExceptionId     ValidityDate2
-----------------------------------------
10001           ExcId_1         02/10/2021
10001           ExcId_2         05/10/2021
10001           ExcId_3         07/10/2021
10001           ExcId_4         11/10/2021
10001           ExcId_5         15/10/2021
-----------------------------------------

I want to join the 2 DFs so that the SerialNumbers match and the ValidityDates are paired such that the ExceptionEvent's ValidityDate2 is on or after the Product's ValidityDate1, with the two dates as close as possible. In other words, each ExceptionEvents row should be matched to the Product row with the latest ValidityDate1 that does not exceed its ValidityDate2. For example, the resulting DF should look like this:

---------------------------------------------------------------------
SerialNumber    ExceptionId     UniqueKey       ValidityDate2
---------------------------------------------------------------------
10001           ExcId_1         Key_1           02/10/2021
10001           ExcId_2         Key_2           05/10/2021
10001           ExcId_3         Key_2           07/10/2021
10001           ExcId_4         Key_3           11/10/2021
10001           ExcId_5         Key_3           15/10/2021
---------------------------------------------------------------------

Any idea how this query should be done using Scala and the Spark DataFrame API?

【Comments】:

Tags: scala dataframe apache-spark join apache-spark-sql


【Solution 1】:

The following solution works fine for me:

    // Assumes a spark-shell / notebook session where `spark` is in scope
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    import spark.implicits._

    val dfp1 = List(("1001", "Key1", "01/10/2021"), ("1001", "Key2", "05/10/2021"), ("1001", "Key3", "10/10/2021"), ("1002", "Key4", "02/10/2021")).toDF("SerialNumber", "UniqueKey", "Date1")

    // Parse the dd/MM/yyyy strings into proper DateType columns
    val dfProduct = dfp1.withColumn("Date1", to_date($"Date1", "dd/MM/yyyy"))

    val dfe1 = List(("1001", "ExcId1", "02/10/2021"), ("1001", "ExcId2", "05/10/2021"), ("1001", "ExcId3", "07/10/2021"), ("1001", "ExcId4", "11/10/2021"), ("1001", "ExcId5", "15/10/2021")).toDF("SerialNumber", "ExceptionId", "Date2")

    val dfExceptions = dfe1.withColumn("Date2", to_date($"Date2", "dd/MM/yyyy"))

    // Join on SerialNumber and keep only product dates on or before the event date
    val exceptionStat2 = dfExceptions.as("fact")
      .join(dfProduct.as("dim"), Seq("SerialNumber"))
      .select($"fact.*", $"dim.UniqueKey", datediff($"fact.Date2", $"dim.Date1").as("DiffDate"))
      .where($"DiffDate" >= 0)

    // Rank the candidate products per exception by date distance and keep the closest one
    val exceptionStat3 = exceptionStat2
      .withColumn("rank", rank().over(Window.partitionBy($"SerialNumber", $"ExceptionId").orderBy($"DiffDate")))
      .where($"rank" === 1)
      .select($"SerialNumber", $"ExceptionId", $"UniqueKey", $"Date2", $"DiffDate", $"rank")
      .orderBy($"SerialNumber", $"Date2")
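The "closest earlier date" rule that the window rank implements can be sanity-checked outside Spark with plain Scala collections. The sketch below (the `NearestDateJoin` object and `nearestKey` helper are made up for illustration, not part of any Spark API) applies the same rule to the sample rows from the question:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical plain-Scala check of the nearest-earlier-date matching rule,
// using the sample rows from the question (not Spark itself).
object NearestDateJoin {
  private val fmt = DateTimeFormatter.ofPattern("dd/MM/yyyy")
  private def d(s: String): LocalDate = LocalDate.parse(s, fmt)

  case class Product(serial: Int, uniqueKey: String, validityDate1: LocalDate)
  case class ExceptionEvent(serial: Int, exceptionId: String, validityDate2: LocalDate)

  val products = List(
    Product(10001, "Key_1", d("01/10/2021")),
    Product(10001, "Key_2", d("05/10/2021")),
    Product(10001, "Key_3", d("10/10/2021")),
    Product(10002, "Key_4", d("02/10/2021")),
    Product(10003, "Key_5", d("07/10/2021")))

  val events = List(
    ExceptionEvent(10001, "ExcId_1", d("02/10/2021")),
    ExceptionEvent(10001, "ExcId_2", d("05/10/2021")),
    ExceptionEvent(10001, "ExcId_3", d("07/10/2021")),
    ExceptionEvent(10001, "ExcId_4", d("11/10/2021")),
    ExceptionEvent(10001, "ExcId_5", d("15/10/2021")))

  // For each event, keep the product with the same serial whose date is on or
  // before the event date and closest to it (the rank-1 row in the Spark version).
  def nearestKey(e: ExceptionEvent): Option[String] =
    products
      .filter(p => p.serial == e.serial && !p.validityDate1.isAfter(e.validityDate2))
      .sortBy(p => ChronoUnit.DAYS.between(p.validityDate1, e.validityDate2))
      .headOption
      .map(_.uniqueKey)
}
```

Mapping `NearestDateJoin.events` through `nearestKey` reproduces the pairing in the expected-output table (ExcId_1 → Key_1, ExcId_2/ExcId_3 → Key_2, ExcId_4/ExcId_5 → Key_3), which matches what the `DiffDate >= 0` filter plus `rank` over the window computes.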

【Discussion】:
