Skip to content

Commit 507b9f8

Browse files
andygrove
authored and parthchandra committed
feat: Add TimestampNTZType support for casts and unix_timestamp
1 parent 0c2fa63 commit 507b9f8

6 files changed

Lines changed: 372 additions & 28 deletions

File tree

docs/source/user-guide/latest/compatibility.md

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,22 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on
8282
- **StructsToJson (to_json)**: Does not support `+Infinity` and `-Infinity` for numeric types (float, double).
8383
[#3016](https://github.com/apache/datafusion-comet/issues/3016)
8484

85+
## Date and Time Functions
86+
87+
Comet's native implementation of date and time functions may produce different results than Spark for dates
88+
far in the future (approximately beyond year 2100). This is because Comet uses the chrono-tz library for
89+
timezone calculations, which has limited support for Daylight Saving Time (DST) rules beyond the IANA
90+
time zone database's explicit transitions.
91+
92+
For dates within a reasonable range (approximately 1970-2100), Comet's date and time functions are compatible
93+
with Spark. For dates beyond this range, functions that involve timezone-aware calculations (such as
94+
`date_trunc` with timezone-aware timestamps) may produce results with incorrect DST offsets.
95+
96+
If you need to process dates far in the future with accurate timezone handling, consider:
97+
98+
- Using timezone-naive types (`timestamp_ntz`) when timezone conversion is not required
99+
- Falling back to Spark for these specific operations
100+
85101
## Regular Expressions
86102

87103
Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's

native/spark-expr/src/datetime_funcs/timestamp_trunc.rs

Lines changed: 23 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -113,20 +113,33 @@ impl PhysicalExpr for TimestampTruncExpr {
113113
let tz = self.timezone.clone();
114114
match (timestamp, format) {
115115
(ColumnarValue::Array(ts), ColumnarValue::Scalar(Utf8(Some(format)))) => {
116-
let ts = array_with_timezone(
117-
ts,
118-
tz.clone(),
119-
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
120-
)?;
116+
// For TimestampNTZ (Timestamp(Microsecond, None)), skip timezone conversion.
117+
// NTZ values are timezone-independent and truncation should operate directly
118+
// on the naive microsecond values without any timezone resolution.
119+
let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None));
120+
let ts = if is_ntz {
121+
ts
122+
} else {
123+
array_with_timezone(
124+
ts,
125+
tz.clone(),
126+
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
127+
)?
128+
};
121129
let result = timestamp_trunc_dyn(&ts, format)?;
122130
Ok(ColumnarValue::Array(result))
123131
}
124132
(ColumnarValue::Array(ts), ColumnarValue::Array(formats)) => {
125-
let ts = array_with_timezone(
126-
ts,
127-
tz.clone(),
128-
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
129-
)?;
133+
let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None));
134+
let ts = if is_ntz {
135+
ts
136+
} else {
137+
array_with_timezone(
138+
ts,
139+
tz.clone(),
140+
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
141+
)?
142+
};
130143
let result = timestamp_trunc_array_fmt_dyn(&ts, &formats)?;
131144
Ok(ColumnarValue::Array(result))
132145
}

