Skip to content
46 changes: 46 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -479,4 +479,50 @@ trait CommonStringExprs {
None
}
}

def minutesOfTimeToProto(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[Expr] = {
val childOpt = expr.children.headOption.orElse {
withInfo(expr, "MinutesOfTime has no child expression")
None
}

childOpt.flatMap { child =>
val timeZoneId = {
val exprClass = expr.getClass
try {
val timeZoneIdMethod = exprClass.getMethod("timeZoneId")
timeZoneIdMethod.invoke(expr).asInstanceOf[Option[String]]
} catch {
case _: NoSuchMethodException =>
try {
val timeZoneIdField = exprClass.getField("timeZoneId")
timeZoneIdField.get(expr).asInstanceOf[Option[String]]
} catch {
case _: NoSuchFieldException | _: SecurityException => None
}
}
}

exprToProtoInternal(child, inputs, binding)
.map { childExpr =>
val builder = ExprOuterClass.Minute.newBuilder()
builder.setChild(childExpr)

val timeZone = timeZoneId.getOrElse("UTC")
builder.setTimezone(timeZone)

ExprOuterClass.Expr
.newBuilder()
.setMinute(builder)
.build()
}
.orElse {
withInfo(expr, child)
None
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
import org.apache.comet.expressions.CometEvalMode
import org.apache.comet.serde.CommonStringExprs
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.comet.serde.QueryPlanSerde

/**
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
Expand All @@ -44,6 +45,9 @@ trait CometExprShim extends CommonStringExprs {
// Right child is the encoding expression.
stringDecode(expr, s.charset, s.bin, inputs, binding)

case _ if expr.getClass.getSimpleName == "MinutesOfTime" =>
minutesOfTimeToProto(expr, inputs, binding)

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ trait CometExprShim extends CommonStringExprs {
val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
optExprWithInfo(optExpr, wb, wb.children: _*)

case _ if expr.getClass.getSimpleName == "MinutesOfTime" =>
minutesOfTimeToProto(expr, inputs, binding)

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ trait CometExprShim extends CommonStringExprs {
optExprWithInfo(mapSortExpr, ms, ms.child)
}

case _ if expr.getClass.getSimpleName == "MinutesOfTime" =>
minutesOfTimeToProto(expr, inputs, binding)

case _ => None
}
}
Expand Down
17 changes: 17 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("MinutesOfTime expression support") {
// This test verifies that minute() function works correctly with timestamp columns.
// If Spark generates MinutesOfTime expression (a RuntimeReplaceable expression),
// it will be handled by the version-specific shim and converted to Minute proto.
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
readParquetFile(path.toString) { df =>
val query = df.select(expr("minute(_1)"))

checkSparkAnswerAndOperator(query)
}
}
}
}

test("hour on int96 timestamp column") {
import testImplicits._

Expand Down