Merged
@@ -19,5 +19,27 @@ under the License.

# Date/Time Expressions

<!--BEGIN:EXPR_COMPAT[datetime]-->
<!--END:EXPR_COMPAT-->
- **Hour, Minute, Second**: Incorrectly apply timezone conversion to TimestampNTZ inputs. TimestampNTZ stores local
time without a timezone, so no conversion should be applied. These expressions work correctly with Timestamp inputs.
[#3180](https://github.com/apache/datafusion-comet/issues/3180)
- **TruncTimestamp (date_trunc)**: Produces incorrect results when used with non-UTC timezones. Compatible when
timezone is UTC.
[#2649](https://github.com/apache/datafusion-comet/issues/2649)

## Date and Time Functions

Comet's native implementation of date and time functions may produce different results than Spark for dates
far in the future (approximately beyond year 2100). This is because Comet uses the chrono-tz library for
timezone calculations, which has limited support for Daylight Saving Time (DST) rules beyond the IANA
time zone database's explicit transitions.

For dates within a reasonable range (approximately 1970-2100), Comet's date and time functions are compatible
with Spark. For dates beyond this range, functions that involve timezone-aware calculations (such as
`date_trunc` with timezone-aware timestamps) may produce results with incorrect DST offsets.

If you need to process dates far in the future with accurate timezone handling, consider:

- Using timezone-naive types (`timestamp_ntz`) when timezone conversion is not required
- Falling back to Spark for these specific operations
<!--BEGIN:EXPR_COMPAT[datetime]-->
<!--END:EXPR_COMPAT-->
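The far-future limitation described above can be pictured with a toy zone represented only by explicit transitions: lookups past the last entry fall back to the final offset, so future DST switches are missed. This is a minimal sketch of the failure shape, not chrono-tz's actual data model or API.

```rust
// Toy zone: explicit (start_instant, offset_hours) transitions only.
// Queries beyond the last transition are stuck on the final offset,
// which is the shape of the far-future DST limitation described above.
fn offset_at(transitions: &[(i64, i64)], instant: i64) -> i64 {
    transitions
        .iter()
        .rev()
        .find(|(start, _)| *start <= instant)
        .map(|(_, off)| *off)
        .unwrap_or(transitions[0].1)
}

fn main() {
    // Alternating standard (-8) / daylight (-7) periods; table ends at 300.
    let transitions = [(0, -8), (100, -7), (200, -8), (300, -7)];
    // Inside the table: correct seasonal offsets.
    assert_eq!(offset_at(&transitions, 150), -7);
    assert_eq!(offset_at(&transitions, 250), -8);
    // Beyond the table: the last known offset is used even in "winter".
    assert_eq!(offset_at(&transitions, 10_000), -7);
}
```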
33 changes: 23 additions & 10 deletions native/spark-expr/src/datetime_funcs/timestamp_trunc.rs
@@ -113,20 +113,33 @@ impl PhysicalExpr for TimestampTruncExpr {
let tz = self.timezone.clone();
match (timestamp, format) {
(ColumnarValue::Array(ts), ColumnarValue::Scalar(Utf8(Some(format)))) => {
let ts = array_with_timezone(
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
)?;
// For TimestampNTZ (Timestamp(Microsecond, None)), skip timezone conversion.
// NTZ values are timezone-independent and truncation should operate directly
// on the naive microsecond values without any timezone resolution.
let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None));
let ts = if is_ntz {
ts
} else {
array_with_timezone(
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
)?
};
let result = timestamp_trunc_dyn(&ts, format)?;
Ok(ColumnarValue::Array(result))
}
(ColumnarValue::Array(ts), ColumnarValue::Array(formats)) => {
let ts = array_with_timezone(
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
)?;
let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None));
let ts = if is_ntz {
ts
} else {
array_with_timezone(
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
)?
};
let result = timestamp_trunc_array_fmt_dyn(&ts, &formats)?;
Ok(ColumnarValue::Array(result))
}
23 changes: 22 additions & 1 deletion native/spark-expr/src/datetime_funcs/unix_timestamp.rs
@@ -78,6 +78,27 @@ impl ScalarUDFImpl for SparkUnixTimestamp {

match args {
[ColumnarValue::Array(array)] => match array.data_type() {
DataType::Timestamp(Microsecond, None) => {
// TimestampNTZ: no timezone conversion needed; floor-divide the raw microseconds
// by MICROS_PER_SECOND (floor, not truncation, so pre-epoch values round down).
// TimestampNTZ stores local time without a timezone.
let timestamp_array =
array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();

let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
timestamp_array
.values()
.iter()
.map(|&micros| div_floor(micros, MICROS_PER_SECOND))
.collect()
} else {
timestamp_array
.iter()
.map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
.collect()
};

Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Timestamp(_, _) => {
let is_utc = self.timezone == "UTC";
let array = if is_utc
@@ -99,7 +120,7 @@ impl ScalarUDFImpl for SparkUnixTimestamp {
timestamp_array
.values()
.iter()
.map(|&micros| micros / MICROS_PER_SECOND)
.map(|&micros| div_floor(micros, MICROS_PER_SECOND))
.collect()
} else {
timestamp_array
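The switch from `/` to `div_floor` matters for pre-epoch timestamps: truncating division rounds toward zero, while Spark's unix-timestamp semantics need flooring. A standalone sketch using std's `i64::div_euclid`, which agrees with `num_integer::div_floor` for a positive divisor (the function names below are illustrative, not from the patch):

```rust
// Floor division matters for negative microsecond values (instants before
// 1970-01-01): truncating division rounds toward zero and yields the wrong
// Unix second for pre-epoch timestamps.
const MICROS_PER_SECOND: i64 = 1_000_000;

fn to_unix_seconds_floor(micros: i64) -> i64 {
    // For a positive divisor, div_euclid is floor division.
    micros.div_euclid(MICROS_PER_SECOND)
}

fn to_unix_seconds_truncating(micros: i64) -> i64 {
    micros / MICROS_PER_SECOND
}

fn main() {
    // 1969-12-31T23:59:59.500 is -500_000 micros and belongs to Unix second -1.
    assert_eq!(to_unix_seconds_floor(-500_000), -1);
    // Truncating division incorrectly maps it to second 0.
    assert_eq!(to_unix_seconds_truncating(-500_000), 0);
    // Post-epoch values agree either way.
    assert_eq!(to_unix_seconds_floor(1_500_000), 1);
    assert_eq!(to_unix_seconds_truncating(1_500_000), 1);
}
```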
117 changes: 114 additions & 3 deletions native/spark-expr/src/kernels/temporal.rs
@@ -17,7 +17,9 @@

//! temporal kernels

use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc};
use chrono::{
DateTime, Datelike, Duration, LocalResult, NaiveDate, NaiveDateTime, TimeZone, Timelike, Utc,
};

use std::sync::Arc;

@@ -153,10 +155,23 @@ where
Ok(())
}

// Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch
// Apply the Tz to the Naive Date Time, convert to UTC, and return as microseconds in Unix epoch.
// After truncation the carried UTC offset may be wrong if the truncated time falls in a different
// DST period than the original (e.g., truncating a December/PST timestamp to QUARTER yields
// October 1 which is in PDT). We re-resolve the naive local time through the timezone so that
// chrono picks the correct offset for the target date.
#[inline]
fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 {
Member:

I think the changes here may introduce a regression in DST handling.

Could we merge #4040 first to add new regression tests and make sure they still pass with this PR?

Contributor Author:

Yes, let's merge #4040 first

Contributor Author:

#4040 is merged. Rebased to make sure tests all pass

dt.unwrap().with_timezone(&Utc).timestamp_micros()
let dt = dt.unwrap();
let naive = dt.naive_local();
let tz = dt.timezone();

match tz.from_local_datetime(&naive) {
LocalResult::Single(resolved) | LocalResult::Ambiguous(resolved, _) => {
resolved.with_timezone(&Utc).timestamp_micros()
}
LocalResult::None => dt.with_timezone(&Utc).timestamp_micros(),
}
}
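The offset re-resolution above can be modeled with a toy two-offset zone standing in for America/Los_Angeles. The helper names and the simplified day-of-year DST boundary are illustrative, not chrono's API: the point is that converting truncated local fields back to UTC must use the offset of the target date, not the offset carried over from the original instant.

```rust
// Toy model of the truncation bug: a December instant carries the standard-time
// offset (PST, UTC-8). Truncating its *local* fields to QUARTER lands on
// October 1, which falls in daylight time (PDT, UTC-7). Reusing the original
// offset when converting back to UTC is off by one hour.
const HOUR: i64 = 3600;

// Simplified zone: daylight time (UTC-7) from day-of-year 60 to 310, else UTC-8.
fn offset_seconds_for_day(day_of_year: i64) -> i64 {
    if (60..310).contains(&day_of_year) { -7 * HOUR } else { -8 * HOUR }
}

// Re-resolve: convert local seconds-of-day on a given day to UTC using that
// day's own offset.
fn local_to_utc(day_of_year: i64, local_secs: i64) -> i64 {
    local_secs - offset_seconds_for_day(day_of_year)
}

fn main() {
    // Local midnight on October 1 (day 274).
    let target_day = 274;
    let local_midnight = 0;
    // Wrong: reuse December's offset (UTC-8) -> 08:00 UTC.
    let wrong = local_midnight - (-8 * HOUR);
    // Right: re-resolve against the target date's offset (UTC-7) -> 07:00 UTC.
    let right = local_to_utc(target_day, local_midnight);
    assert_eq!(wrong, 8 * HOUR);
    assert_eq!(right, 7 * HOUR);
    assert_ne!(wrong, right); // the one-hour DST error the re-resolution fixes
}
```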

#[inline]
@@ -529,6 +544,89 @@ pub(crate) fn timestamp_trunc_dyn(
}
}

/// Convert microseconds since epoch to NaiveDateTime
#[inline]
fn micros_to_naive(micros: i64) -> Option<NaiveDateTime> {
DateTime::from_timestamp_micros(micros).map(|dt| dt.naive_utc())
}

/// Convert NaiveDateTime back to microseconds since epoch
#[inline]
fn naive_to_micros(dt: NaiveDateTime) -> i64 {
dt.and_utc().timestamp_micros()
}

/// Resolve a truncation format string to the corresponding NaiveDateTime truncation function.
fn ntz_trunc_fn_for_format(
format: &str,
) -> Result<fn(NaiveDateTime) -> Option<NaiveDateTime>, SparkError> {
match format.to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => Ok(trunc_date_to_year),
"QUARTER" => Ok(trunc_date_to_quarter),
"MONTH" | "MON" | "MM" => Ok(trunc_date_to_month),
"WEEK" => Ok(trunc_date_to_week),
"DAY" | "DD" => Ok(trunc_date_to_day),
"HOUR" => Ok(trunc_date_to_hour),
"MINUTE" => Ok(trunc_date_to_minute),
"SECOND" => Ok(trunc_date_to_second),
"MILLISECOND" => Ok(trunc_date_to_ms),
"MICROSECOND" => Ok(trunc_date_to_microsec),
_ => Err(SparkError::Internal(format!(
"Unsupported format: {format:?} for function 'timestamp_trunc'"
))),
}
}

/// Truncate a TimestampNTZ array without any timezone conversion.
/// NTZ values are timezone-independent; we treat the raw microseconds as a naive datetime.
fn timestamp_trunc_ntz<T>(
array: &PrimitiveArray<T>,
format: String,
) -> Result<TimestampMicrosecondArray, SparkError>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let trunc_fn = ntz_trunc_fn_for_format(&format)?;

let result: TimestampMicrosecondArray = array
.iter()
.map(|opt_val| {
opt_val.and_then(|v| {
let micros: i64 = v.into();
micros_to_naive(micros)
.and_then(trunc_fn)
.map(naive_to_micros)
})
})
.collect();

Ok(result)
}

/// Truncate a single NTZ value and append to builder
fn timestamp_trunc_ntz_single<F>(
value: Option<i64>,
builder: &mut PrimitiveBuilder<TimestampMicrosecondType>,
op: F,
) -> Result<(), SparkError>
where
F: Fn(NaiveDateTime) -> Option<NaiveDateTime>,
{
match value {
Some(micros) => match micros_to_naive(micros).and_then(op) {
Some(truncated) => builder.append_value(naive_to_micros(truncated)),
None => {
return Err(SparkError::Internal(
"Unable to truncate NTZ timestamp".to_string(),
))
}
},
None => builder.append_null(),
}
Ok(())
}
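For fixed-width units (DAY, HOUR, MINUTE, SECOND, and smaller), truncating a naive timestamp reduces to flooring the microsecond value to a multiple of the unit width; calendar units (YEAR, QUARTER, MONTH, WEEK) need real date arithmetic, which is why the kernel routes NTZ values through `NaiveDateTime`. A standalone, std-only sketch of the fixed-width case (not the patch's code):

```rust
// Truncate naive microseconds-since-epoch to a fixed-width unit by flooring
// to a multiple of the unit width. div_euclid floors rather than rounding
// toward zero, so pre-epoch values also truncate downward.
const MICROS_PER_SECOND: i64 = 1_000_000;
const MICROS_PER_MINUTE: i64 = 60 * MICROS_PER_SECOND;
const MICROS_PER_HOUR: i64 = 60 * MICROS_PER_MINUTE;
const MICROS_PER_DAY: i64 = 24 * MICROS_PER_HOUR;

fn trunc_fixed_width(micros: i64, unit_micros: i64) -> i64 {
    micros.div_euclid(unit_micros) * unit_micros
}

fn main() {
    // 1970-01-01T01:30:15.250
    let t = MICROS_PER_HOUR + 30 * MICROS_PER_MINUTE + 15 * MICROS_PER_SECOND + 250_000;
    assert_eq!(trunc_fixed_width(t, MICROS_PER_HOUR), MICROS_PER_HOUR);
    assert_eq!(trunc_fixed_width(t, MICROS_PER_DAY), 0);
    // Pre-epoch: 1969-12-31T23:00 floors to 1969-12-31T00:00, not 1970-01-01.
    let pre = -MICROS_PER_HOUR;
    assert_eq!(trunc_fixed_width(pre, MICROS_PER_DAY), -MICROS_PER_DAY);
}
```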

pub(crate) fn timestamp_trunc<T>(
array: &PrimitiveArray<T>,
format: String,
@@ -540,6 +638,10 @@ where
let builder = TimestampMicrosecondBuilder::with_capacity(array.len());
let iter = ArrayIter::new(array);
match array.data_type() {
DataType::Timestamp(TimeUnit::Microsecond, None) => {
// TimestampNTZ: operate directly on naive microsecond values without timezone
timestamp_trunc_ntz(array, format)
}
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
match format.to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => {
@@ -687,6 +789,15 @@ macro_rules! timestamp_trunc_array_fmt_helper {
"lengths of values array and format array must be the same"
);
match $datatype {
DataType::Timestamp(TimeUnit::Microsecond, None) => {
// TimestampNTZ: operate directly on naive microsecond values
for (index, val) in iter.enumerate() {
let micros_val = val.map(|v| i64::from(v));
let trunc_fn = ntz_trunc_fn_for_format($formats.value(index))?;
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_fn)?;
}
Ok(builder.finish())
}
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
let tz: Tz = tz.parse()?;
for (index, val) in iter.enumerate() {
15 changes: 8 additions & 7 deletions spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -186,8 +186,11 @@ object CometHour extends CometExpressionSerde[Hour] {
override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason)

override def getSupportLevel(expr: Hour): SupportLevel = {
if (expr.child.dataType.typeName == "timestamp_ntz") {
Incompatible(Some(incompatReason))
if (expr.child.dataType == TimestampNTZType) {
Incompatible(
Some(
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
" (https://github.com/apache/datafusion-comet/issues/3180)"))
} else {
Compatible()
}
@@ -221,7 +224,7 @@ object CometHour extends CometExpressionSerde[Hour] {
object CometMinute extends CometExpressionSerde[Minute] {

override def getSupportLevel(expr: Minute): SupportLevel = {
if (expr.child.dataType.typeName == "timestamp_ntz") {
if (expr.child.dataType == TimestampNTZType) {
Incompatible(
Some(
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
@@ -259,7 +262,7 @@ object CometMinute extends CometExpressionSerde[Minute] {
object CometSecond extends CometExpressionSerde[Second] {

override def getSupportLevel(expr: Second): SupportLevel = {
if (expr.child.dataType.typeName == "timestamp_ntz") {
if (expr.child.dataType == TimestampNTZType) {
Incompatible(
Some(
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
@@ -297,11 +300,9 @@ object CometSecond extends CometExpressionSerde[Second] {
object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {

private def isSupportedInputType(expr: UnixTimestamp): Boolean = {
// Note: TimestampNTZType is not supported because Comet incorrectly applies
// timezone conversion to TimestampNTZ values. TimestampNTZ stores local time
// without timezone, so no conversion should be applied.
expr.children.head.dataType match {
case TimestampType | DateType => true
case TimestampNTZType => true
case _ => false
}
}
8 changes: 7 additions & 1 deletion spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1594,7 +1594,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// CAST from TimestampNTZType

test("cast TimestampNTZType to StringType") {
castTest(generateTimestampNTZ(), DataTypes.StringType)
// TimestampNTZ is timezone-independent, so casting to string should produce
// the same result regardless of the session timezone.
for (tz <- representativeTimezones) {
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
castTest(generateTimestampNTZ(), DataTypes.StringType)
}
}
}

test("cast TimestampNTZType to DateType") {