Scala (see below for PySpark)
The spark-fast-tests library has two methods for making DataFrame comparisons (I'm the creator of the library):
The assertSmallDataFrameEquality method collects DataFrames on the driver node and makes the comparison:
def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  if (!actualDF.collect().sameElements(expectedDF.collect())) {
    throw new DataFrameContentMismatch(contentMismatchMessage(actualDF, expectedDF))
  }
}
The assertLargeDataFrameEquality method compares DataFrames spread across multiple machines (the code is basically copied from spark-testing-base):
def assertLargeDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  try {
    actualDF.rdd.cache
    expectedDF.rdd.cache
    val actualCount = actualDF.rdd.count
    val expectedCount = expectedDF.rdd.count
    if (actualCount != expectedCount) {
      throw new DataFrameContentMismatch(countMismatchMessage(actualCount, expectedCount))
    }
    val expectedIndexValue = zipWithIndex(actualDF.rdd)
    val resultIndexValue = zipWithIndex(expectedDF.rdd)
    val unequalRDD = expectedIndexValue
      .join(resultIndexValue)
      .filter {
        case (idx, (r1, r2)) =>
          !(r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, 0.0))
      }
    val maxUnequalRowsToShow = 10
    assertEmpty(unequalRDD.take(maxUnequalRowsToShow))
  } finally {
    actualDF.rdd.unpersist()
    expectedDF.rdd.unpersist()
  }
}
assertSmallDataFrameEquality is faster for small DataFrame comparisons and I've found it sufficient for my test suites.
PySpark
Here's a simple function that returns true if the DataFrames are equal:
def are_dfs_equal(df1, df2):
    if df1.schema != df2.schema:
        return False
    if df1.collect() != df2.collect():
        return False
    return True
You'll typically perform DataFrame equality comparisons in a test suite and will want a descriptive error message when the comparisons fail (a True / False return value doesn't help much when debugging).
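To make that concrete, here is a rough hand-rolled sketch of an assertion-style variant that raises with a readable message instead of returning a boolean. It's only an illustration of the idea, not any library's implementation: the names assert_dfs_equal, DataFramesNotEqual and FakeDF are hypothetical. The helper relies only on .schema and .collect(), which PySpark DataFrames provide, and FakeDF is a stand-in so the snippet runs without a Spark session.

```python
class DataFramesNotEqual(AssertionError):
    """Hypothetical error type used only in this sketch."""


def assert_dfs_equal(df1, df2, max_rows_to_show=10):
    # Only .schema and .collect() are used, so a PySpark DataFrame
    # (or any object exposing those two members) can be passed in.
    if df1.schema != df2.schema:
        raise DataFramesNotEqual(
            f"Schemas differ:\n  actual:   {df1.schema}\n  expected: {df2.schema}"
        )
    rows1, rows2 = df1.collect(), df2.collect()
    if len(rows1) != len(rows2):
        raise DataFramesNotEqual(
            f"Row counts differ: actual={len(rows1)}, expected={len(rows2)}"
        )
    # Order-sensitive, row-by-row comparison, like are_dfs_equal above.
    mismatches = [
        (i, r1, r2) for i, (r1, r2) in enumerate(zip(rows1, rows2)) if r1 != r2
    ]
    if mismatches:
        lines = [
            f"  row {i}: {r1} != {r2}"
            for i, r1, r2 in mismatches[:max_rows_to_show]
        ]
        raise DataFramesNotEqual("Rows differ:\n" + "\n".join(lines))


class FakeDF:
    """Hypothetical stand-in for a DataFrame so the sketch runs without Spark."""

    def __init__(self, schema, rows):
        self.schema = schema
        self._rows = list(rows)

    def collect(self):
        return list(self._rows)
```

In a test you would call assert_dfs_equal(actual_df, expected_df) and let the exception fail the test; the message then names the rows that differ rather than just reporting False.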
Use the chispa library to access the assert_df_equality method that returns descriptive error messages for test suite workflows.