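Scala (see below for PySpark)
The spark-fast-tests library has two methods for comparing DataFrames (I'm the creator of the library):
The assertSmallDataFrameEquality method collects both DataFrames on the driver node and compares them: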
def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  if (!actualDF.collect().sameElements(expectedDF.collect())) {
    throw new DataFrameContentMismatch(contentMismatchMessage(actualDF, expectedDF))
  }
}
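Because Scala's sameElements compares the two collected arrays element by element, this check is order-sensitive: two DataFrames holding the same rows in a different order are reported as unequal. The sketch below models the content check in plain Python, with tuples standing in for collected rows (PySpark's Row is a tuple subclass). Both function names are hypothetical, and the order-insensitive variant is an illustrative alternative, not part of spark-fast-tests; it assumes every value is hashable (for example, no array or map columns).

```python
from collections import Counter


def same_elements(actual_rows, expected_rows):
    # Like Scala's sameElements: same length and equal element by element, in order.
    return list(actual_rows) == list(expected_rows)


def same_rows_any_order(actual_rows, expected_rows):
    # Hypothetical order-insensitive variant: compare the rows as multisets,
    # so a duplicated row must appear the same number of times on both sides.
    return Counter(actual_rows) == Counter(expected_rows)


actual = [(1, "a"), (2, "b")]
expected = [(2, "b"), (1, "a")]
print(same_elements(actual, expected))        # False
print(same_rows_any_order(actual, expected))  # True
```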
The assertLargeDataFrameEquality method compares DataFrames that are distributed across multiple machines (the code is mostly copied from spark-testing-base):
def assertLargeDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  try {
    // Cache both RDDs because each one is traversed more than once below
    actualDF.rdd.cache
    expectedDF.rdd.cache
    val actualCount = actualDF.rdd.count
    val expectedCount = expectedDF.rdd.count
    if (actualCount != expectedCount) {
      throw new DataFrameContentMismatch(countMismatchMessage(actualCount, expectedCount))
    }
    // zipWithIndex (a helper defined elsewhere in the library) keys each row by its position: (index, row)
    val actualIndexValue = zipWithIndex(actualDF.rdd)
    val expectedIndexValue = zipWithIndex(expectedDF.rdd)
    // Pair up rows that share an index and keep only the pairs that differ
    val unequalRDD = actualIndexValue
      .join(expectedIndexValue)
      .filter {
        case (idx, (r1, r2)) =>
          !(r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, 0.0))
      }
    val maxUnequalRowsToShow = 10
    // Fails if any mismatched pairs exist; at most 10 are brought back to the driver
    assertEmpty(unequalRDD.take(maxUnequalRowsToShow))
  } finally {
    actualDF.rdd.unpersist()
    expectedDF.rdd.unpersist()
  }
}
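The distributed steps above (compare counts, key each row by its position, join on that key, keep the pairs that differ, take at most 10) can be modeled locally to make the logic easier to follow. This is a plain-Python sketch over lists of tuples, not Spark code. The function name first_unequal_rows is hypothetical; it uses plain equality only, whereas the Scala code also consults RowComparer.areRowsEqual, and it sorts by index for a deterministic result, whereas Spark's join gives no ordering guarantee for take.

```python
def first_unequal_rows(actual_rows, expected_rows, max_to_show=10):
    """Local model of the distributed check: count, index, join, filter, take."""
    actual_rows = list(actual_rows)
    expected_rows = list(expected_rows)

    # 1. Row counts must match before rows can be paired by position.
    if len(actual_rows) != len(expected_rows):
        raise AssertionError(
            f"row counts differ: {len(actual_rows)} != {len(expected_rows)}"
        )

    # 2. Key every row by its position, like the zipWithIndex helper.
    actual_by_idx = dict(enumerate(actual_rows))
    expected_by_idx = dict(enumerate(expected_rows))

    # 3. Inner-join on the index and keep only pairs whose rows differ.
    unequal = [
        (idx, (actual_by_idx[idx], expected_by_idx[idx]))
        for idx in sorted(actual_by_idx.keys() & expected_by_idx.keys())
        if actual_by_idx[idx] != expected_by_idx[idx]
    ]

    # 4. Like take(maxUnequalRowsToShow): return at most max_to_show mismatches.
    return unequal[:max_to_show]
```

A caller would then assert that the returned list is empty, which mirrors the assertEmpty call in the Scala version.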
assertSmallDataFrameEquality is faster for comparing small DataFrames, and I've found it sufficient for my test suites.
PySpark
Here's a simple function that returns True if the DataFrames are equal:
def are_dfs_equal(df1, df2):
    if df1.schema != df2.schema:
        return False
    if df1.collect() != df2.collect():
        return False
    return True
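You'll usually perform DataFrame equality comparisons in a test suite, and you'll want a descriptive error message when a comparison fails (a True / False return value doesn't help much when debugging).
Use the chispa library's assert_df_equality method, which raises an error with a descriptive message when the DataFrames differ, so it fits a test suite workflow.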
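To show what a descriptive failure adds over a bare True / False, here is a minimal hand-rolled sketch; it is not chispa's implementation or its message format. The helper name assert_rows_equal is hypothetical. It takes the schemas plus already-collected rows (for example df.schema and df.collect(), whose Row objects are tuples), so it can be exercised without a Spark session, and it runs the schema check first, like are_dfs_equal.

```python
def assert_rows_equal(actual_schema, actual_rows, expected_schema, expected_rows):
    # Fail fast on schema differences, showing both schemas.
    if actual_schema != expected_schema:
        raise AssertionError(
            f"Schema mismatch:\n  actual:   {actual_schema}\n  expected: {expected_schema}"
        )
    actual_rows, expected_rows = list(actual_rows), list(expected_rows)
    if len(actual_rows) != len(expected_rows):
        raise AssertionError(
            f"Row count mismatch: actual has {len(actual_rows)}, "
            f"expected has {len(expected_rows)}"
        )
    # Report every position where the rows differ, not just that they differ.
    diffs = [
        f"  row {i}: actual={a!r} expected={e!r}"
        for i, (a, e) in enumerate(zip(actual_rows, expected_rows))
        if a != e
    ]
    if diffs:
        raise AssertionError("Content mismatch:\n" + "\n".join(diffs))
```

In a test you might call it as assert_rows_equal(df1.schema, df1.collect(), df2.schema, df2.collect()); the failure message then names the offending rows instead of just reporting False.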