Skip to content

Commit 2775b5e

Browse files
andygrove authored and parthchandra committed
feat: Add TimestampNTZType support for casts and unix_timestamp
1 parent 0c13675 commit 2775b5e

5 files changed

Lines changed: 108 additions & 13 deletions

File tree

docs/source/user-guide/latest/compatibility.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,22 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on
8888
- **StructsToJson (to_json)**: Does not support `+Infinity` and `-Infinity` for numeric types (float, double).
8989
[#3016](https://github.com/apache/datafusion-comet/issues/3016)
9090

## Date and Time Functions

Comet's native implementation of date and time functions may produce different results than Spark for dates
far in the future (approximately beyond year 2100). This is because Comet uses the chrono-tz library for
timezone calculations, which has limited support for Daylight Saving Time (DST) rules beyond the IANA
time zone database's explicit transitions.

For dates within a reasonable range (approximately 1970-2100), Comet's date and time functions are compatible
with Spark. For dates beyond this range, functions that involve timezone-aware calculations (such as
`date_trunc` with timezone-aware timestamps) may produce results with incorrect DST offsets.

If you need to process dates far in the future with accurate timezone handling, consider:

- Using timezone-naive types (`timestamp_ntz`) when timezone conversion is not required
- Falling back to Spark for these specific operations
91107
## Regular Expressions
92108

93109
Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's

native/spark-expr/src/datetime_funcs/unix_timestamp.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,27 @@ impl ScalarUDFImpl for SparkUnixTimestamp {
7878

7979
match args {
8080
[ColumnarValue::Array(array)] => match array.data_type() {
DataType::Timestamp(Microsecond, None) => {
    // TimestampNTZ: No timezone conversion needed - simply divide microseconds
    // by MICROS_PER_SECOND. TimestampNTZ stores local time without timezone.
    let timestamp_array =
        array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();

    let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
        timestamp_array
            .values()
            .iter()
            .map(|&micros| micros / MICROS_PER_SECOND)
            .collect()
    } else {
        timestamp_array
            .iter()
            .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
            .collect()
    };

    Ok(ColumnarValue::Array(Arc::new(result)))
}
81102
DataType::Timestamp(_, _) => {
82103
let is_utc = self.timezone == "UTC";
83104
let array = if is_utc

native/spark-expr/src/kernels/temporal.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
//! temporal kernels
1919
-use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc};
+use chrono::{
+    DateTime, Datelike, Duration, LocalResult, NaiveDate, Offset, TimeZone, Timelike, Utc,
+};
2123

2224
use std::sync::Arc;
2325

@@ -153,10 +155,30 @@ where
153155
Ok(())
154156
}
155157

-// Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch
+// Apply the Tz to the Naive Date Time, convert to UTC, and return as microseconds in Unix epoch.
+// This function re-interprets the local datetime in the timezone to ensure the correct DST offset
+// is used for the target date (not the original date's offset). This is important when truncation
+// changes the date to a different DST period (e.g., from December/PST to October/PDT).
+//
+// Note: For far-future dates (approximately beyond year 2100), chrono-tz may not accurately
+// calculate DST transitions, which can result in incorrect offsets. See the compatibility
+// guide for more information.
 #[inline]
 fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 {
-    dt.unwrap().with_timezone(&Utc).timestamp_micros()
+    let dt = dt.unwrap();
+    let naive = dt.naive_local();
+    let tz = dt.timezone();
+
+    // Re-interpret the local time in the timezone to get the correct DST offset
+    // for the truncated date. Use noon to avoid DST gaps that occur around midnight.
+    let noon = naive.date().and_hms_opt(12, 0, 0).unwrap_or(naive);
+
+    let offset = match tz.offset_from_local_datetime(&noon) {
+        LocalResult::Single(off) | LocalResult::Ambiguous(off, _) => off.fix(),
+        LocalResult::None => return dt.with_timezone(&Utc).timestamp_micros(),
+    };
+
+    (naive - offset).and_utc().timestamp_micros()
 }
161183

162184
#[inline]

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,9 @@ object CometSecond extends CometExpressionSerde[Second] {
295295
object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {
296296

297297
private def isSupportedInputType(expr: UnixTimestamp): Boolean = {
-    // Note: TimestampNTZType is not supported because Comet incorrectly applies
-    // timezone conversion to TimestampNTZ values. TimestampNTZ stores local time
-    // without timezone, so no conversion should be applied.
     expr.children.head.dataType match {
       case TimestampType | DateType => true
+      case dt if dt.typeName == "timestamp_ntz" => true
303301
case _ => false
304302
}
305303
}

spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,55 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
139139
}
140140
}
141141

-  test("unix_timestamp - timestamp_ntz input falls back to Spark") {
-    // TimestampNTZ is not supported because Comet incorrectly applies timezone
-    // conversion. TimestampNTZ stores local time without timezone, so the unix
-    // timestamp should just be the value divided by microseconds per second.
+  test("unix_timestamp - timestamp_ntz input") {
     val r = new Random(42)
     val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
     val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
     ntzDF.createOrReplaceTempView("ntz_tbl")
-    checkSparkAnswerAndFallbackReason(
-      "SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz",
-      "unix_timestamp does not support input type: TimestampNTZType")
+    checkSparkAnswerAndOperator(
+      "SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz")
+  }
+
+  test("hour/minute/second - timestamp_ntz input") {
+    val r = new Random(42)
+    val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
+    val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
+    ntzDF.createOrReplaceTempView("ntz_tbl")
+    checkSparkAnswerAndOperator(
+      "SELECT ts_ntz, hour(ts_ntz), minute(ts_ntz), second(ts_ntz) from ntz_tbl order by ts_ntz")
+  }
+
+  test("date_trunc - timestamp_ntz input") {
+    val r = new Random(42)
+    val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
+    val reasonableBaseDate =
+      new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-06-15 12:00:00").getTime
+    val ntzDF = FuzzDataGenerator.generateDataFrame(
+      r,
+      spark,
+      ntzSchema,
+      100,
+      DataGenOptions(baseDate = reasonableBaseDate))
+    ntzDF.createOrReplaceTempView("ntz_tbl")
+    for (format <- CometTruncTimestamp.supportedFormats) {
+      checkSparkAnswerAndOperator(
+        s"SELECT ts_ntz, date_trunc('$format', ts_ntz) from ntz_tbl order by ts_ntz")
+    }
+  }
+
+  test("date_format - timestamp_ntz input") {
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+      val r = new Random(42)
+      val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
+      val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
+      ntzDF.createOrReplaceTempView("ntz_tbl")
+      val supportedFormats =
+        CometDateFormat.supportedFormats.keys.toSeq.filterNot(_.contains("'"))
+      for (format <- supportedFormats) {
+        checkSparkAnswerAndOperator(
+          s"SELECT ts_ntz, date_format(ts_ntz, '$format') from ntz_tbl order by ts_ntz")
+      }
+    }
   }
154192

155193
test("unix_timestamp - string input falls back to Spark") {

0 commit comments

Comments
 (0)