【问题标题】:Structured streaming custom deduplication结构化流式自定义重复数据删除
【发布时间】:2019-02-08 03:38:14
【问题描述】:

我有一个从 kafka 进入 dataFrame 的流数据。 我想删除基于 Id 的重复项,并根据时间戳保留最新记录。

样本数据如下:

Id  Name    count   timestamp
1   Vikas   20      2018-09-19T10:10:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   30      2018-09-19T10:10:30
4   Vishal  10      2018-09-19T10:10:40
1   Vikas   50      2018-09-19T10:10:50
4   Vishal  40      2018-09-19T10:11:00
1   Vikas   10      2018-09-19T10:11:10
3   Vilas   20      2018-09-19T10:11:20

我期望的输出是:

Id  Name    count   timestamp
1   Vikas   10      2018-09-19T10:11:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   20      2018-09-19T10:11:20
4   Vishal  40      2018-09-19T10:11:00

删除较旧的重复项,并根据时间戳字段仅保留最近的记录。

我正在为时间戳字段使用水印。 我曾尝试使用“df.removeDuplicate”,但它会保持旧记录完好无损,并且任何新记录都会被丢弃。

当前代码如下:

df = df.withWatermark("timestamp", "1 Day").dropDuplicates("Id", "timestamp")

我们如何实现自定义去重方法,以便我们可以将最新记录保留为唯一记录?

感谢任何帮助。

【问题讨论】:

  • 这里有什么解决办法吗?
  • @thebluephantom 现在我们已经到了 2020 年,您能找到任何解决方案吗?
  • 嗯,我正在处理 KAFKA 等方面的其他事情,但目前还没有 Spark、KAFKA 集成。可以解决其他问题。 @conetfun

标签: scala spark-structured-streaming


【解决方案1】:

在删除重复项之前先对时间戳列进行排序。

df.withWatermark("timestamp", "1 Day")
  .sort($"timestamp".desc)
  .dropDuplicates("Id", "timestamp")

【讨论】:

  • “流数据帧/数据集不支持排序,除非它在完整输出模式下聚合数据帧/数据集。”我们正在使用水印,所以在我们的例子中,它不是一个完整的输出模式。
猜你喜欢
  • 2023-03-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-12-22
  • 2021-04-21
  • 1970-01-01
  • 2015-09-12
相关资源
最近更新 更多