-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-50593][SQL] SPJ: Support truncate transform via generalized ReducibleFunction API #55885
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,139 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.sql.connector.catalog.functions; | ||
|
|
||
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.spark.annotation.Evolving;
|
|
||
| /** | ||
| * Container for reducible function literal parameters. | ||
| * Provides type-safe access to parameters of various types. | ||
| * | ||
| * Examples: | ||
| * <ul> | ||
| * <li>bucket(4, col) → ReducibleParameters([4])</li> | ||
| * <li>truncate(col, 3) → ReducibleParameters([3])</li> | ||
| * <li>range_bucket(col, 0L, 100L, 10) → ReducibleParameters([0L, 100L, 10])</li> | ||
| * <li>custom_transform(col, "param") → ReducibleParameters(["param"])</li> | ||
| * </ul> | ||
| * | ||
| * @since 4.0.0 | ||
| */ | ||
| @Evolving | ||
public class ReducibleParameters {

  // Literal parameter values in declaration order. Stored as an unmodifiable snapshot so
  // instances are immutable and safe to share across threads and cache by value.
  private final List<Object> values;

  /**
   * Creates a container from a list of parameter values.
   *
   * @param values literal parameter values; defensively copied so later mutation of the
   *               caller's list cannot change this instance. Must not be null.
   */
  public ReducibleParameters(List<Object> values) {
    this.values = Collections.unmodifiableList(new ArrayList<>(values));
  }

  /**
   * Creates a container from varargs parameter values.
   *
   * @param values literal parameter values; the array is cloned so callers cannot mutate
   *               this instance through the varargs array. Must not be null.
   */
  public ReducibleParameters(Object... values) {
    this.values = Collections.unmodifiableList(Arrays.asList(values.clone()));
  }

  /**
   * Get the number of parameters.
   */
  public int count() {
    return values.size();
  }

  /**
   * Check if this container has parameters.
   */
  public boolean isEmpty() {
    return values.isEmpty();
  }

  /**
   * Get parameter at index as Integer.
   * @throws ClassCastException if parameter is not an Integer
   * @throws IndexOutOfBoundsException if index is invalid
   */
  public int getInt(int index) {
    return (Integer) values.get(index);
  }

  /**
   * Get parameter at index as Long.
   * @throws ClassCastException if parameter is not a Long
   * @throws IndexOutOfBoundsException if index is invalid
   */
  public long getLong(int index) {
    return (Long) values.get(index);
  }

  /**
   * Get parameter at index as String.
   * @throws ClassCastException if parameter is not a String
   * @throws IndexOutOfBoundsException if index is invalid
   */
  public String getString(int index) {
    return (String) values.get(index);
  }

  /**
   * Get parameter at index as Double.
   * @throws ClassCastException if parameter is not a Double
   * @throws IndexOutOfBoundsException if index is invalid
   */
  public double getDouble(int index) {
    return (Double) values.get(index);
  }

  /**
   * Get parameter at index as Float.
   * @throws ClassCastException if parameter is not a Float
   * @throws IndexOutOfBoundsException if index is invalid
   */
  public float getFloat(int index) {
    return (Float) values.get(index);
  }

  /**
   * Get raw parameter value at index.
   */
  public Object get(int index) {
    return values.get(index);
  }

  /**
   * Get all parameter values as an unmodifiable list. Callers must not attempt to
   * mutate the returned list; attempts throw {@link UnsupportedOperationException}.
   */
  public List<Object> getAll() {
    // Already an unmodifiable snapshot — safe to hand out without copying again.
    return values;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    ReducibleParameters that = (ReducibleParameters) o;
    return values.equals(that.values);
  }

  @Override
  public int hashCode() {
    return values.hashCode();
  }

