[SPARK-56887][SQL] Add dedicated sort-merge physical operator for AS-OF join#55912

Open
sarutak wants to merge 6 commits into apache:master from sarutak:sort-merge-asof-join

sarutak (Member) commented May 15, 2026

What changes were proposed in this pull request?

Add SortMergeAsOfJoinExec, a dedicated physical operator for AS-OF joins that replaces the existing correlated subquery rewrite (RewriteAsOfJoin) when spark.sql.join.sortMergeAsOfJoin.enabled is set to true (default false).

The operator co-partitions both sides by the equi-join keys, sorts each side by (equi-keys, as-of key), and performs a single-pass merge scan per partition to find the nearest match for each left row. It exploits the sort order to terminate early, choosing the scan direction from the join direction (right-to-left for backward, left-to-right for forward/nearest).
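The merge scan described above can be illustrated with a minimal sketch in plain Python. This is not the code of SortMergeAsOfJoinExec: it is a simplified forward two-pointer merge for the backward direction with left-outer semantics, and it does not reproduce the right-to-left scan the PR describes. The function name `backward_asof_merge` and the `(equi_key, as_of_key, payload)` row layout are assumptions made for illustration.

```python
from typing import List, Optional, Tuple

# Hypothetical row layout for this sketch: (equi_key, as_of_key, payload)
Row = Tuple[str, int, str]


def backward_asof_merge(
    left: List[Row], right: List[Row]
) -> List[Tuple[str, int, str, Optional[str]]]:
    """Left-outer backward AS-OF join of two lists sorted by (equi_key, as_of_key).

    For each left row, emit the payload of the latest right row that has the
    same equi_key and an as_of_key <= the left as_of_key, or None if none exists.
    """
    out = []
    j = 0  # cursor into right; it only ever moves forward
    best: Optional[Row] = None  # latest qualifying right row in the current key group
    for lkey, lt, lval in left:
        # Forget a candidate remembered from a previous key group.
        if best is not None and best[0] != lkey:
            best = None
        # Consume right rows that sort at or before (lkey, lt).
        while j < len(right) and (right[j][0], right[j][1]) <= (lkey, lt):
            if right[j][0] == lkey:
                best = right[j]
            j += 1
        out.append((lkey, lt, lval, best[2] if best is not None else None))
    return out


left = [("a", 1, "L1"), ("a", 5, "L2"), ("b", 3, "L3")]
right = [("a", 2, "R1"), ("a", 4, "R2"), ("b", 3, "R3"), ("b", 9, "R4")]
result = backward_asof_merge(left, right)
print(result)
# [('a', 1, 'L1', None), ('a', 5, 'L2', 'R2'), ('b', 3, 'L3', 'R3')]
```

Both inputs must already be sorted by (equi-key, as-of key), which is what the sort step in the operator provides. Because neither cursor ever moves backward, the scan touches each row at most once.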

Changes:

  • New physical operator: SortMergeAsOfJoinExec
  • New planner strategy: AsOfJoinSelection in SparkStrategies
  • Conditional skip of RewriteAsOfJoin when the conf is enabled
  • New SQLConf: spark.sql.join.sortMergeAsOfJoin.enabled

Why are the changes needed?

The current AS-OF join implementation rewrites AsOfJoin to a correlated scalar subquery with MIN_BY. This approach is O(N×M) per partition and causes OOM on moderate data sizes (100K+ rows), because the inequality condition (left.t >= right.t) cannot be decorrelated into an equi-join.

The sort-merge operator is O(N+M) per partition after sorting, with early termination within each equi-key group.
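To make the complexity gap concrete, here is a small sketch that counts work for a single equi-key group. It assumes a nested-loop evaluation as a stand-in for the cost of the correlated subquery (the real plan is not a literal Python loop), and it uses a simplified forward merge for the backward direction. Both function names are hypothetical.

```python
from typing import List, Optional, Tuple


def naive_backward_asof(
    left_ts: List[int], right_ts: List[int]
) -> Tuple[List[Optional[int]], int]:
    """For every left timestamp, scan all right timestamps: N*M comparisons."""
    comparisons = 0
    matches: List[Optional[int]] = []
    for lt in left_ts:
        best = None
        for rt in right_ts:
            comparisons += 1
            if rt <= lt and (best is None or rt > best):
                best = rt
        matches.append(best)
    return matches, comparisons


def merge_backward_asof(
    left_ts: List[int], right_ts: List[int]
) -> Tuple[List[Optional[int]], int]:
    """Single pass over two sorted lists: at most N+M steps."""
    steps = 0
    j = 0
    best = None
    matches: List[Optional[int]] = []
    for lt in left_ts:
        while j < len(right_ts) and right_ts[j] <= lt:
            best = right_ts[j]
            j += 1
            steps += 1  # one step per right row consumed
        steps += 1  # one step per left row emitted
        matches.append(best)
    return matches, steps


left_ts = list(range(0, 2000, 2))   # 1000 sorted left timestamps
right_ts = list(range(1, 2000, 2))  # 1000 sorted right timestamps

naive_matches, comparisons = naive_backward_asof(left_ts, right_ts)
merge_matches, steps = merge_backward_asof(left_ts, right_ts)

print(naive_matches == merge_matches)  # True
print(comparisons)                     # 1000000
print(steps)                           # 1999
```

The two functions return the same matches, but the nested loop does N×M comparisons while the merge is bounded by N+M steps once the inputs are sorted.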

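Benchmark results on GitHub Actions (AMD EPYC 7763, 10K×10K rows, 100 equi-key groups), shown as the speedup of the sort-merge operator over the correlated subquery baseline:

| Scenario | JDK 17 | JDK 21 | JDK 25 |
|---|---|---|---|
| With equi-key | 631.8× | 601.2× | 676.5× |
| Without equi-key | 14.0× | 13.3× | 13.7× |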

For 100K×100K rows, the baseline OOMs while the sort-merge operator completes in ~500 ms.

Does this PR introduce any user-facing change?

No. The feature is opt-in via a new SQLConf that defaults to false. When disabled, the existing RewriteAsOfJoin path is used unchanged.

How was this patch tested?

  • SortMergeAsOfJoinSuite: 18 tests covering backward/forward/nearest directions, equi-keys, left outer, tolerance, allowExactMatches=false, empty partitions, null keys, multiple data types (Int/Long/Double), self join, no equi-key, and conf-disabled fallback
  • AsOfJoinBenchmark: comparative benchmark (correlated subquery vs sort-merge)
  • Existing DataFrameAsOfJoinSuite: all 11 tests pass with default conf (no regression)
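The options exercised by these tests (direction, tolerance, allowExactMatches) can be summarized with a brute-force reference function, which is a convenient oracle to compare an optimized operator against. The sketch below is an assumption-laden reading of the semantics and is not taken from the PR: it treats the tolerance as inclusive and breaks ties in the nearest direction by keeping the first candidate encountered. The name `asof_pick` is hypothetical.

```python
from typing import Optional, Sequence


def asof_pick(
    left_t: int,
    right_ts: Sequence[int],
    direction: str = "backward",
    tolerance: Optional[int] = None,
    allow_exact_matches: bool = True,
) -> Optional[int]:
    """Brute-force choice of the matching right as-of key for one left row.

    Assumptions of this sketch: tolerance is inclusive, and on a tie in
    distance the first candidate in right_ts wins.
    """
    best = None
    best_dist = None
    for rt in right_ts:
        diff = rt - left_t  # positive when the right row is later than the left row
        if diff == 0 and not allow_exact_matches:
            continue
        if direction == "backward" and diff > 0:
            continue
        if direction == "forward" and diff < 0:
            continue
        dist = abs(diff)
        if tolerance is not None and dist > tolerance:
            continue
        if best_dist is None or dist < best_dist:
            best, best_dist = rt, dist
    return best


right_ts = [1, 4, 6, 10]
print(asof_pick(5, right_ts, "backward"))                           # 4
print(asof_pick(5, right_ts, "forward"))                            # 6
print(asof_pick(7, right_ts, "nearest"))                            # 6
print(asof_pick(9, right_ts, "backward", tolerance=2))              # None
print(asof_pick(4, right_ts, "backward", allow_exact_matches=False))  # 1
```

A function like this is quadratic when applied to every left row, so it is only suitable for checking results on small inputs.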

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude (via Kiro CLI, auto model selection)

sarutak and others added 6 commits May 15, 2026 16:06
## What changes were proposed in this pull request?

Add a dedicated sort-merge physical operator (SortMergeAsOfJoinExec) for
AS-OF joins that replaces the existing correlated subquery rewrite path.

The new operator co-partitions both sides by equi-join keys and sorts by
the as-of key, then performs a single-pass merge scan per partition to
find the nearest match for each left row. It exploits sort order for
early termination (scanning right-to-left within each group).

Gated behind spark.sql.join.sortMergeAsOfJoin.enabled (default false).
When disabled, the existing RewriteAsOfJoin optimizer rule is used.

## Why are the changes needed?

The current AS-OF join implementation (RewriteAsOfJoin) rewrites to a
correlated scalar subquery with MIN_BY, which is O(N*M) and causes OOM
on moderate data sizes. The sort-merge operator is O(N+M) per partition
after sorting.

Benchmark results (10K x 10K rows, 100 equi-key groups, Apple M1 Pro):
- Correlated subquery: 35,566 ms
- Sort-merge operator:     73 ms  (486x faster)

For 100K x 100K rows, the baseline OOMs while sort-merge completes in
487 ms.

## Does this PR introduce any user-facing change?

No. The feature is opt-in via a new SQLConf (default false).

## How was this patch tested?

- SortMergeAsOfJoinSuite: 7 tests covering backward/forward/nearest
  directions, equi-keys, left outer, and tolerance
- AsOfJoinBenchmark: comparative benchmark
- Existing DataFrameAsOfJoinSuite: all 11 tests pass (default path)
…inBenchmark (JDK 17, Scala 2.13, split 1 of 1)
…inBenchmark (JDK 21, Scala 2.13, split 1 of 1)
…inBenchmark (JDK 25, Scala 2.13, split 1 of 1)