Skip to content

Commit 05ec659

Browse files
committed
Support the `Years` expression
1 parent 14cd6c9 commit 05ec659

3 files changed

Lines changed: 73 additions & 5 deletions

File tree

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
206206
classOf[WeekDay] -> CometWeekDay,
207207
classOf[DayOfYear] -> CometDayOfYear,
208208
classOf[WeekOfYear] -> CometWeekOfYear,
209-
classOf[Quarter] -> CometQuarter)
209+
classOf[Quarter] -> CometQuarter,
210+
classOf[Years] -> CometYears)
210211

211212
private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
212213
classOf[Cast] -> CometCast)

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
package org.apache.comet.serde
2121

2222
import java.util.Locale
23-
24-
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
25-
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
23+
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year, Years}
24+
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampNTZType, TimestampType}
2625
import org.apache.spark.unsafe.types.UTF8String
2726

2827
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -537,3 +536,35 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
537536
}
538537
}
539538
}
539+
540+
object CometYears extends CometExpressionSerde[Years] {
541+
542+
override def getSupportLevel(expr: Years): SupportLevel = {
543+
expr.child.dataType match {
544+
case DateType | TimestampType | TimestampNTZType => Compatible()
545+
case _ => Unsupported(Some(s"Years does not support type: ${expr.child.dataType}"))
546+
}
547+
}
548+
549+
override def convert(expr: Years,
550+
inputs: Seq[Attribute],
551+
binding: Boolean): Option[ExprOuterClass.Expr] = {
552+
val periodType = exprToProtoInternal(Literal("year"), inputs, binding)
553+
val childExpr = exprToProtoInternal(expr.child, inputs, binding)
554+
val optExpr = scalarFunctionExprToProto("datepart", Seq(periodType, childExpr): _*)
555+
.map(e => {
556+
Expr
557+
.newBuilder()
558+
.setCast(
559+
ExprOuterClass.Cast
560+
.newBuilder()
561+
.setChild(e)
562+
.setDatatype(serializeDataType(IntegerType).get)
563+
.setEvalMode(ExprOuterClass.EvalMode.LEGACY)
564+
.setAllowIncompat(false)
565+
.build())
566+
.build()
567+
})
568+
optExprWithInfo(optExpr, expr, expr.child)
569+
}
570+
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.scalatest.Tag
2828

2929
import org.apache.hadoop.fs.Path
3030
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
31-
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, TruncDate, TruncTimestamp}
31+
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, TruncDate, TruncTimestamp, Years}
3232
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
3333
import org.apache.spark.sql.comet.CometProjectExec
3434
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
@@ -39,6 +39,8 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
3939
import org.apache.spark.sql.types._
4040

4141
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
42+
import org.apache.comet.serde.{CometYears, Compatible, Unsupported}
43+
import org.apache.comet.serde.QueryPlanSerde.exprToProto
4244
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
4345

4446
class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
@@ -3162,4 +3164,38 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
31623164
}
31633165
}
31643166

3167+
test("support Years partition transform (serialization only)") {
3168+
val input = Seq(java.sql.Date.valueOf("2024-01-15")).toDF("col")
3169+
val inputAttrs = input.queryExecution.analyzed.output
3170+
val yearsExpr = Years(input.col("col").expr)
3171+
val proto = exprToProto(yearsExpr, inputAttrs, binding = false)
3172+
3173+
assert(proto.isDefined, "Comet failed to serialize the Years expression!")
3174+
3175+
val expr = proto.get
3176+
assert(expr.hasCast, "Expected the result to be a Cast (to Integer)")
3177+
assert(expr.getCast.getChild.hasScalarFunc, "Expected Cast child to be a Scalar Function")
3178+
assert(expr.getCast.getChild.getScalarFunc.getFunc == "datepart", "Expected function to be 'datepart'")
3179+
}
3180+
3181+
test("Years support level") {
3182+
val supportedTypes = Seq(DateType, TimestampType, TimestampNTZType)
3183+
val unsupportedTypes = Seq(StringType, IntegerType, LongType)
3184+
3185+
supportedTypes.foreach { dt =>
3186+
val child = Literal.default(dt)
3187+
val expr = Years(child)
3188+
val result = CometYears.getSupportLevel(expr)
3189+
3190+
assert(result.isInstanceOf[Compatible], s"Expected $dt to be Compatible")
3191+
}
3192+
3193+
unsupportedTypes.foreach { dt =>
3194+
val child = Literal.default(dt)
3195+
val expr = Years(child)
3196+
val result = CometYears.getSupportLevel(expr)
3197+
3198+
assert(result.isInstanceOf[Unsupported], s"Expected $dt to be Unsupported")
3199+
}
3200+
}
31653201
}

0 commit comments

Comments
 (0)