[SPARK-56887][SQL] Add dedicated sort-merge physical operator for AS-OF join #55912
Open
sarutak wants to merge 6 commits into
## What changes were proposed in this pull request?

Add a dedicated sort-merge physical operator (SortMergeAsOfJoinExec) for AS-OF joins that replaces the existing correlated subquery rewrite path. The new operator co-partitions both sides by equi-join keys and sorts by the as-of key, then performs a single-pass merge scan per partition to find the nearest match for each left row. It exploits sort order for early termination (scanning right-to-left within each group).

Gated behind spark.sql.join.sortMergeAsOfJoin.enabled (default false). When disabled, the existing RewriteAsOfJoin optimizer rule is used.

## Why are the changes needed?

The current AS-OF join implementation (RewriteAsOfJoin) rewrites to a correlated scalar subquery with MIN_BY, which is O(N*M) and causes OOM on moderate data sizes. The sort-merge operator is O(N+M) per partition after sorting.

Benchmark results (10K x 10K rows, 100 equi-key groups, Apple M1 Pro):

- Correlated subquery: 35,566 ms
- Sort-merge operator: 73 ms (486x faster)

For 100K x 100K rows, the baseline OOMs while sort-merge completes in 487 ms.

## Does this PR introduce any user-facing change?

No. The feature is opt-in via a new SQLConf (default false).

## How was this patch tested?

- SortMergeAsOfJoinSuite: 7 tests covering backward/forward/nearest directions, equi-keys, left outer, and tolerance
- AsOfJoinBenchmark: comparative benchmark
- Existing DataFrameAsOfJoinSuite: all 11 tests pass (default path)
…inBenchmark (JDK 17, Scala 2.13, split 1 of 1)
…inBenchmark (JDK 21, Scala 2.13, split 1 of 1)
…inBenchmark (JDK 25, Scala 2.13, split 1 of 1)
What changes were proposed in this pull request?

Add `SortMergeAsOfJoinExec`, a dedicated physical operator for AS-OF joins that replaces the existing correlated subquery rewrite (`RewriteAsOfJoin`) when `spark.sql.join.sortMergeAsOfJoin.enabled` is set to `true` (default `false`).

The operator co-partitions both sides by equi-join keys, sorts by (equi-keys, as-of key), and performs a single-pass merge scan per partition to find the nearest match for each left row. It exploits sort order for early termination by scanning in the optimal direction based on the join direction (right-to-left for backward, left-to-right for forward/nearest).
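The sort-then-merge idea described above can be modelled in a few lines of plain Python. This is a minimal sketch under stated assumptions, not the code in `SortMergeAsOfJoinExec`: it handles only the backward direction with exact matches allowed and left outer semantics, it treats the whole input as one partition, and it uses a forward two-pointer scan rather than the right-to-left scan the PR describes. The function name `backward_asof_join` and the `(equi_key, as_of_key, payload)` row layout are hypothetical.

```python
from typing import Any, List, Optional, Tuple

# Hypothetical row layout for illustration: (equi_key, as_of_key, payload).
Row = Tuple[int, int, Any]


def backward_asof_join(
    left: List[Row], right: List[Row]
) -> List[Tuple[Row, Optional[Row]]]:
    """For each left row, find the right row with the same equi_key and the
    largest as_of_key that is <= the left row's as_of_key (None if absent)."""
    # Sort both sides by (equi_key, as_of_key), mirroring the operator's
    # required child ordering.
    left_sorted = sorted(left, key=lambda r: (r[0], r[1]))
    right_sorted = sorted(right, key=lambda r: (r[0], r[1]))

    out: List[Tuple[Row, Optional[Row]]] = []
    j = 0  # index of the first right row that sorts after the current left row
    for lrow in left_sorted:
        lkey = (lrow[0], lrow[1])
        # Advance past every right row ordered at or before the left row.
        # j never moves backwards, so the scan is O(N + M) after sorting.
        while j < len(right_sorted) and (right_sorted[j][0], right_sorted[j][1]) <= lkey:
            j += 1
        # The row just before j is the nearest earlier row overall; it is a
        # match only if it belongs to the same equi-key group.
        match = None
        if j > 0 and right_sorted[j - 1][0] == lrow[0]:
            match = right_sorted[j - 1]
        out.append((lrow, match))
    return out


trades = [(1, 10, "t1"), (1, 5, "t0"), (2, 7, "t2")]
quotes = [(1, 4, "q_a"), (1, 9, "q_b"), (1, 11, "q_c"), (2, 8, "q_d")]
result = backward_asof_join(trades, quotes)
```

In this toy input the trade at key 2 has no quote at or before time 7, so it is paired with `None`, which is the left outer behaviour. Forward and nearest directions, tolerance, and null keys are deliberately left out of the sketch.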
Changes:

- `SortMergeAsOfJoinExec`: the new physical operator
- `AsOfJoinSelection` in `SparkStrategies`
- `RewriteAsOfJoin` is bypassed when the conf is enabled
- `spark.sql.join.sortMergeAsOfJoin.enabled`: the new SQLConf (default `false`)

Why are the changes needed?
The current AS-OF join implementation rewrites `AsOfJoin` to a correlated scalar subquery with `MIN_BY`. This approach is O(N×M) per partition and causes OOM on moderate data sizes (100K+ rows), because the inequality condition (`left.t >= right.t`) cannot be decorrelated into an equi-join.

The sort-merge operator is O(N+M) per partition after sorting, with early termination within each equi-key group.
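To make the O(N×M) cost concrete, here is a hedged plain-Python model of what a per-row lookup amounts to when the inequality cannot be turned into an equi-join: every left row is checked against every right row. It models only the comparison count, not how Spark actually executes the rewritten plan. The function name `naive_backward_asof` and the `(equi_key, as_of_key, payload)` row layout are hypothetical.

```python
from typing import Any, List, Optional, Tuple

# Hypothetical row layout for illustration: (equi_key, as_of_key, payload).
Row = Tuple[int, int, Any]


def naive_backward_asof(
    left: List[Row], right: List[Row]
) -> Tuple[List[Tuple[Row, Optional[Row]]], int]:
    """Backward AS-OF join by exhaustive search.

    Returns the joined pairs and the number of row comparisons performed,
    which is always len(left) * len(right)."""
    comparisons = 0
    out: List[Tuple[Row, Optional[Row]]] = []
    for lrow in left:
        best: Optional[Row] = None
        for rrow in right:
            comparisons += 1
            same_group = rrow[0] == lrow[0]
            not_later = rrow[1] <= lrow[1]  # mirrors left.t >= right.t
            # Keep the candidate closest to the left row's as-of key.
            if same_group and not_later and (best is None or rrow[1] > best[1]):
                best = rrow
        out.append((lrow, best))
    return out, comparisons


trades = [(1, 5, "t0"), (1, 10, "t1"), (2, 7, "t2")]
quotes = [(1, 4, "q_a"), (1, 9, "q_b"), (1, 11, "q_c"), (2, 8, "q_d")]
pairs, comparisons = naive_backward_asof(trades, quotes)
```

With 3 left rows and 4 right rows the loop performs 12 comparisons. At 10K×10K rows the same pattern would need 10^8 comparisons, whereas a merge scan over sorted input touches each row a bounded number of times.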
Benchmark results on GitHub Actions (AMD EPYC 7763, 10K×10K rows, 100 equi-key groups):
For 100K×100K rows, the baseline OOMs while the sort-merge operator completes in ~500 ms.
Does this PR introduce any user-facing change?
No. The feature is opt-in via a new SQLConf that defaults to `false`. When disabled, the existing `RewriteAsOfJoin` path is used unchanged.

How was this patch tested?
- `SortMergeAsOfJoinSuite`: 18 tests covering backward/forward/nearest directions, equi-keys, left outer, tolerance, allowExactMatches=false, empty partitions, null keys, multiple data types (Int/Long/Double), self join, no equi-key, and conf-disabled fallback
- `AsOfJoinBenchmark`: comparative benchmark (correlated subquery vs sort-merge)
- `DataFrameAsOfJoinSuite`: all 11 tests pass with default conf (no regression)

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (via Kiro CLI, auto model selection)