【问题标题】:Spark dropDuplicates source codeSpark drop 复制源代码
【发布时间】:2018-06-20 12:26:35
【问题描述】:

我正在研究 Spark 源代码以了解 dropDuplicates 方法的工作原理。在方法定义中有一个方法Deduplicate 调用。但我找不到它的定义或参考。如果有人能指出我正确的方向,那就太好了。链接是here

【问题讨论】:

  • 如果我猜对了,你实现了一个有条件的减少“删除相似”,你可以发布你的实现吗?

标签: apache-spark apache-spark-sql open-source


【解决方案1】:

它在火花催化剂中,见here

由于实现有点混乱,我将添加一些解释。

Deduplicate 的当前实现是:

/** A logical plan for `dropDuplicates`. */
case class Deduplicate(
    keys: Seq[Attribute],
    child: LogicalPlan) extends UnaryNode {

  override def output: Seq[Attribute] = child.output
}

不清楚这里发生了什么,但是如果你看一下Optimizer 类,你会看到ReplaceDeduplicateWithAggregate 对象,然后它就会变得更加清晰。

/**
 * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
 */
object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Deduplicate(keys, child) if !child.isStreaming =>
      val keyExprIds = keys.map(_.exprId)
      val aggCols = child.output.map { attr =>
        if (keyExprIds.contains(attr.exprId)) {
          attr
        } else {
          Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
        }
      }
      // SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping
      // aggregations by checking the number of grouping keys. The key difference here is that a
      // global aggregation always returns at least one row even if there are no input rows. Here
      // we append a literal when the grouping key list is empty so that the result aggregate
      // operator is properly treated as a grouping aggregation.
      val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
      Aggregate(nonemptyKeys, aggCols, child)
  }
}

底线,df 与列 col1, col2, col3, col4

df.dropDuplicates("col1", "col2") 

或多或少

df.groupBy("col1", "col2").agg(first("col3"), first("col4"))

【讨论】:

  • 我只是下载了源代码并可以通过我的 IDE (IntelliJ) 浏览它们。
  • 奇怪。我也在 Intellij 中下载和查看,但在我的情况下,它并没有指向我那里。
猜你喜欢
  • 1970-01-01
  • 2016-05-30
  • 2012-03-29
  • 1970-01-01
  • 1970-01-01
  • 2018-05-06
  • 1970-01-01
  • 2013-10-16
  • 2019-01-06
相关资源
最近更新 更多