【问题标题】:Dropping duplicate records based using window function in spark scala在 spark scala 中使用窗口函数删除重复记录
【发布时间】:2018-05-11 20:11:40
【问题描述】:

或者只是为了让这个简单的理解 我有一个数据框。

DataPartition   TimeStamp   OrganizationID  SourceID    AuditorID   AuditorEnumerationId    AuditorOpinionCode  AuditorOpinionId    IsPlayingAuditorRole    IsPlayingCSRAuditorRole IsPlayingTaxAdvisorRole FFAction|!| AuditorOpinionOnInternalControlCode AuditorOpinionOnGoingConcernCode    AuditorOpinionOnInternalControlsId  AuditorOpinionOnGoingConcernId  rank
Japan   2018-05-03T09:52:48+00:00   4295876589  194 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  194 2719    3023331 AOP 3010542 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  195 16157   1002485247  UWE 3010547 true    false   false   O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  196 3252    3024053 ONC 3020538 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  195 5937    3026578 NOP 3010543 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:50+00:00   4295876589  156 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:50+00:00   4295876589  157 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:56+00:00   4295876589  193 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-03T08:10:19+00:00   4295876589  196 null    null    null    null    null    null    null    D|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  195 null    null    null    null    null    null    null    O|!|    null    null    null    null    1

现在我需要选择具有 Rank =1 和 AuditorID!=null 但 AuditorID =!=null 的列仅适用于 FFAction|!|="O"。

在这种情况下,我的输出数据框应该如下所示

DataPartition   TimeStamp   OrganizationID  SourceID    AuditorID   AuditorEnumerationId    AuditorOpinionCode  AuditorOpinionId    IsPlayingAuditorRole    IsPlayingCSRAuditorRole IsPlayingTaxAdvisorRole FFAction|!| AuditorOpinionOnInternalControlCode AuditorOpinionOnGoingConcernCode    AuditorOpinionOnInternalControlsId  AuditorOpinionOnGoingConcernId  rank

Japan   2018-05-03T09:52:48+00:00   4295876589  194 2719    3023331 AOP 3010542 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  195 16157   1002485247  UWE 3010547 true    false   false   O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  196 3252    3024053 ONC 3020538 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  195 5937    3026578 NOP 3010543 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:56+00:00   4295876589  193 null    null    null    null    null    null    null    I|!|    null    null    null    null    1
Japan   2018-05-03T08:10:19+00:00   4295876589  196 null    null    null    null    null    null    null    D|!|    null    null    null    null    1

这是我的代码

import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("OrganizationID", "SourceID", "AuditorID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey1 = finaldf.withColumn("rank", row_number().over(windowSpec))
    .filter($"rank" === 1 && $"AuditorID" =!= "null")

场景 2 ...

这是我的数据框

uniqueFundamentalSet    PeriodId    SourceId    StatementTypeCode   StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId  UpdateReasonEnumerationId   FFAction|!| DataPartition   PartitionYear   TimeStamp
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    310 182 INC 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T08:30:53+00:00

当我在答案中应用建议的代码时,我得到以下输出

val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey1 = tempReorder.withColumn("rank", row_number().over(windowSpec))
      .filter($"rank" === 1 && (($"UpdateReason_updateReasonId" =!= "null" && $"FFAction|!|" === "O|!|") || $"FFAction|!|" =!= "O|!|")).drop("rank")

192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00

但我的预期输出是这样的。

192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00

【问题讨论】:

  • 您只想检查 AuditorID?
  • @RameshMaharjan 是,如果 DataPartition|OrganizationID|SourceID 此列匹配
  • 然后将该列按顺序包含在窗口函数中,但按降序排列
  • @RameshMaharjan 让我试试 ..我的意思是我必须得到最新的,以防我只基于两列删除和覆盖,在这种情况下,不应添加 AuditorId

标签: scala apache-spark apache-spark-sql spark-dataframe


【解决方案1】:

这是你的工作代码

import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("OrganizationID", "SourceID", "AuditorID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey1 = finaldf.withColumn("rank", row_number().over(windowSpec))
  .filter($"rank" === 1 && (($"AuditorID" =!= "null" && $"FFAction|!|" === "O|!|") || $"FFAction|!|" =!= "O|!|"))

这应该给你

+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+
|DataPartition|TimeStamp                |OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|rank|
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+
|Japan        |2018-05-03T09:52:48+00:00|4295876589    |194     |2719     |3023331             |AOP               |3010542         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T09:52:48+00:00|4295876589    |195     |16157    |1002485247          |UWE               |3010547         |true                |false                  |false                  |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T07:36:47+00:00|4295876589    |196     |3252     |3024053             |ONC               |3020538         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T07:36:47+00:00|4295876589    |195     |5937     |3026578             |NOP               |3010543         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T08:10:19+00:00|4295876589    |196     |null     |null                |null              |null            |null                |null                   |null                   |D|!|       |null                               |null                            |null                              |null                          |1   |
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+

注意:sourceID 为 193 的记录有 o|!|和 null 所以它不应该在输出中

【讨论】:

  • 您好先生...您能看一个场景吗...这里失败了...因为我属于 Action 类型..
  • 我会向您解释问题先生..我不会创建另一个问题,因为人们会将其标记为重复...问题是即使 182 UpdateReason_updateReasonId 具有空值但它具有最大的时间戳..所以我实际上需要具有最新时间戳的那个..如果时间戳相同,那么我需要选择一个具有上述值的空值和|O|..我希望我进行降神会..这就是我试图解释的内容第一个场景..
  • @RameshMaharjan 是的,就是这样..所以第一选择是最新的时间戳..如果时间戳相同,那么我们需要考虑没有 UpdateReason_updateReasonId 为 null 和 O..
  • @SUDARSHAN 原因是您在 partitionBy 中包含了UpdateReason_updateReasonId,这将前三行分为两组。因此具有 null 和最新日期的记录永远不会在同一组中。你所要做的就是从 partitionBy 中删除 UpdateReason_updateReasonId ,你应该没问题
  • @RameshMaharjan 在这种情况下,如果UpdateReason_updateReasonId 不同,并且除了所有列都相同,那么其他记录将被丢弃..我的主键是列的组合,直到UpdateReason_updateReasonId 来自OrgnizationId ..
【解决方案2】:

您可以使用 rownum udf 删除重复项并检查 rownum =1 且 authorid 不为空

【讨论】:

  • 不,它不会一直工作。假设如果我只得到一行,那么排名将为 1,审核员 ID 将为空..在这种情况下,我想保留该行..
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-07-10
相关资源
最近更新 更多