【Posted on】: 2020-02-26 00:29:55
【Problem description】:
I am trying to compare two tables by reading them in as DataFrames. For each column the tables have in common, I concatenate it with the primary key, say order_id, and compare; the other columns are things like order_date, order_name and order_event.
The Scala code I am using:
val primaryKey = "order_id"
val newDFCount = newDataFrame.count().toInt
val oldDFCount = oldDataFrame.count().toInt
var tempSeq: Seq[Row] = Seq()

for (i <- commonColumnsList) {
  val columnName = i
  val tempDataFrameForNew = newDataFrame.selectExpr(s"concat($primaryKey, $i) as concatenated")
  val tempDataFrameOld = oldDataFrame.selectExpr(s"concat($primaryKey, $i) as concatenated")
  // Records that are common to both the old and new tables
  val matchedRecords = tempDataFrameForNew.intersect(tempDataFrameOld)
  val matchCountCalculated = matchedRecords.count()
  // Records that are not common to both the old and new tables
  val nonMatchCountCalculated = tempDataFrameOld.unionAll(tempDataFrameForNew).except(matchedRecords).count()
  // Total null/non-null counts in both the old and new tables
  val nullsCountInNewDataFrame = newDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
  val nullsCountInOldDataFrame = oldDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
  val nonNullsCountInNewDataFrame = newDFCount - nullsCountInNewDataFrame
  val nonNullsCountInOldDataFrame = oldDFCount - nullsCountInOldDataFrame
  // Collect the result for this column into a Seq; it is converted to a DataFrame later
  tempSeq = tempSeq :+ Row(columnName, matchCountCalculated.toString, nonMatchCountCalculated.toString,
    (nullsCountInNewDataFrame - nullsCountInOldDataFrame).toString,
    (nonNullsCountInNewDataFrame - nonNullsCountInOldDataFrame).toString)
}
// Final step: create a DataFrame from the Seq and a schema
spark.createDataFrame(spark.sparkContext.parallelize(tempSeq), schema)
The above code runs fine for medium-sized datasets, but as the number of columns and records in my old and new tables grows, the execution time grows as well. Any kind of suggestion is appreciated. Thank you in advance.
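For context on where the time goes: each loop iteration launches several independent Spark jobs (intersect, union/except, two null counts), so the number of full passes over the data grows linearly with the number of common columns. Below is a rough sketch of one way to reduce that, assuming the same newDataFrame, oldDataFrame and commonColumnsList as above (and that commonColumnsList holds only the non-key columns); this is only an illustration, not the code from the question.

import org.apache.spark.sql.functions._

// Cache both tables once so repeated passes do not re-read the source.
newDataFrame.cache()
oldDataFrame.cache()

// Null counts for every common column in a single aggregation pass per table,
// instead of one count() job per column.
val nullCountExprs = commonColumnsList.map(c =>
  sum(when(col(c).isNull, 1).otherwise(0)).alias(c))
val nullsInNew = newDataFrame.select(nullCountExprs: _*).first()
val nullsInOld = oldDataFrame.select(nullCountExprs: _*).first()

// Per-column match counts from one join on the primary key, rather than an
// intersect/except pair per column; <=> is Spark's null-safe equality.
val joined = newDataFrame.alias("n")
  .join(oldDataFrame.alias("o"), Seq("order_id"), "full_outer")
val matchCountExprs = commonColumnsList.map(c =>
  sum(when(col(s"n.$c") <=> col(s"o.$c"), 1).otherwise(0)).alias(c))
val matchCounts = joined.select(matchCountExprs: _*).first()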
【Question discussion】:
Tags: scala apache-spark hadoop apache-spark-sql