diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ddfe80443d561..b99cbd0750ce4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -214,13 +214,17 @@ abstract class Optimizer(catalogManager: CatalogManager)
       OptimizeSubqueries,
       OptimizeOneRowRelationSubquery),
     Batch("Replace Operators", fixedPoint,
+      // SPARK-51262: ReplaceDeduplicateWithAggregate must run before RewriteExceptAll because
+      // it replaces Deduplicate with Aggregate(First(...)), creating new attribute exprIds.
+      // If RewriteExceptAll runs first, its Generate node captures stale exprIds that no
+      // longer exist after the Deduplicate-to-Aggregate rewrite.
+      ReplaceDeduplicateWithAggregate,
       RewriteExceptAll,
       RewriteIntersectAll,
       ReplaceIntersectWithSemiJoin,
       ReplaceExceptWithFilter,
       ReplaceExceptWithAntiJoin,
-      ReplaceDistinctWithAggregate,
-      ReplaceDeduplicateWithAggregate),
+      ReplaceDistinctWithAggregate),
     Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions,
       RemoveRepetitionFromGroupExpressions),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index e65942689bc06..d838ba4c234f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -1642,6 +1642,43 @@ class DataFrameSetOperationsSuite extends SharedSparkSession with AdaptiveSparkP
       }
     }
   }
+
+  test("SPARK-51262: exceptAll after dropDuplicates with subset should not throw") {
+    // Use data where dropDuplicates(subset) produces deterministic results, to avoid flakiness.
+    val df1 = spark.createDataFrame(Seq(
+      (1, "a", 100),
+      (2, "b", 200),
+      (3, "c", 300)
+    )).toDF("id", "name", "value")
+
+    val df2 = spark.createDataFrame(Seq(
+      (1, "a", 100)
+    )).toDF("id", "name", "value")
+
+    // dropDuplicates with a subset: each (id, name) pair is already unique,
+    // so the output is deterministic.
+    val deduped = df1.dropDuplicates("id", "name")
+
+    // exceptAll should succeed without INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND.
+    val result = deduped.exceptAll(df2)
+    assert(result.columns === Array("id", "name", "value"))
+    val rows = result.collect().sortBy(_.getInt(0))
+    assert(rows.length === 2)
+    assert(rows(0) === Row(2, "b", 200))
+    assert(rows(1) === Row(3, "c", 300))
+
+    // Also verify that except (non-all) works and returns the correct values.
+    val result2 = deduped.except(df2)
+    val rows2 = result2.collect().sortBy(_.getInt(0))
+    assert(rows2.length === 2)
+    assert(rows2(0) === Row(2, "b", 200))
+    assert(rows2(1) === Row(3, "c", 300))
+
+    // intersectAll should also work and return the matching row.
+    val result3 = deduped.intersectAll(df2)
+    val rows3 = result3.collect()
+    assert(rows3.length === 1)
+    assert(rows3.head === Row(1, "a", 100))
+  }
 }
 
 case class UnionClass1a(a: Int, b: Long, nested: UnionClass2)
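
For context, a minimal sketch of the user-facing query shape this patch fixes. This is not part of the patch; it assumes a local SparkSession named `spark` and mirrors the regression test above:

  import spark.implicits._

  val left  = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
  val right = Seq((1, "a")).toDF("id", "name")

  // Deduplicate on a subset of columns (a Deduplicate node), then apply a set
  // operation that RewriteExceptAll expands with a Generate node. Per the test
  // above, this previously failed with INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND,
  // because the Generate node captured exprIds that the later
  // ReplaceDeduplicateWithAggregate rewrite invalidated.
  left.dropDuplicates("id").exceptAll(right).show()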