native/spark-expr/src/datetime_funcs/unix_timestamp.rs

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,27 @@ impl ScalarUDFImpl for SparkUnixTimestamp {
7878

7979
match args {
8080
[ColumnarValue::Array(array)] => match array.data_type() {
81+
DataType::Timestamp(Microsecond, None) => {
82+
// TimestampNTZ: No timezone conversion needed - simply divide microseconds
83+
// by MICROS_PER_SECOND. TimestampNTZ stores local time without timezone.
84+
let timestamp_array =
85+
array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();
86+
87+
let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
88+
timestamp_array
89+
.values()
90+
.iter()
91+
.map(|&micros| micros / MICROS_PER_SECOND)
92+
.collect()
93+
} else {
94+
timestamp_array
95+
.iter()
96+
.map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
97+
.collect()
98+
};
99+
100+
Ok(ColumnarValue::Array(Arc::new(result)))
101+
}
81102
DataType::Timestamp(_, _) => {
82103
let is_utc = self.timezone == "UTC";
83104
let array = if is_utc

native/spark-expr/src/kernels/temporal.rs

Lines changed: 163 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,10 @@
1717

1818
//! temporal kernels
1919
20-
use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc};
20+
use chrono::{
21+
DateTime, Datelike, Duration, LocalResult, NaiveDate, NaiveDateTime, Offset, TimeZone,
22+
Timelike, Utc,
23+
};
2124

2225
use std::sync::Arc;
2326

@@ -153,10 +156,30 @@ where
153156
Ok(())
154157
}
155158

156-
// Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch
159+
// Apply the Tz to the Naive Date Time, convert to UTC, and return as microseconds in Unix epoch.
160+
// This function re-interprets the local datetime in the timezone to ensure the correct DST offset
161+
// is used for the target date (not the original date's offset). This is important when truncation
162+
// changes the date to a different DST period (e.g., from December/PST to October/PDT).
163+
//
164+
// Note: For far-future dates (approximately beyond year 2100), chrono-tz may not accurately
165+
// calculate DST transitions, which can result in incorrect offsets. See the compatibility
166+
// guide for more information.
157167
#[inline]
158168
fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 {
159-
dt.unwrap().with_timezone(&Utc).timestamp_micros()
169+
let dt = dt.unwrap();
170+
let naive = dt.naive_local();
171+
let tz = dt.timezone();
172+
173+
// Re-interpret the local time in the timezone to get the correct DST offset
174+
// for the truncated date. Use noon to avoid DST gaps that occur around midnight.
175+
let noon = naive.date().and_hms_opt(12, 0, 0).unwrap_or(naive);
176+
177+
let offset = match tz.offset_from_local_datetime(&noon) {
178+
LocalResult::Single(off) | LocalResult::Ambiguous(off, _) => off.fix(),
179+
LocalResult::None => return dt.with_timezone(&Utc).timestamp_micros(),
180+
};
181+
182+
(naive - offset).and_utc().timestamp_micros()
160183
}
161184

162185
#[inline]
@@ -529,6 +552,85 @@ pub(crate) fn timestamp_trunc_dyn(
529552
}
530553
}
531554

