Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, Expression, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -589,6 +589,39 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
}
}

trait CommonDateTimeExprs {

  /**
   * Serializes a `SecondsOfTime`-style expression as a `Second` protobuf expression.
   *
   * Only the first child of `expr` is considered. That child is converted with
   * `exprToProtoInternal`; on success it becomes the child of a `Second` message whose timezone
   * is fixed to "UTC". When `expr` has no child, or the child cannot be converted, `withInfo` is
   * called on `expr` and `None` is returned.
   *
   * @param expr
   *   the expression to convert; its first child is used as the input of `Second`
   * @param inputs
   *   input attributes, forwarded unchanged to `exprToProtoInternal`
   * @param binding
   *   binding flag, forwarded unchanged to `exprToProtoInternal`
   * @return
   *   the wrapped `Second` expression, or `None` if no conversion was possible
   */
  def secondsOfTimeToProto(
      expr: Expression,
      inputs: Seq[Attribute],
      binding: Boolean): Option[Expr] =
    expr.children.headOption match {
      case None =>
        withInfo(expr, "SecondsOfTime has no child expression")
        None

      case Some(child) =>
        exprToProtoInternal(child, inputs, binding) match {
          case Some(childExpr) =>
            // SecondsOfTime does not carry a timeZoneId; assume UTC.
            val secondBuilder = ExprOuterClass.Second
              .newBuilder()
              .setChild(childExpr)
              .setTimezone("UTC")

            Some(
              ExprOuterClass.Expr
                .newBuilder()
                .setSecond(secondBuilder)
                .build())

          case None =>
            // The child could not be serialized; report it against the parent expression.
            withInfo(expr, child)
            None
        }
    }
}

/**
* Converts a timestamp or date to the number of days since Unix epoch (1970-01-01). This is a V2
* partition transform expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ package org.apache.comet.shims
import org.apache.spark.sql.catalyst.expressions._

import org.apache.comet.expressions.CometEvalMode
import org.apache.comet.serde.CommonStringExprs
import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs}
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}

/**
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
*/
trait CometExprShim extends CommonStringExprs {
trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

Expand All @@ -43,6 +43,10 @@ trait CometExprShim extends CommonStringExprs {
// Right child is the encoding expression.
stringDecode(expr, s.charset, s.bin, inputs, binding)

case _: UnaryExpression if expr.prettyName == "seconds_of_time" =>
// SecondsOfTime may not exist in all Spark versions, so we match by prettyName
secondsOfTimeToProto(expr, inputs, binding)

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import org.apache.spark.sql.types.DataTypes

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal

/**
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
*/
trait CometExprShim extends CommonStringExprs {
trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

Expand Down Expand Up @@ -91,6 +91,10 @@ trait CometExprShim extends CommonStringExprs {
// val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
// optExprWithInfo(optExpr, wb, wb.children: _*)

case _: UnaryExpression if expr.prettyName == "seconds_of_time" =>
// SecondsOfTime may not exist in all Spark versions, so we match by prettyName
secondsOfTimeToProto(expr, inputs, binding)

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringTyp

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal

/**
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
*/
trait CometExprShim extends CommonStringExprs {
trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

Expand Down Expand Up @@ -113,6 +113,10 @@ trait CometExprShim extends CommonStringExprs {
// val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
// optExprWithInfo(optExpr, wb, wb.children: _*)

case _: UnaryExpression if expr.prettyName == "seconds_of_time" =>
// SecondsOfTime may not exist in all Spark versions, so we match by prettyName
secondsOfTimeToProto(expr, inputs, binding)

case _ => None
}
}
Expand Down
17 changes: 17 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("seconds_of_time expression") {
  // Evaluates `second(_1)` against a Parquet file written by makeRawTimeParquetFile, once with
  // dictionary encoding enabled and once with it disabled, and compares the result via
  // checkSparkAnswerAndOperator.
  // NOTE(review): presumably intended to cover the SecondsOfTime handling added to the
  // version-specific shims; confirm that this query actually produces that expression.
  for (dictionaryEnabled <- Seq(true, false)) {
    withTempDir { tmpDir =>
      val parquetPath = new Path(tmpDir.toURI.toString, "part-r-0.parquet")
      makeRawTimeParquetFile(parquetPath, dictionaryEnabled = dictionaryEnabled, 10000)

      readParquetFile(parquetPath.toString) { df =>
        val secondsDf = df.select(expr("second(_1)"))
        checkSparkAnswerAndOperator(secondsDf)
      }
    }
  }
}

test("hour on int96 timestamp column") {
import testImplicits._

Expand Down
Loading