  @Override
  public String toString() {
    return "ReducibleParameters(" + values + ")";
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions | |
|
|
||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} | ||
| import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, Reducer, ReducibleFunction, ScalarFunction} | ||
| import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, Reducer, ReducibleFunction, ReducibleParameters, ScalarFunction} | ||
| import org.apache.spark.sql.errors.QueryExecutionErrors | ||
| import org.apache.spark.sql.types.DataType | ||
|
|
||
|
|
@@ -28,35 +28,87 @@ import org.apache.spark.sql.types.DataType | |
| * | ||
| * @param function the transform function itself. Spark will use it to decide whether two | ||
| * partition transform expressions are compatible. | ||
| * @param numBucketsOpt the number of buckets if the transform is `bucket`. Unset otherwise. | ||
| */ | ||
| case class TransformExpression( | ||
| function: BoundFunction, | ||
| children: Seq[Expression], | ||
| numBucketsOpt: Option[Int] = None) extends Expression { | ||
| case class TransformExpression(function: BoundFunction, children: Seq[Expression]) | ||
| extends Expression { | ||
|
|
||
| override def nullable: Boolean = true | ||
|
|
||
| /** | ||
| * Whether this [[TransformExpression]] has the same semantics as `other`. | ||
| * For instance, `bucket(32, c)` is equal to `bucket(32, d)`, but not to `bucket(16, d)` or | ||
| * `year(c)`. | ||
| * Extract literal children (constant parameters) from this transform. These are constant | ||
| * arguments like width in truncate(col, width). Literals are compared when checking if two | ||
| * transforms are the same. | ||
| */ | ||
| private lazy val literalChildren: Seq[Literal] = | ||
| children.collect { case l: Literal => l } | ||
|
|
||
| /** | ||
| * Whether this [[TransformExpression]] has the same semantics as `other`. For instance, | ||
| * `bucket(32, c)` is equal to `bucket(32, d)`, but not to `bucket(16, d)` or `year(c)`. | ||
 | * Similarly, `truncate(c, 2)` is equal to `truncate(d, 2)`, but not to `truncate(c, 4)`. | ||
| * | ||
| * This will be used, for instance, by Spark to determine whether storage-partitioned join can | ||
| * be triggered, by comparing partition transforms from both sides of the join and checking | ||
| * whether they are compatible. | ||
| * | ||
| * @param other the transform expression to compare to | ||
| * @return true if this and `other` has the same semantics w.r.t to transform, false otherwise. | ||
| * Two transforms are considered the same if: | ||
| * 1. They have the same function name | ||
| * 2. They have the same literal arguments (e.g., numBuckets for bucket, width for truncate) | ||
| * | ||
| * @param other | ||
| * the transform expression to compare to | ||
| * @return | ||
 | * true if this and `other` have the same semantics w.r.t. the transform, false otherwise. | ||
| */ | ||
| def isSameFunction(other: TransformExpression): Boolean = other match { | ||
| case TransformExpression(otherFunction, _, otherNumBucketsOpt) => | ||
| function.canonicalName() == otherFunction.canonicalName() && | ||
| numBucketsOpt == otherNumBucketsOpt | ||
| case TransformExpression(otherFunction, _) => | ||
| val sameFunctionName = function.canonicalName() == otherFunction.canonicalName() | ||
|
|
||
| // Compare literal arguments to ensure transforms with different parameters | ||
| // (e.g., bucket(32, col) vs bucket(16, col), truncate(col, 2) vs truncate(col, 4)) | ||
| // are not considered the same | ||
| val otherLiterals = other.literalChildren | ||
| val sameLiterals = literalChildren.length == otherLiterals.length && | ||
| literalChildren.zip(otherLiterals).forall { case (l1, l2) => | ||
| l1.equals(l2) | ||
| } | ||
|
|
||
| sameFunctionName && sameLiterals | ||
| case _ => | ||
| false | ||
| } | ||
|
|
||
| /** | ||
| * Override canonicalized to ensure transforms with the same function and literals are | ||
| * considered semantically equal, regardless of which specific column references they use. | ||
| * | ||
| * This is crucial for Storage Partitioned Joins - we need bucket(4, tableA.id) and bucket(4, | ||
| * tableB.id) to be semantically equal so SPJ can be triggered. | ||
| */ | ||
| override lazy val canonicalized: Expression = { | ||
| // Canonicalize only the non-literal children (i.e., column references) | ||
| val canonicalizedReferenceChildren = children.map { | ||
| case l: Literal => l | ||
| case other => other.canonicalized | ||
| } | ||
| TransformExpression(function, canonicalizedReferenceChildren) | ||
| } | ||
|
|
||
| /** | ||
| * Override collectLeaves to only return reference children (columns), not literal parameters. | ||
| * | ||
| * For TransformExpression, literal children are metadata about the transform function (e.g., | ||
| * numBuckets=4 in bucket(4, col), width=2 in truncate(col, 2)). All consumers of | ||
| * collectLeaves() expect only column references, not these metadata literals. | ||
| * | ||
| */ | ||
| override def collectLeaves(): Seq[Expression] = { | ||
| children.flatMap { | ||
| case _: Literal => Seq.empty // Skip literal parameters (metadata) | ||
| case other => other.collectLeaves() // Include column references | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Whether this [[TransformExpression]]'s function is compatible with the `other` | ||
| * [[TransformExpression]]'s function. | ||
|
|
@@ -73,8 +125,8 @@ case class TransformExpression( | |
| } else { | ||
| (function, other.function) match { | ||
| case (f: ReducibleFunction[_, _], o: ReducibleFunction[_, _]) => | ||
| val thisReducer = reducer(f, numBucketsOpt, o, other.numBucketsOpt) | ||
| val otherReducer = reducer(o, other.numBucketsOpt, f, numBucketsOpt) | ||
| val thisReducer = reducer(f, this, o, other) | ||
| val otherReducer = reducer(o, other, f, this) | ||
| thisReducer.isDefined || otherReducer.isDefined | ||
| case _ => false | ||
| } | ||
|
|
@@ -92,22 +144,47 @@ case class TransformExpression( | |
| */ | ||
| def reducers(other: TransformExpression): Option[Reducer[_, _]] = { | ||
| (function, other.function) match { | ||
| case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) => | ||
| reducer(e1, numBucketsOpt, e2, other.numBucketsOpt) | ||
| case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) => | ||
| reducer(e1, this, e2, other) | ||
| case _ => None | ||
| } | ||
| } | ||
|
|
||
| // Return a Reducer for a reducible function on another reducible function | ||
| /** | ||
| * Extract all literal parameters from a transform expression. | ||
| * Returns ReducibleParameters containing the literal values in order. | ||
| * | ||
| * Examples: | ||
| * bucket(4, col) => ReducibleParameters([4]) | ||
| * truncate(col, 3) => ReducibleParameters([3]) | ||
| * days(col) => ReducibleParameters([]) (no literals) | ||
| */ | ||
| private def extractParameters(expr: TransformExpression): ReducibleParameters = { | ||
| import scala.jdk.CollectionConverters._ | ||
| val values = expr.literalChildren.map { | ||
| case Literal(value, _) => value.asInstanceOf[AnyRef] | ||
| } | ||
| new ReducibleParameters(values.asJava) | ||
| } | ||
|
|
||
| /** | ||
| * Return a Reducer for a reducible function on another reducible function | ||
| * Handles both parameterized (bucket, truncate) and non-parameterized (days, hours) functions. | ||
| */ | ||
| private def reducer( | ||
| thisFunction: ReducibleFunction[_, _], | ||
| thisNumBucketsOpt: Option[Int], | ||
| thisExpr: TransformExpression, | ||
| otherFunction: ReducibleFunction[_, _], | ||
| otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = { | ||
| val res = (thisNumBucketsOpt, otherNumBucketsOpt) match { | ||
| case (Some(numBuckets), Some(otherNumBuckets)) => | ||
| thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets) | ||
| case _ => thisFunction.reducer(otherFunction) | ||
| otherExpr: TransformExpression): Option[Reducer[_, _]] = { | ||
| val thisParams = extractParameters(thisExpr) | ||
| val otherParams = extractParameters(otherExpr) | ||
|
|
||
| val res = if (!thisParams.isEmpty && !otherParams.isEmpty) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [P2] The new generalized reducer API accepts [ 🤖 posted by Codex on behalf of sunchao 🤖 ] |
||
| // Parameterized functions (bucket, truncate, etc.) | ||
| thisFunction.reducer(thisParams, otherFunction, otherParams) | ||
| } else { | ||
| // Non-parameterized functions (days, hours, etc.) | ||
| thisFunction.reducer(otherFunction) | ||
| } | ||
| Option(res) | ||
| } | ||
|
|
@@ -118,10 +195,7 @@ case class TransformExpression( | |
| copy(children = newChildren) | ||
|
|
||
| private lazy val resolvedFunction: Option[Expression] = this match { | ||
| case TransformExpression(scalarFunc: ScalarFunction[_], arguments, Some(numBuckets)) => | ||
| Some(V2ExpressionUtils.resolveScalarFunction(scalarFunc, | ||
| Seq(Literal(numBuckets)) ++ arguments)) | ||
| case TransformExpression(scalarFunc: ScalarFunction[_], arguments, None) => | ||
| case TransformExpression(scalarFunc: ScalarFunction[_], arguments) => | ||
| Some(V2ExpressionUtils.resolveScalarFunction(scalarFunc, arguments)) | ||
| case _ => None | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[P1]
`extractParameters` forwards raw Catalyst `Literal.value` objects into `ReducibleParameters`. For string literals, Spark stores `UTF8String`, while the new public API documents string parameters and exposes `getString()` as a `java.lang.String` cast. A connector implementing the documented string-parameter case will get `ClassCastException` from `getString(0)` or be forced to depend on Spark internals. Convert literal values to connector-facing external values by `dataType` before constructing `ReducibleParameters`. [ 🤖 posted by Codex on behalf of sunchao 🤖 ]