555+
/// Convert microseconds since epoch to NaiveDateTime
556+
#[inline]
557+
fn micros_to_naive(micros: i64) -> Option<NaiveDateTime> {
558+
DateTime::from_timestamp_micros(micros).map(|dt| dt.naive_utc())
559+
}
560+
561+
/// Convert NaiveDateTime back to microseconds since epoch
562+
#[inline]
563+
fn naive_to_micros(dt: NaiveDateTime) -> i64 {
564+
dt.and_utc().timestamp_micros()
565+
}
566+
567+
/// Truncate a TimestampNTZ array without any timezone conversion.
568+
/// NTZ values are timezone-independent; we treat the raw microseconds as a naive datetime.
569+
fn timestamp_trunc_ntz<T>(
570+
array: &PrimitiveArray<T>,
571+
format: String,
572+
) -> Result<TimestampMicrosecondArray, SparkError>
573+
where
574+
T: ArrowTemporalType + ArrowNumericType,
575+
i64: From<T::Native>,
576+
{
577+
let trunc_fn: fn(NaiveDateTime) -> Option<NaiveDateTime> = match format.to_uppercase().as_str()
578+
{
579+
"YEAR" | "YYYY" | "YY" => trunc_date_to_year,
580+
"QUARTER" => trunc_date_to_quarter,
581+
"MONTH" | "MON" | "MM" => trunc_date_to_month,
582+
"WEEK" => trunc_date_to_week,
583+
"DAY" | "DD" => trunc_date_to_day,
584+
"HOUR" => trunc_date_to_hour,
585+
"MINUTE" => trunc_date_to_minute,
586+
"SECOND" => trunc_date_to_second,
587+
"MILLISECOND" => trunc_date_to_ms,
588+
"MICROSECOND" => trunc_date_to_microsec,
589+
_ => {
590+
return Err(SparkError::Internal(format!(
591+
"Unsupported format: {format:?} for function 'timestamp_trunc'"
592+
)))
593+
}
594+
};
595+
596+
let result: TimestampMicrosecondArray = array
597+
.iter()
598+
.map(|opt_val| {
599+
opt_val.and_then(|v| {
600+
let micros: i64 = v.into();
601+
micros_to_naive(micros)
602+
.and_then(trunc_fn)
603+
.map(naive_to_micros)
604+
})
605+
})
606+
.collect();
607+
608+
Ok(result)
609+
}
610+
611+
/// Truncate a single NTZ value and append to builder
612+
fn timestamp_trunc_ntz_single<F>(
613+
value: Option<i64>,
614+
builder: &mut PrimitiveBuilder<TimestampMicrosecondType>,
615+
op: F,
616+
) -> Result<(), SparkError>
617+
where
618+
F: Fn(NaiveDateTime) -> Option<NaiveDateTime>,
619+
{
620+
match value {
621+
Some(micros) => match micros_to_naive(micros).and_then(op) {
622+
Some(truncated) => builder.append_value(naive_to_micros(truncated)),
623+
None => {
624+
return Err(SparkError::Internal(
625+
"Unable to truncate NTZ timestamp".to_string(),
626+
))
627+
}
628+
},
629+
None => builder.append_null(),
630+
}
631+
Ok(())
632+
}
633+
532634
pub(crate) fn timestamp_trunc<T>(
533635
array: &PrimitiveArray<T>,
534636
format: String,
@@ -540,6 +642,10 @@ where
540642
let builder = TimestampMicrosecondBuilder::with_capacity(array.len());
541643
let iter = ArrayIter::new(array);
542644
match array.data_type() {
645+
DataType::Timestamp(TimeUnit::Microsecond, None) => {
646+
// TimestampNTZ: operate directly on naive microsecond values without timezone
647+
timestamp_trunc_ntz(array, format)
648+
}
543649
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
544650
match format.to_uppercase().as_str() {
545651
"YEAR" | "YYYY" | "YY" => {
@@ -687,6 +793,60 @@ macro_rules! timestamp_trunc_array_fmt_helper {
687793
"lengths of values array and format array must be the same"
688794
);
689795
match $datatype {
796+
DataType::Timestamp(TimeUnit::Microsecond, None) => {
797+
// TimestampNTZ: operate directly on naive microsecond values
798+
for (index, val) in iter.enumerate() {
799+
let micros_val = val.map(|v| i64::from(v));
800+
let op_result = match $formats.value(index).to_uppercase().as_str() {
801+
"YEAR" | "YYYY" | "YY" => {
802+
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_year)
803+
}
804+
"QUARTER" => timestamp_trunc_ntz_single(
805+
micros_val,
806+
&mut builder,
807+
trunc_date_to_quarter,
808+
),
809+
"MONTH" | "MON" | "MM" => timestamp_trunc_ntz_single(
810+
micros_val,
811+
&mut builder,
812+
trunc_date_to_month,
813+
),
814+
"WEEK" => {
815+
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_week)
816+
}
817+
"DAY" | "DD" => {
818+
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_day)
819+
}
820+
"HOUR" => {
821+
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_hour)
822+
}
823+
"MINUTE" => timestamp_trunc_ntz_single(
824+
micros_val,
825+
&mut builder,
826+
trunc_date_to_minute,
827+
),
828+
"SECOND" => timestamp_trunc_ntz_single(
829+
micros_val,
830+
&mut builder,
831+
trunc_date_to_second,
832+
),
833+
"MILLISECOND" => {
834+
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_ms)
835+
}
836+
"MICROSECOND" => timestamp_trunc_ntz_single(
837+
micros_val,
838+
&mut builder,
839+
trunc_date_to_microsec,
840+
),
841+
_ => Err(SparkError::Internal(format!(
842+
"Unsupported format: {:?} for function 'timestamp_trunc'",
843+
$formats.value(index)
844+
))),
845+
};
846+
op_result?
847+
}
848+
Ok(builder.finish())
849+
}
690850
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
691851
let tz: Tz = tz.parse()?;
692852
for (index, val) in iter.enumerate() {

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -295,11 +295,9 @@ object CometSecond extends CometExpressionSerde[Second] {
295295
object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {
296296

297297
private def isSupportedInputType(expr: UnixTimestamp): Boolean = {
298-
// Note: TimestampNTZType is not supported because Comet incorrectly applies
299-
// timezone conversion to TimestampNTZ values. TimestampNTZ stores local time
300-
// without timezone, so no conversion should be applied.
301298
expr.children.head.dataType match {
302299
case TimestampType | DateType => true
300+
case dt if dt.typeName == "timestamp_ntz" => true
303301
case _ => false
304302
}
305303
}

0 commit comments

Comments
 (0)