Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/aggregates.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,6 @@ object CometAverage extends CometAggregateExpressionSerde[Average] {

object CometSum extends CometAggregateExpressionSerde[Sum] {

override def getIncompatibleReasons(): Seq[String] = Seq("Falls back to Spark in ANSI mode.")

Comment thread
coderfender marked this conversation as resolved.
override def convert(
aggExpr: AggregateExpression,
sum: Sum,
Expand Down
16 changes: 15 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInter

object CometStringRepeat extends CometExpressionSerde[StringRepeat] {

override def getCompatibleNotes(): Seq[String] = Seq(
override def getIncompatibleReasons(): Seq[String] = Seq(
Comment thread
coderfender marked this conversation as resolved.
Outdated
Comment thread
coderfender marked this conversation as resolved.
Outdated
"A negative argument for the number of times to repeat throws an exception" +
" instead of returning an empty string as Spark does")

Expand All @@ -42,13 +42,27 @@ object CometStringRepeat extends CometExpressionSerde[StringRepeat] {
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val children = expr.children

children(1) match {
case Literal(count, _) if isNegativeNumber(count) =>
withInfo(expr, "Negative repeat count is not supported")
return None
case _ =>
Comment thread
coderfender marked this conversation as resolved.
Outdated
}

val leftCast = Cast(children(0), StringType)
val rightCast = Cast(children(1), LongType)
val leftExpr = exprToProtoInternal(leftCast, inputs, binding)
val rightExpr = exprToProtoInternal(rightCast, inputs, binding)
val optExpr = scalarFunctionExprToProto("repeat", leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, leftCast, rightCast)
}

/**
 * Checks whether a literal repeat-count value is negative.
 *
 * Only `Int` and `Long` are meaningful repeat counts here; any other
 * runtime type conservatively reports `false` (i.e. not negative).
 */
private def isNegativeNumber(value: Any): Boolean =
  value match {
    case intCount: Int => intCount < 0
    case longCount: Long => longCount < 0
    case _ => false
  }
Comment thread
coderfender marked this conversation as resolved.
Outdated
}

class CometCaseConversionBase[T <: Expression](function: String)
Expand Down
44 changes: 30 additions & 14 deletions spark/src/main/scala/org/apache/comet/serde/unixtime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,27 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn
// https://github.com/apache/datafusion/issues/16594
object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] {

override def getUnsupportedReasons(): Seq[String] = Seq(
"Only supports the default datetime format pattern `yyyy-MM-dd HH:mm:ss`")

override def getIncompatibleReasons(): Seq[String] = Seq(
"Only supports the default datetime format pattern `yyyy-MM-dd HH:mm:ss`." +
" DataFusion's valid timestamp range differs from Spark" +
"DataFusion's valid timestamp range differs from Spark" +
" (https://github.com/apache/datafusion/issues/16594)")

override def getSupportLevel(expr: FromUnixTime): SupportLevel = Incompatible(None)
/**
 * Determines Comet support for `from_unixtime` based on the datetime pattern.
 *
 * Only the default Spark pattern (`TimestampFormatter.defaultPattern`,
 * i.e. `yyyy-MM-dd HH:mm:ss`) is handled by the native implementation, and
 * even then it is marked [[Incompatible]] because DataFusion's valid
 * timestamp range differs from Spark's
 * (https://github.com/apache/datafusion/issues/16594). Any other pattern —
 * including a non-literal or null-literal format — falls back to Spark as
 * [[Unsupported]], with the offending pattern included in the reason when
 * it is known.
 */
override def getSupportLevel(expr: FromUnixTime): SupportLevel = {
  expr.format match {
    // Guard against Literal(null, _): fmt.toString would otherwise NPE,
    // and a null format should fall back to Spark like any unknown pattern.
    case Literal(fmt, _) if fmt != null =>
      val formatStr = fmt.toString
      if (formatStr == TimestampFormatter.defaultPattern) {
        Incompatible(None)
      } else {
        Unsupported(Some(s"Datetime pattern format: $formatStr is unsupported"))
      }
    case _ =>
      Unsupported(Some("Datetime pattern format is unsupported"))
  }
}

override def convert(
expr: FromUnixTime,
Expand All @@ -48,17 +63,18 @@ object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] {
val formatExpr = exprToProtoInternal(Literal("%Y-%m-%d %H:%M:%S"), inputs, binding)
val timeZone = exprToProtoInternal(Literal(expr.timeZoneId.orNull), inputs, binding)

if (expr.format != Literal(TimestampFormatter.defaultPattern)) {
withInfo(expr, "Datetime pattern format is unsupported")
None
} else if (secExpr.isDefined && formatExpr.isDefined) {
val timestampExpr =
scalarFunctionExprToProto("from_unixtime", Seq(secExpr, timeZone): _*)
val optExpr = scalarFunctionExprToProto("to_char", Seq(timestampExpr, formatExpr): _*)
optExprWithInfo(optExpr, expr, expr.sec, expr.format)
} else {
withInfo(expr, expr.sec, expr.format)
None
expr.format match {
case Literal(fmt, _) if fmt.toString != TimestampFormatter.defaultPattern =>
withInfo(expr, "Datetime pattern format is unsupported")
None
case _ if secExpr.isDefined && formatExpr.isDefined =>
val timestampExpr =
scalarFunctionExprToProto("from_unixtime", Seq(secExpr, timeZone): _*)
val optExpr = scalarFunctionExprToProto("to_char", Seq(timestampExpr, formatExpr): _*)
optExprWithInfo(optExpr, expr, expr.sec, expr.format)
Comment on lines +70 to +74
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this code still reachable?

case _ =>
withInfo(expr, expr.sec, expr.format)
None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ INSERT INTO test_from_unix_time VALUES (0), (1718451045), (-1), (NULL), (2147483
query expect_fallback(not fully compatible with Spark)
SELECT from_unixtime(t) FROM test_from_unix_time

query expect_fallback(not fully compatible with Spark)
query expect_fallback(Datetime pattern format: yyyy-MM-dd is unsupported)
SELECT from_unixtime(t, 'yyyy-MM-dd') FROM test_from_unix_time
Comment thread
coderfender marked this conversation as resolved.

-- literal arguments
query expect_fallback(not fully compatible with Spark)
SELECT from_unixtime(0)

query expect_fallback(not fully compatible with Spark)
query expect_fallback(Datetime pattern format: yyyy-MM-dd is unsupported)
SELECT from_unixtime(1718451045, 'yyyy-MM-dd')
Loading