【问题标题】:Implement SCD Type 2 in Spark在 Spark 中实现 SCD 类型 2
【发布时间】:2020-04-22 11:56:18
【问题描述】:

尝试在 Spark 2.4.4 中实现 SCD Type 2 逻辑。我有两个数据框;一个包含“现有数据”,另一个包含“新传入数据”。

输入和预期输出如下所示。需要发生的是:

  1. 所有传入的行都应该附加到现有数据中。

  2. 只有以下 3 行之前是“活动的”才应该变为非活动的,并填充如下适当的“结束日期”:

    pk=1, amount = 20 => Row 应该变成'inactive' & 'endDate' 是下一行(Lead)的'startDate'

    pk=2, amount = 100 => Row 应该变成'inactive' & 'endDate' 是下一行(Lead)的'startDate'

    pk=3, amount = 750 => Row 应该变成'inactive' & 'endDate' 是下一行(Lead)的'startDate'

如何在 Spark 中执行此操作?

现有数据:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    10|2019-01-01 12:00:00|2019-01-20 05:00:00|     0|
|  1|    20|2019-01-20 05:00:00|               null|     1|
|  2|   100|2019-01-01 00:00:00|               null|     1|
|  3|    75|2019-01-01 06:00:00|2019-01-26 08:00:00|     0|
|  3|   750|2019-01-26 08:00:00|               null|     1|
| 10|    40|2019-01-01 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+

新传入数据:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    50|2019-02-01 07:00:00|2019-02-02 08:00:00|     0|
|  1|    75|2019-02-02 08:00:00|               null|     1|
|  2|   200|2019-02-01 05:00:00|2019-02-01 13:00:00|     0|
|  2|    60|2019-02-01 13:00:00|2019-02-01 19:00:00|     0|
|  2|   500|2019-02-01 19:00:00|               null|     1|
|  3|   175|2019-02-01 00:00:00|               null|     1|
|  4|    50|2019-02-02 12:00:00|2019-02-02 14:00:00|     0|
|  4|   300|2019-02-02 14:00:00|               null|     1|
|  5|   500|2019-02-02 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+

预期输出:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    10|2019-01-01 12:00:00|2019-01-20 05:00:00|     0|
|  1|    20|2019-01-20 05:00:00|2019-02-01 07:00:00|     0|
|  1|    50|2019-02-01 07:00:00|2019-02-02 08:00:00|     0|
|  1|    75|2019-02-02 08:00:00|               null|     1|
|  2|   100|2019-01-01 00:00:00|2019-02-01 05:00:00|     0|
|  2|   200|2019-02-01 05:00:00|2019-02-01 13:00:00|     0|
|  2|    60|2019-02-01 13:00:00|2019-02-01 19:00:00|     0|
|  2|   500|2019-02-01 19:00:00|               null|     1|
|  3|    75|2019-01-01 06:00:00|2019-01-26 08:00:00|     0|
|  3|   750|2019-01-26 08:00:00|2019-02-01 00:00:00|     1|
|  3|   175|2019-02-01 00:00:00|               null|     1|
|  4|    50|2019-02-02 12:00:00|2019-02-02 14:00:00|     0|
|  4|   300|2019-02-02 14:00:00|               null|     1|
|  5|   500|2019-02-02 00:00:00|               null|     1|
| 10|    40|2019-01-01 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+

【问题讨论】:

  • 想知道为什么我在这个问题上投了反对票 (-1)。有人可以解释一下吗?
  • 请参阅 stackoverflow.com/questions/54058951/… 我回答问题的地方。使用 KUDU 效果更好,但我们通过重新声明来做到这一点,并且可以以各种方式压缩小文件问题。

标签: java scala apache-spark apache-spark-sql


【解决方案1】:

您可以首先从新 DataFrame 中为每个组 pk 选择第一个 startDate,然后加入旧的 startDate 以更新所需的列。 然后,您可以合并所有的连接结果和新的 DataFrame。

