【问题标题】:Compare two Spark dataframes比较两个 Spark 数据帧
【发布时间】:2018-01-15 03:17:18
【问题描述】:

Spark 数据帧 1 -:

+------+-------+---------+----+---+-------+
|city  |product|date     |sale|exp|wastage|
+------+-------+---------+----+---+-------+
|city 1|prod 1 |9/29/2017|358 |975|193    |
|city 1|prod 2 |8/25/2017|50  |687|201    |
|city 1|prod 3 |9/9/2017 |236 |431|169    |
|city 2|prod 1 |9/28/2017|358 |975|193    |
|city 2|prod 2 |8/24/2017|50  |687|201    |
|city 3|prod 3 |9/8/2017 |236 |431|169    |
+------+-------+---------+----+---+-------+

Spark 数据帧 2 -:

+------+-------+---------+----+---+-------+
|city  |product|date     |sale|exp|wastage|
+------+-------+---------+----+---+-------+
|city 1|prod 1 |9/29/2017|358 |975|193    |
|city 1|prod 2 |8/25/2017|50  |687|201    |
|city 1|prod 3 |9/9/2017 |230 |430|160    |
|city 1|prod 4 |9/27/2017|350 |90 |190    |
|city 2|prod 2 |8/24/2017|50  |687|201    |
|city 3|prod 3 |9/8/2017 |236 |431|169    |
|city 3|prod 4 |9/18/2017|230 |431|169    |
+------+-------+---------+----+---+-------+

请找出适用于上述火花数据帧 1 和火花数据帧 2 的以下条件的火花数据帧,

  1. 已删除记录
  2. 新纪录
  3. 没有变化的记录
  4. 变化记录

    这里的关键是“城市”、“产品”、“日期”。

我们需要不使用 Spark SQL 的解决方案。

【问题讨论】:

  • 这适用于任何偶然发现此问题并需要更多信息的人。我发现except 并不总是为我提供一切,所以我创建了一个库,该库的一部分是数据集比较github.com/AbsaOSS/hermes

标签: apache-spark apache-spark-sql


【解决方案1】:

我不确定是否找到删除和修改的记录,但您可以使用 except 函数来获取差异

df2.except(df1)

这将返回已在 dataframe2 中添加或修改的行或记录更改的行。 输出:

+------+-------+---------+----+---+-------+
|  city|product|     date|sale|exp|wastage|
+------+-------+---------+----+---+-------+
|city 3| prod 4|9/18/2017| 230|431|    169|
|city 1| prod 4|9/27/2017| 350| 90|    190|
|city 1| prod 3|9/9/2017 | 230|430|    160|
+------+-------+---------+----+---+-------+

您也可以尝试使用join和filter来获取更改和未更改的数据

df1.join(df2, Seq("city","product", "date"), "left").show(false)
df1.join(df2, Seq("city","product", "date"), "right").show(false)

希望这会有所帮助!

【讨论】:

  • 太好了,感谢分享解决方案 :)
  • 嗨 Shankar,请指导我解决 url stackoverflow.com/questions/45883039/… 中给出的问题
  • 除了被弃用了吗?现在我认为是 exceptAll()
  • 你需要做 df2 除了 df1 以及 df1 除了 df2 并检查两者是否为空
【解决方案2】:

一种可扩展且简单的方法是将两个DataFrames 与spark-extension 进行区分:

import uk.co.gresearch.spark.diff._

df1.diff(df2, "city", "product", "date").show

+----+------+-------+----------+---------+----------+--------+---------+------------+-------------+
|diff|  city|product|      date|left_sale|right_sale|left_exp|right_exp|left_wastage|right_wastage|
+----+------+-------+----------+---------+----------+--------+---------+------------+-------------+
|   N|city 1|prod 2 |2017-08-25|       50|        50|     687|      687|         201|          201|
|   C|city 1|prod 3 |2017-09-09|      236|       230|     431|      430|         169|          160|
|   I|city 3|prod 4 |2017-09-18|     null|       230|    null|      431|        null|          169|
|   N|city 3|prod 3 |2017-09-08|      236|       236|     431|      431|         169|          169|
|   D|city 2|prod 1 |2017-09-28|      358|      null|     975|     null|         193|         null|
|   I|city 1|prod 4 |2017-09-27|     null|       350|    null|       90|        null|          190|
|   N|city 1|prod 1 |2017-09-29|      358|       358|     975|      975|         193|          193|
|   N|city 2|prod 2 |2017-08-24|       50|        50|     687|      687|         201|          201|
+----+------+-------+----------+---------+----------+--------+---------+------------+-------------+

它识别I插入、C挂起、D删除和uN更改的行。 p>

【讨论】:

  • 如何用pip安装?
【解决方案3】:

查看 MegaSparkDiff,它是 GitHub 上的一个开源项目,可帮助比较数据帧。该项目尚未在 maven Central 中发布,但您可以查看比较 2 个数据帧的 SparkCompare scala 类

下面的代码 sn-p 将为您提供 2 个数据帧,一个具有 inLeftButNotInRight 的行,另一个具有 InRightButNotInLeft 的行。

如果您在两者之间进行 JOIN,那么您可以应用一些逻辑来识别丢失的主键(如果可能),然后这些键将构成已删除的记录。

