【问题标题】:scala spark - matching dataframes based on variable datesscala spark - 基于可变日期匹配数据帧
【发布时间】:2019-05-26 13:11:04
【问题描述】:

我正在尝试根据可变日期窗口匹配两个数据框。我不只是试图获得一个完全匹配,我的代码实现了,而是在一个可变的日期窗口内获得所有可能的候选人。

我能够用我的代码精确匹配日期。

但我想知道这些记录是否仍然可以匹配,因为它们可能会在任何一方休息几天,但仍然足够合理加入。

我尝试在 spark 中寻找类似于 python 的 pd.to_timedelta('1 day') 的东西来添加到过滤器中,但可惜没有运气。

这是我当前的代码,它与 ID 列上的数据帧匹配,然后运行过滤器以确保第二个数据帧中的 from_date 之间start_date 和第一个数据帧的 end_date

我需要的不是精确的日期匹配,而是能够匹配实际日期的一两天(任一侧)之间的记录。

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df1 = spark.read.option("header","true")
               .option("inferSchema","true").csv("../data/df1.csv")

val df2 = spark.read.option("header","true")
               .option("inferSchema","true")
               .csv("../data/df2.csv")

val df = df2.join(df1,
                      (df1("ID") === df2("ID")) &&
                      (df2("from_date") >= df1("start_date")) &&
                      (df2("from_date") <= df1("end_date")),"left")
            .select(df1("ID"), df1("start_date"), df1("end_date"), 
                                                  $"from_date", $"to_date")

df.coalesce(1).write.format("com.databricks.spark.csv")
  .option("header", "true").save("../mydata.csv")

基本上我希望能够编辑此日期窗口以增加或减少实际匹配的数据。

非常感谢您的意见。我是 spark/scala 的新手,但必须说到目前为止我很喜欢它……比 python 快得多(也更干净)!

干杯

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您可以在join 条件中将date_adddate_sub 应用于start_date/end_date,如下所示:

    import org.apache.spark.sql.functions._
    import java.sql.Date
    
    val df1 = Seq(
      (1, Date.valueOf("2018-12-01"), Date.valueOf("2018-12-05")),
      (2, Date.valueOf("2018-12-01"), Date.valueOf("2018-12-06")),
      (3, Date.valueOf("2018-12-01"), Date.valueOf("2018-12-07"))
    ).toDF("ID", "start_date", "end_date")
    
    val df2 = Seq(
      (1, Date.valueOf("2018-11-30")),
      (2, Date.valueOf("2018-12-08")),
      (3, Date.valueOf("2018-12-08"))
    ).toDF("ID", "from_date")
    
    val deltaDays = 1
    
    df2.join( df1,
      df1("ID") === df2("ID") &&
      df2("from_date") >= date_sub(df1("start_date"), deltaDays) &&
      df2("from_date") <= date_add(df1("end_date"), deltaDays),
      "left_outer"
    ).show
    // +---+----------+----+----------+----------+
    // | ID| from_date|  ID|start_date|  end_date|
    // +---+----------+----+----------+----------+
    // |  1|2018-11-30|   1|2018-12-01|2018-12-05|
    // |  2|2018-12-08|null|      null|      null|
    // |  3|2018-12-08|   3|2018-12-01|2018-12-07|
    // +---+----------+----+----------+----------+
    

    【讨论】:

      【解决方案2】:

      您也可以使用 datediff() 函数获得相同的结果。看看这个:

      scala> val df1 = Seq((1,  "2018-12-01", "2018-12-05"),(2,  "2018-12-01", "2018-12-06"),(3,  "2018-12-01", "2018-12-07")).toDF("ID", "start_date", "end_date").withColumn("start_date",'start_date.cast("date")).withColumn("end_date",'end_date.cast("date"))
      df1: org.apache.spark.sql.DataFrame = [ID: int, start_date: date ... 1 more field]
      
      scala> val df2 = Seq((1,  "2018-11-30"), (2,  "2018-12-08"),(3,  "2018-12-08")).toDF("ID", "from_date").withColumn("from_date",'from_date.cast("date"))
      df2: org.apache.spark.sql.DataFrame = [ID: int, from_date: date]
      
      scala> val delta = 1;
      delta: Int = 1
      
      scala> df2.join(df1,df1("ID") === df2("ID") && datediff('from_date,'start_date) >= -delta && datediff('from_date,'end_date)<=delta, "leftOuter").show(false)
      +---+----------+----+----------+----------+
      |ID |from_date |ID  |start_date|end_date  |
      +---+----------+----+----------+----------+
      |1  |2018-11-30|1   |2018-12-01|2018-12-05|
      |2  |2018-12-08|null|null      |null      |
      |3  |2018-12-08|3   |2018-12-01|2018-12-07|
      +---+----------+----+----------+----------+
      
      
      scala>
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-03-25
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多