【问题标题】:How to Compare columns of two tables using Spark?如何使用 Spark 比较两个表的列?
【发布时间】:2020-02-26 00:29:55
【问题描述】:

我正在尝试通过读取为 DataFrame 来比较两个表()。对于这些表中的每个公共列,使用主键连接说 order_id 与 order_date、order_name、order_event 等其他列。

我正在使用的 Scala 代码

val primary_key=order_id
for (i <- commonColumnsList){
      val column_name = i
      val tempDataFrameForNew = newDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")
      val tempDataFrameOld = oldDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")

      //Get those records which aren common in both old/new tables
      matchCountCalculated = tempDataFrameForNew.intersect(tempDataFrameOld)
      //Get those records which aren't common in both old/new tables
      nonMatchCountCalculated = tempDataFrameOld.unionAll(tempDataFrameForNew).except(matchCountCalculated)

      //Total Null/Non-Null Counts in both old and new tables.
      nullsCountInNewDataFrame = newDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
      nullsCountInOldDataFrame = oldDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
      nonNullsCountInNewDataFrame = newDFCount - nullsCountInNewDataFrame
      nonNullsCountInOldDataFrame = oldDFCount - nullsCountInOldDataFrame

      //Put the result for a given column in a Seq variable, later convert it to Dataframe.
      tempSeq = tempSeq :+ Row(column_name, matchCountCalculated.toString, nonMatchCountCalculated.toString, (nullsCountInNewDataFrame - nullsCountInOldDataFrame).toString,
       (nonNullsCountInNewDataFrame - nonNullsCountInOldDataFrame).toString)
}
     // Final Step: Create DataFrame using Seq and some Schema.
     spark.createDataFrame(spark.sparkContext.parallelize(tempSeq), schema)

上述代码对于中等数据集运行良好,但随着我的新旧表中列和记录数量的增加,执行时间也在增加。任何形式的建议表示赞赏。 提前谢谢你。

【问题讨论】:

    标签: scala apache-spark hadoop apache-spark-sql


    【解决方案1】:

    您可以执行以下操作:
    1. 在主键上外连接新旧数据框
    joined_df = df_old.join(df_new, primary_key, "outer") 2. 尽可能缓存它。这将为您节省大量时间
    3. 现在您可以使用 spark 函数遍历列并比较列(.isNull 表示不匹配,== 表示匹配等)

    for (col <- df_new.columns){
      val matchCount = df_joined.filter(df_new[col].isNotNull && df_old[col].isNotNull).count()
      val nonMatchCount = ...
    }
    

    这应该会快很多,尤其是当您可以缓存数据帧时。如果你不能,这可能是个好主意,所以将加入的 df 保存到磁盘,以避免每次都洗牌

    【讨论】:

    • 嗨,保罗,谢谢您的回答。但我有点困惑,或者我的问题可能没有正确构建。如果我进行外部连接,DF1 和 DF2 中的列除了主键之外不应该匹配,不是吗?但在我的旧/新数据帧中,我有相同的列名称。我有兴趣找到两个 DataFrame 中可用的列的差异。
    • 不,我的意思是主键列上的外部联接val joined_df = df_new.join(df_old, primary_key, "outer")
    • 对不起先生,我的坏人,好的,所以在主键上进行完全外连接,然后迭代为 ,,。请纠正我的理解。
    • 我为迭代添加了一些伪代码。这个想法仍然是仅迭代列(primary_key col 除外)。您不需要将键与列连接
    猜你喜欢
    • 2018-01-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多