
[SPARK-50593][SQL] SPJ: Support truncate transform via generalized ReducibleFunction API#55885

Open
metanil wants to merge 3 commits into apache:master from metanil:SPARK-50593-truncate_spj_support
Open

[SPARK-50593][SQL] SPJ: Support truncate transform via generalized ReducibleFunction API#55885
metanil wants to merge 3 commits into
apache:masterfrom
metanil:SPARK-50593-truncate_spj_support

Conversation

@metanil metanil commented May 14, 2026

What changes were proposed in this pull request?

This PR adds Storage Partitioned Join (SPJ) support for the truncate partition transform. The approach generalizes the ReducibleFunction API to accept arbitrary parameters via a new ReducibleParameters container, so SPJ can reason about any parameterized transform (bucket, truncate, future ones) through one code path.

Key changes:

  • New public API ReducibleParameters in org.apache.spark.sql.connector.catalog.functions — a typed parameter container.
  • Generalized reducer ReducibleFunction.reducer(ReducibleParameters, ReducibleFunction, ReducibleParameters). The old reducer(int, ..., int) is marked @Deprecated but preserved as the default fallback, so existing connector implementations (e.g., Iceberg 1.10.0) continue to work unchanged.
  • TransformExpression refactor: literal parameters (e.g., bucket numBuckets, truncate width) now live inside children rather than a bespoke numBucketsOpt: Option[Int] field. collectLeaves() is overridden to filter literal parameters and return only column references.
  • Generic path in TransformExpression that extracts ReducibleParameters from literal children and delegates to the new API; compatibility checks (isCompatible, reducers) work uniformly for bucket, truncate, etc.
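The key-changes list above can be sketched in code. This is a rough, illustrative shape only: the class and method names follow the PR description, but the exact signatures, the `Reducer` type, and the accessor names are assumptions, not the actual Spark source.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for the connector-side Reducer contract.
interface Reducer<I, O> {
    O reduce(I value);
}

// Typed parameter container, per the "Key changes" list: holds the literal
// arguments of a transform (e.g. bucket count, truncate width).
final class ReducibleParameters {
    private final List<Object> values;

    ReducibleParameters(Object... values) {
        this.values = Arrays.asList(values);
    }

    boolean isEmpty() { return values.isEmpty(); }
    int getInt(int i) { return (Integer) values.get(i); }
    String getString(int i) { return (String) values.get(i); }
}

interface ReducibleFunction<I, O> {
    // Legacy single-int signature, kept as the fallback so existing
    // connectors (e.g. Iceberg 1.10.0) keep working unchanged.
    @Deprecated
    default Reducer<I, O> reducer(int thisParam, ReducibleFunction<?, ?> other, int otherParam) {
        throw new UnsupportedOperationException("no reducer");
    }

    // Generalized overload: the default implementation delegates to the
    // deprecated single-int signature for backward compatibility.
    default Reducer<I, O> reducer(ReducibleParameters thisParams,
                                  ReducibleFunction<?, ?> other,
                                  ReducibleParameters otherParams) {
        return reducer(thisParams.getInt(0), other, otherParams.getInt(0));
    }
}
```

The backward-compatibility property the PR relies on falls out of the default method: a connector that only overrides the deprecated overload is still reachable through the generalized entry point.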

Why are the changes needed?

Today a join on tables partitioned by truncate(col, N) always shuffles, even when both sides share identical partitioning. The write side was addressed by SPARK-40295 (Allow v2 functions with literal args in write distribution and ordering), but the read/join side was never enabled.

Previous work in #49211 (@szehon-ho) explored direct support for transforms with literal arguments by adjusting the SPJ paths to recognize them. This PR generalizes the reducer API so the compatibility check is function-agnostic, with a default method that delegates to the deprecated single-int signature for backward compatibility.

Does this PR introduce any user-facing change?

Yes, for connector/catalog authors:

  • New public class ReducibleParameters.
  • New overload ReducibleFunction.reducer(ReducibleParameters, ...) with a default that delegates to the deprecated single-int signature for backward compatibility.
  • No action required for existing connectors; they keep working via the default fallback. Iceberg 1.10.0 (which implements only the old API) is verified via a dedicated LegacyBucketFunction test fixture.

For end users, queries joining tables partitioned by compatible truncate transforms (identical widths, or reducible pairs like truncate(3) and truncate(5)) now avoid shuffle via SPJ.
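As a sketch of the arithmetic behind reducible truncate widths, assuming Iceberg-style integer truncation, truncate(v, w) = v - (v mod w), and a hypothetical pair of widths where one divides the other (the actual reducibility rules live in the connector's reducer implementation):

```java
public class TruncateReducerSketch {
    // Sketch only: integer truncate in the Iceberg style. When width w2 is a
    // multiple of w1, re-truncating the finer side's partition values with w2
    // reproduces the coarser side's values, which is exactly what a Reducer
    // needs in order to co-partition the two sides without a shuffle.
    static int truncate(int v, int w) {
        return v - Math.floorMod(v, w); // floorMod keeps negatives on the grid
    }

    public static void main(String[] args) {
        int w1 = 4, w2 = 8; // hypothetical widths; w1 divides w2
        for (int v = -20; v <= 20; v++) {
            int reduced = truncate(truncate(v, w1), w2);
            if (reduced != truncate(v, w2)) {
                throw new AssertionError("reducer property failed at v=" + v);
            }
        }
        System.out.println("reducer property holds for all sampled values");
    }
}
```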

How was this patch tested?

Five new tests were added in KeyGroupedPartitioningSuite.

cc @szehon-ho @aokolnychyi @sunchao @peter-toth

Was this patch authored or co-authored using generative AI tooling?

Yes — used only for test cases and Javadoc/Scaladoc comments.

Generated-by: Claude Code (Opus 4.7)

transform.children.size == 1 && isReference(transform.children.head)
// TransformExpression.collectLeaves() only returns column references, not literals.
// We need exactly one column reference per transform.
transform.collectLeaves().size == 1
Member

@sunchao sunchao May 15, 2026


[P1] This widens support from transforms with one direct reference child to any transform whose collectLeaves() returns one leaf. Because Transform arguments can themselves be nested Transforms, V2ExpressionUtils can materialize shapes like outer(years(k)) and outer(days(k)). The new gate accepts both, keyPositions later maps them only by leaf k, and TransformExpression.isSameFunction compares only the outer function name plus literals. That can make storage partitionings with different nested child semantics look compatible and let SPJ skip a required shuffle, which can drop join matches. Keep the old direct-reference constraint, or compare the full non-literal child semantics before admitting these transforms.

[ 🤖 posted by Codex on behalf of sunchao 🤖 ]
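The shape of this concern can be reproduced with a minimal mock (illustrative types, not Spark's expression classes): a transform whose child is itself a transform still has exactly one column-reference leaf, so a leaf-count gate plus an outer-name comparison cannot distinguish outer(years(k)) from outer(days(k)).

```java
import java.util.List;

public class NestedTransformConcern {
    interface Expr {}
    record Ref(String name) implements Expr {}
    record Transform(String fn, Expr child) implements Expr {}

    // Mirrors the overridden collectLeaves(): literals are already filtered
    // out, and nested transforms are looked through down to the column ref.
    static List<String> collectLeaves(Expr e) {
        return (e instanceof Transform t) ? collectLeaves(t.child())
                                          : List.of(((Ref) e).name());
    }

    public static void main(String[] args) {
        Expr a = new Transform("outer", new Transform("years", new Ref("k")));
        Expr b = new Transform("outer", new Transform("days", new Ref("k")));
        // Both pass the "exactly one leaf" gate...
        boolean gate = collectLeaves(a).size() == 1 && collectLeaves(b).size() == 1;
        // ...and comparing only the outer function name (plus literals, of
        // which there are none here) reports the partitionings as the same,
        // even though the nested child semantics differ.
        boolean sameFn = ((Transform) a).fn().equals(((Transform) b).fn());
        System.out.println(gate && sameFn); // prints true: both checks wrongly agree
    }
}
```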

private def extractParameters(expr: TransformExpression): ReducibleParameters = {
  import scala.jdk.CollectionConverters._
  val values = expr.literalChildren.map {
    case Literal(value, _) => value.asInstanceOf[AnyRef]
Member

@sunchao sunchao May 15, 2026


[P1] extractParameters forwards raw Catalyst Literal.value objects into ReducibleParameters. For string literals, Spark stores UTF8String, while the new public API documents string parameters and exposes getString() as a java.lang.String cast. A connector implementing the documented string-parameter case will get ClassCastException from getString(0) or be forced to depend on Spark internals. Convert literal values to connector-facing external values by dataType before constructing ReducibleParameters.

[ 🤖 posted by Codex on behalf of sunchao 🤖 ]
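A mock of the suggested fix (FakeUTF8String is a stand-in for Spark's internal UTF8String, and the conversion helper is hypothetical): convert internal Catalyst values to external ones at the boundary, before they are packed into ReducibleParameters.

```java
import java.nio.charset.StandardCharsets;

public class BoundaryConversionSketch {
    // Stand-in for the internal representation of StringType literal values.
    record FakeUTF8String(byte[] bytes) {
        String asJavaString() { return new String(bytes, StandardCharsets.UTF_8); }
    }

    // Hypothetical boundary conversion: internal values become the external
    // types the public ReducibleParameters API documents (java.lang.String
    // for string parameters, boxed primitives otherwise).
    static Object toExternal(Object catalystValue) {
        if (catalystValue instanceof FakeUTF8String s) return s.asJavaString();
        return catalystValue;
    }

    public static void main(String[] args) {
        Object internal =
            new FakeUTF8String("prefix".getBytes(StandardCharsets.UTF_8));
        // Without the conversion, a connector's getString(0) cast to String
        // would throw ClassCastException on the internal representation.
        System.out.println(toExternal(internal) instanceof String); // true
        System.out.println(toExternal(42) instanceof Integer);      // true
    }
}
```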

val thisParams = extractParameters(thisExpr)
val otherParams = extractParameters(otherExpr)

val res = if (!thisParams.isEmpty && !otherParams.isEmpty) {
Member

@sunchao sunchao May 15, 2026


[P2] The new generalized reducer API accepts ReducibleParameters on both sides, and this file even models zero-literal transforms as ReducibleParameters([]). But this dispatch only invokes the generalized overload when both sides are non-empty. Mixed cases such as parameterized-vs-zero-parameter transforms instead fall back to reducer(otherFunction), so connectors that correctly implement the new generalized overload for those cases are never invoked; the default legacy path can even throw UnsupportedOperationException. If mixed arity is meant to be unsupported, the new API/docs should say that explicitly. Otherwise this should dispatch through the generalized overload whenever either side wants that path.

[ 🤖 posted by Codex on behalf of sunchao 🤖 ]
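The dispatch this comment argues for can be sketched as a small decision function (illustrative, not the PR's code): only the fully parameterless case needs the legacy reducer(other) path.

```java
public class DispatchSketch {
    // Sketch: choose the overload based on whether either side carries
    // parameters, instead of requiring both sides to be non-empty.
    static String chooseOverload(boolean thisParamsEmpty, boolean otherParamsEmpty) {
        if (!thisParamsEmpty || !otherParamsEmpty) {
            // Mixed arity (parameterized vs. zero-parameter) also routes
            // through the generalized overload, so connectors that implement
            // it for those cases are actually invoked.
            return "reducer(thisParams, other, otherParams)";
        }
        return "reducer(other)";
    }

    public static void main(String[] args) {
        System.out.println(chooseOverload(false, true)); // generalized overload
        System.out.println(chooseOverload(true, true));  // legacy parameterless path
    }
}
```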
