【Question Title】: Change Data Capture using Apache Spark
【Posted】: 2019-09-29 10:48:38
【Question Description】:

What is the best way to solve this problem using Apache Spark?

My dataset looks like this:

ID, DATE,       TIME, VALUE
001,2019-01-01, 0010, 150
001,2019-01-01, 0020, 150
001,2019-01-01, 0030, 160
001,2019-01-01, 0040, 160
001,2019-01-01, 0050, 150
002,2019-01-01, 0010, 151
002,2019-01-01, 0020, 151
002,2019-01-01, 0030, 161
002,2019-01-01, 0040, 162
002,2019-01-01, 0051, 152

I need to keep only the rows where the VALUE changes for each ID.

My expected output:

ID, DATE,       TIME, VALUE
001,2019-01-01, 0010, 150
001,2019-01-01, 0030, 160
001,2019-01-01, 0050, 150
002,2019-01-01, 0010, 151
002,2019-01-01, 0030, 161
002,2019-01-01, 0040, 162
002,2019-01-01, 0051, 152

【Question Discussion】:

  • Aggregate by id and value, then pick the first row by time.

标签: apache-spark window-functions


【Solution 1】:

You can use the lag function over a window:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, lit}
import spark.implicits._

val df = Seq(
  ("001", "2019-01-01", "0010", "150"),
  ("001", "2019-01-01", "0020", "150"),
  ("001", "2019-01-01", "0030", "160"),
  ("001", "2019-01-01", "0040", "160"),
  ("001", "2019-01-01", "0050", "150"),
  ("002", "2019-01-01", "0010", "151"),
  ("002", "2019-01-01", "0020", "151"),
  ("002", "2019-01-01", "0030", "161"),
  ("002", "2019-01-01", "0040", "162"),
  ("002", "2019-01-01", "0051", "152")
).toDF("ID", "DATE", "TIME", "VALUE")


df
  .withColumn("change",
    coalesce(
      $"VALUE" =!= lag($"VALUE", 1).over(Window.partitionBy($"ID").orderBy($"TIME")),
      lit(true)))
  .where($"change")
  //.drop($"change")
  .show()

This gives:

+---+----------+----+-----+------+
| ID|      DATE|TIME|VALUE|change|
+---+----------+----+-----+------+
|001|2019-01-01|0010|  150|  true|
|001|2019-01-01|0030|  160|  true|
|001|2019-01-01|0050|  150|  true|
|002|2019-01-01|0010|  151|  true|
|002|2019-01-01|0030|  161|  true|
|002|2019-01-01|0040|  162|  true|
|002|2019-01-01|0051|  152|  true|
+---+----------+----+-----+------+
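The logic of the window/lag approach can also be sketched without Spark: walking each ID's rows in TIME order, keep a row whenever its VALUE differs from the previous VALUE for that ID, and always keep the ID's first row (which is what the `coalesce(..., lit(true))` handles above, since `lag` returns null there). A minimal plain-Python illustration of that idea, using the question's sample data (function and variable names are my own):

```python
def keep_changes(rows):
    """Keep rows where VALUE differs from the previous VALUE for the same ID.

    rows: list of (id, date, time, value) tuples, assumed already sorted
    by (id, time). The first row of each ID is always kept, since there is
    no previous value to compare against.
    """
    kept = []
    prev = {}  # last seen VALUE per ID
    for id_, date, time, value in rows:
        if prev.get(id_) != value:          # new ID or changed value
            kept.append((id_, date, time, value))
        prev[id_] = value
    return kept

data = [
    ("001", "2019-01-01", "0010", "150"),
    ("001", "2019-01-01", "0020", "150"),
    ("001", "2019-01-01", "0030", "160"),
    ("001", "2019-01-01", "0040", "160"),
    ("001", "2019-01-01", "0050", "150"),
    ("002", "2019-01-01", "0010", "151"),
    ("002", "2019-01-01", "0020", "151"),
    ("002", "2019-01-01", "0030", "161"),
    ("002", "2019-01-01", "0040", "162"),
    ("002", "2019-01-01", "0051", "152"),
]
for row in keep_changes(data):
    print(row)
```

This prints the same seven rows as the expected output. Note that, like the Spark version, it assumes the input is ordered within each ID; in Spark that ordering is provided by `orderBy($"TIME")` in the window.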

【Discussion】:
