【问题标题】:How does Apache Spark detect duplicates? Can it be modified?Apache Spark 如何检测重复项?可以修改吗?
【发布时间】:2017-08-24 14:40:53
【问题描述】:

Apache Spark 如何检测重复行?

我问的原因是我想有一些不同的行为:

在用于重复检测的列集中,对于其中一些(类型为double),我希望重复检测基于两个值之间的差异低于某个阈值(由我)。

我想这可以使用crossJoin() 和适当的where 语句之后,但是,我希望有一个更优雅的解决方案?

谢谢!

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    它使用HashArggregate:

    scala> df.distinct.explain
    == Physical Plan ==
    *HashAggregate(keys=[x#12], functions=[])
    +- Exchange hashpartitioning(x#12, 200)
       +- *HashAggregate(keys=[x#12], functions=[])
          +- LocalTableScan [x#12]
    

    我希望有一个更优雅的解决方案?

    您可以尝试 LSH 运算符提供的近似连接:

    但它不太可能使用单一功能。

    您可以对窗口函数使用类似会话的方法,但这仅在您可以将数据划分为分区时才有用。如果您对近似值满意,您可以使用固定大小范围,然后应用我在Spark - Window with recursion? - Conditionally propagating values across rows中描述的方法@

    sort 后跟mapPartitions 可以实现另一个近似值。

    df.sortBy("someColumn").rdd.mapPartitions(drop_duplicates).toDF()
    

    dropDuplicates 的实现方式类似于:

    def drop_duplicates(xs):
        prev = None
        for x in xs:
            if prev is None or abs(x - prev) > threshold:
                yield x
            prev = x   
    

    通过一些努力,您也可以使其在分区边界上保持一致。

    【讨论】:

    • 感谢您提供有趣而有趣的指点。问题是:我正在尝试获得 exact 解决方案。对于近似解决方案,我可以根据阈值进行乘法/除法/舍入,然后完成。顺便说一句:我确实有一个groupID 列,可用于Window.partitionBy('groupID')
    猜你喜欢
    • 2020-07-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-03
    • 2011-11-17
    • 2022-01-14
    • 2018-05-24
    • 1970-01-01
    相关资源
    最近更新 更多