类似这样的:

// get first state by date for each pk group
val w = Window.partitionBy($"pk").orderBy($"startDate")
val updates = df_new.withColumn("rn", row_number.over(w)).filter("rn = 1").select($"pk", $"startDate")

// join with old data and update old values when there is match
val joinOldNew = df_old.join(updates.alias("new"), Seq("pk"), "left")
                       .withColumn("endDate", when($"endDate".isNull && $"active" === lit(1) && $"new.startDate".isNotNull, $"new.startDate").otherwise($"endDate"))
                       .withColumn("active", when($"endDate".isNull && $"active" === lit(1) && $"new.startDate".isNotNull, lit(0)).otherwise($"active"))
                       .drop($"new.startDate")

// union all
val result = joinOldNew.unionAll(df_new) 

【讨论】:

  • 这看起来很有希望,但我收到了这条消息:“otherwise() 只能应用于之前由 when() 生成的列”。我有这个... .withColumn("endDate", when(col("endDate").isNull && col("active") === lit(1) && col("new.startDate").isNotNull, col ("new.startDate").otherwise(col("endDate"))))
  • 你缺少一些括号......因为你在col("new.startDate")而不是when列上调用.otherwise
  • 我的错。修复了丢失的括号。它正在工作,但我更改了“活动”列的逻辑如下: .withColumn("active", when(col("endDate").isNull, lit(1)) .otherwise(lit(0))) 'endDate' 的 'withColumn' 似乎改变了该列的值,所以当控件来到 'withColumn' 时,'active' endDate 已经改变了!
【解决方案2】:
  1. 联合 2 数据帧
  2. groupByKey上pk
  3. mapGroups 将提供键元组和行迭代器。
  4. 在每个组上,对行进行排序、遍历所有行、关闭所需的记录并保留所需的行。
   val df = //read your df containing 
   df.groupByKey( row => (row.getAs[String]("pk")))
     .mapGroups( case (key, rows) => 
     // apply all logic you need to apply per PK. 
     //sort rows by date, survive the latest, close the old )

【讨论】:

  • 这种方法的问题是未更新的“组”(例如 pk = 10)将被不必要地处理。当“现有”数据很大(超过 1 亿行)时,这将产生重大影响。在“Union”之后,在这种情况下我们真正需要做的就是更新 3 行!需要更好的方法。谢谢。
  • 我以为我正在让你开始,然后你可以从那里调整。如果数据集很大,那么您将它们加入 pk 并根据活动和日期比较进行过滤。然后将结果与不需要处理的其他部分合并
【解决方案3】:

感谢@blackbishop 建议的回答,我能够让它工作。这是工作版本(以防有人感兴趣):

    // get first state by date for each pk group
    val w = Window.partitionBy("pk").orderBy("startDate")
    val updates = dfNew.withColumn("rn", row_number.over(w)).filter("rn = 1").select("pk", "startDate")

    // join with old data and update old values when there is match
    val joinOldNew = dfOld.join(updates.alias("new"), Seq("pk"), "left")
        .withColumn("endDate", when(col("endDate").isNull
            && col("active") === lit(1) && col("new.startDate").isNotNull,
            col("new.startDate")).otherwise(col("endDate")))
        .withColumn("active", when(col("endDate").isNull, lit(1))
            .otherwise(lit(0)))
        .drop(col("new.startDate"))


    // union all (Order By is not necessary! Added to facilitate testing.)
    val results = joinOldNew.union(dfNew).orderBy(col("pk"), col("startDate"))

【讨论】:

    【解决方案4】:

    如果有人正在为上述问题寻找 Delta Lake 选项,请参考下面链接的代码。

    https://github.com/EndrisKerga/Spark-Slow-Changing-Dimensions-Type-2-Demo/blob/master/SCDType2.scala

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-11-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-25
      • 2020-05-08
      • 1970-01-01
      相关资源
      最近更新 更多