我们正在努力添加您在项目中寻找的用例。 https://github.com/FINRAOS/MegaSparkDiff

https://github.com/FINRAOS/MegaSparkDiff/blob/master/src/main/scala/org/finra/msd/sparkcompare/SparkCompare.scala

private def compareSchemaDataFrames(left: DataFrame , leftViewName: String
                              , right: DataFrame , rightViewName: String) :Pair[DataFrame, DataFrame] = {
    //make sure that column names match in both dataFrames
    if (!left.columns.sameElements(right.columns))
      {
        println("column names were different")
        throw new Exception("Column Names Did Not Match")
      }

    val leftCols = left.columns.mkString(",")
    val rightCols = right.columns.mkString(",")

    //group by all columns in both data frames
    val groupedLeft = left.sqlContext.sql("select " + leftCols + " , count(*) as recordRepeatCount from " +  leftViewName + " group by " + leftCols )
    val groupedRight = left.sqlContext.sql("select " + rightCols + " , count(*) as recordRepeatCount from " +  rightViewName + " group by " + rightCols )

    //do the except/subtract command
    val inLnotinR = groupedLeft.except(groupedRight).toDF()
    val inRnotinL = groupedRight.except(groupedLeft).toDF()

    return new ImmutablePair[DataFrame, DataFrame](inLnotinR, inRnotinL)
  }

【讨论】:

  • 这还在积极开发中吗?它与 DataComPy 相比如何?
【解决方案4】:

请参阅下面的实用函数,我使用以下标准比较两个数据帧

  1. 列长
  2. 记录数
  3. 逐列比较所有记录

任务三是通过使用记录中所有列的连接哈希来完成的。

def verifyMatchAndSaveSignatureDifferences(oldDF: DataFrame, newDF: DataFrame, pkColumn: String) : Long = {
  assert(oldDF.columns.length == newDF.columns.length, s"column lengths don't match")
  assert(oldDF.count == newDF.count, s"record count don't match")

  def createHashColumn(df: DataFrame) : Column = {
     val colArr = df.columns
     md5(concat_ws("", (colArr.map(col(_))) : _*))
  }

  val newSigDF = newDF.select(col(pkColumn), createHashColumn(newDF).as("signature_new"))
  val oldSigDF = oldDF.select(col(pkColumn), createHashColumn(oldDF).as("signature"))

  val joinDF = newSigDF.join(oldSigDF, newSigDF("pkColumn") === oldSigDF("pkColumn")).where($"signature" !== $"signature_new").cache

  val diff = joinDF.count
  //write out any recorsd that don't match
  if (diff > 0)
     joinDF.write.saveAsTable("signature_table")

  joinDF.unpersist()

  diff
}

如果该方法返回 0,则两个数据帧在其他所有内容中完全相同,在 hive 的默认架构中名为 signature_table 的表将包含两者中不同的所有记录。

希望这会有所帮助。

【讨论】:

    【解决方案5】:

    Spark 版本:2.2.0

    同时使用 except 和 left anti join

    df2.except(df1) 会像:

    city product date sale exp wastage
    city 3 prod 4 9/18/2017 230 431 169
    city 1 prod 4 9/27/2017 350 90 190
    city 1 prod 3 9/9/2017 230 430 160

    正如 koiralo 所说,但删除的项目 'city 2 prod 1' 丢失了,所以我们需要左反连接(或带过滤器的左连接):

    select * from df1 left anti join df2 on df1.city=df2.city and df1.product=df2.product
    

    然后联合df2.except(df1)和左反加入的结果

    但是我没有在大数据集上测试左反连接的性能

    PS:如果你的 spark 版本超过 2.4,使用 spark-extension 会更容易

    【讨论】:

      【解决方案6】:

      使用 Spark 不同的连接类型似乎是计算行上的删除、添加和更新的关键。

      这个问题说明了不同类型的连接,具体取决于您要实现的目标: What are the various join types in Spark?

      【讨论】:

        【解决方案7】:

        假设我们有两个DataFrames,z1 和 z1。选项 1 适用于 没有 重复的行。你可以在spark-shell试试这些。

        • 选项 1:直接执行除外
        
        val inZ1NotInZ2 = z1.except(z2).toDF()
        val inZ2NotInZ1 = z2.except(z1).toDF()
        
        inZ1NotInZ2.show
        inZ2NotInZ1.show
        
        • 选项 2:使用 GroupBy(对于具有重复行的 DataFrame)
        val z1Grouped = z1.groupBy(z1.columns.map(c => z1(c)).toSeq : _*).count().withColumnRenamed("count", "recordRepeatCount")
        val z2Grouped = z2.groupBy(z2.columns.map(c => z2(c)).toSeq : _*).count().withColumnRenamed("count", "recordRepeatCount")
        
        val inZ1NotInZ2 = z1Grouped.except(z2Grouped).toDF()
        val inZ2NotInZ1 = z2Grouped.except(z1Grouped).toDF()
        
        
        • 选项 3,使用 exceptAll,它也适用于具有重复行的数据
        // Source Code: https://github.com/apache/spark/blob/50538600ec/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2029
        val inZ1NotInZ2 = z1.exceptAll(z2).toDF()
        val inZ2NotInZ1 = z2.exceptAll(z1).toDF()
        
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2019-06-02
          • 2019-07-21
          • 2020-06-07
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多