diff --git a/docs/source/user-guide/latest/compatibility/expressions/datetime.md b/docs/source/user-guide/latest/compatibility/expressions/datetime.md index e07bde3c23..78ed131a89 100644 --- a/docs/source/user-guide/latest/compatibility/expressions/datetime.md +++ b/docs/source/user-guide/latest/compatibility/expressions/datetime.md @@ -19,5 +19,27 @@ under the License. # Date/Time Expressions - - +- **Hour, Minute, Second**: Incorrectly apply timezone conversion to TimestampNTZ inputs. TimestampNTZ stores local + time without timezone, so no conversion should be applied. These expressions work correctly with Timestamp inputs. + [#3180](https://github.com/apache/datafusion-comet/issues/3180) +- **TruncTimestamp (date_trunc)**: Produces incorrect results when used with non-UTC timezones. Compatible when + timezone is UTC. + [#2649](https://github.com/apache/datafusion-comet/issues/2649) + +## Date and Time Functions + +Comet's native implementation of date and time functions may produce different results than Spark for dates +far in the future (approximately beyond year 2100). This is because Comet uses the chrono-tz library for +timezone calculations, which has limited support for Daylight Saving Time (DST) rules beyond the IANA +time zone database's explicit transitions. + +For dates within a reasonable range (approximately 1970-2100), Comet's date and time functions are compatible +with Spark. For dates beyond this range, functions that involve timezone-aware calculations (such as +`date_trunc` with timezone-aware timestamps) may produce results with incorrect DST offsets. 
+ +If you need to process dates far in the future with accurate timezone handling, consider: + +- Using timezone-naive types (`timestamp_ntz`) when timezone conversion is not required +- Falling back to Spark for these specific operations + + diff --git a/native/spark-expr/src/datetime_funcs/timestamp_trunc.rs b/native/spark-expr/src/datetime_funcs/timestamp_trunc.rs index 2d7a571b76..3435d3ee50 100644 --- a/native/spark-expr/src/datetime_funcs/timestamp_trunc.rs +++ b/native/spark-expr/src/datetime_funcs/timestamp_trunc.rs @@ -113,20 +113,33 @@ impl PhysicalExpr for TimestampTruncExpr { let tz = self.timezone.clone(); match (timestamp, format) { (ColumnarValue::Array(ts), ColumnarValue::Scalar(Utf8(Some(format)))) => { - let ts = array_with_timezone( - ts, - tz.clone(), - Some(&DataType::Timestamp(Microsecond, Some(tz.into()))), - )?; + // For TimestampNTZ (Timestamp(Microsecond, None)), skip timezone conversion. + // NTZ values are timezone-independent and truncation should operate directly + // on the naive microsecond values without any timezone resolution. + let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None)); + let ts = if is_ntz { + ts + } else { + array_with_timezone( + ts, + tz.clone(), + Some(&DataType::Timestamp(Microsecond, Some(tz.into()))), + )? + }; let result = timestamp_trunc_dyn(&ts, format)?; Ok(ColumnarValue::Array(result)) } (ColumnarValue::Array(ts), ColumnarValue::Array(formats)) => { - let ts = array_with_timezone( - ts, - tz.clone(), - Some(&DataType::Timestamp(Microsecond, Some(tz.into()))), - )?; + let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None)); + let ts = if is_ntz { + ts + } else { + array_with_timezone( + ts, + tz.clone(), + Some(&DataType::Timestamp(Microsecond, Some(tz.into()))), + )? 
+ }; let result = timestamp_trunc_array_fmt_dyn(&ts, &formats)?; Ok(ColumnarValue::Array(result)) } diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs index c4f1576293..f8e932f396 100644 --- a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs @@ -78,6 +78,27 @@ impl ScalarUDFImpl for SparkUnixTimestamp { match args { [ColumnarValue::Array(array)] => match array.data_type() { + DataType::Timestamp(Microsecond, None) => { + // TimestampNTZ: No timezone conversion needed - simply divide microseconds + // by MICROS_PER_SECOND. TimestampNTZ stores local time without timezone. + let timestamp_array = + array.as_primitive::<TimestampMicrosecondType>(); + + let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 { + timestamp_array + .values() + .iter() + .map(|&micros| div_floor(micros, MICROS_PER_SECOND)) + .collect() + } else { + timestamp_array + .iter() + .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) + .collect() + }; + + Ok(ColumnarValue::Array(Arc::new(result))) + } DataType::Timestamp(_, _) => { let is_utc = self.timezone == "UTC"; let array = if is_utc @@ -99,7 +120,7 @@ impl ScalarUDFImpl for SparkUnixTimestamp { timestamp_array .values() .iter() - .map(|&micros| micros / MICROS_PER_SECOND) + .map(|&micros| div_floor(micros, MICROS_PER_SECOND)) .collect() } else { timestamp_array diff --git a/native/spark-expr/src/kernels/temporal.rs b/native/spark-expr/src/kernels/temporal.rs index 2668e5095a..653789a049 100644 --- a/native/spark-expr/src/kernels/temporal.rs +++ b/native/spark-expr/src/kernels/temporal.rs @@ -17,7 +17,9 @@ //!
temporal kernels -use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc}; +use chrono::{ + DateTime, Datelike, Duration, LocalResult, NaiveDate, NaiveDateTime, TimeZone, Timelike, Utc, +}; use std::sync::Arc; @@ -153,10 +155,23 @@ where Ok(()) } -// Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch +// Apply the Tz to the Naive Date Time, convert to UTC, and return as microseconds in Unix epoch. +// After truncation the carried UTC offset may be wrong if the truncated time falls in a different +// DST period than the original (e.g., truncating a December/PST timestamp to QUARTER yields +// October 1 which is in PDT). We re-resolve the naive local time through the timezone so that +// chrono picks the correct offset for the target date. #[inline] fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 { - dt.unwrap().with_timezone(&Utc).timestamp_micros() + let dt = dt.unwrap(); + let naive = dt.naive_local(); + let tz = dt.timezone(); + + match tz.from_local_datetime(&naive) { + LocalResult::Single(resolved) | LocalResult::Ambiguous(resolved, _) => { + resolved.with_timezone(&Utc).timestamp_micros() + } + LocalResult::None => dt.with_timezone(&Utc).timestamp_micros(), + } } #[inline] @@ -529,6 +544,89 @@ pub(crate) fn timestamp_trunc_dyn( } } +/// Convert microseconds since epoch to NaiveDateTime +#[inline] +fn micros_to_naive(micros: i64) -> Option<NaiveDateTime> { + DateTime::from_timestamp_micros(micros).map(|dt| dt.naive_utc()) +} + +/// Convert NaiveDateTime back to microseconds since epoch +#[inline] +fn naive_to_micros(dt: NaiveDateTime) -> i64 { + dt.and_utc().timestamp_micros() } + +/// Resolve a truncation format string to the corresponding NaiveDateTime truncation function.
+fn ntz_trunc_fn_for_format( + format: &str, +) -> Result<fn(NaiveDateTime) -> Option<NaiveDateTime>, SparkError> { + match format.to_uppercase().as_str() { + "YEAR" | "YYYY" | "YY" => Ok(trunc_date_to_year), + "QUARTER" => Ok(trunc_date_to_quarter), + "MONTH" | "MON" | "MM" => Ok(trunc_date_to_month), + "WEEK" => Ok(trunc_date_to_week), + "DAY" | "DD" => Ok(trunc_date_to_day), + "HOUR" => Ok(trunc_date_to_hour), + "MINUTE" => Ok(trunc_date_to_minute), + "SECOND" => Ok(trunc_date_to_second), + "MILLISECOND" => Ok(trunc_date_to_ms), + "MICROSECOND" => Ok(trunc_date_to_microsec), + _ => Err(SparkError::Internal(format!( + "Unsupported format: {format:?} for function 'timestamp_trunc'" + ))), + } +} + +/// Truncate a TimestampNTZ array without any timezone conversion. +/// NTZ values are timezone-independent; we treat the raw microseconds as a naive datetime. +fn timestamp_trunc_ntz<T>( + array: &PrimitiveArray<T>, + format: String, +) -> Result<TimestampMicrosecondArray, SparkError> +where + T: ArrowTemporalType + ArrowNumericType, + i64: From<T::Native>, +{ + let trunc_fn = ntz_trunc_fn_for_format(&format)?; + + let result: TimestampMicrosecondArray = array + .iter() + .map(|opt_val| { + opt_val.and_then(|v| { + let micros: i64 = v.into(); + micros_to_naive(micros) + .and_then(trunc_fn) + .map(naive_to_micros) + }) + }) + .collect(); + + Ok(result) +} + +/// Truncate a single NTZ value and append to builder +fn timestamp_trunc_ntz_single<F>( + value: Option<i64>, + builder: &mut PrimitiveBuilder<TimestampMicrosecondType>, + op: F, +) -> Result<(), SparkError> +where + F: Fn(NaiveDateTime) -> Option<NaiveDateTime>, +{ + match value { + Some(micros) => match micros_to_naive(micros).and_then(op) { + Some(truncated) => builder.append_value(naive_to_micros(truncated)), + None => { + return Err(SparkError::Internal( + "Unable to truncate NTZ timestamp".to_string(), + )) + } + }, + None => builder.append_null(), + } + Ok(()) +} + pub(crate) fn timestamp_trunc<T>( + array: &PrimitiveArray<T>, format: String, @@ -540,6 +638,10 @@ where let builder = TimestampMicrosecondBuilder::with_capacity(array.len()); let iter =
ArrayIter::new(array); match array.data_type() { + DataType::Timestamp(TimeUnit::Microsecond, None) => { + // TimestampNTZ: operate directly on naive microsecond values without timezone + timestamp_trunc_ntz(array, format) + } DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => { match format.to_uppercase().as_str() { "YEAR" | "YYYY" | "YY" => { @@ -687,6 +789,15 @@ macro_rules! timestamp_trunc_array_fmt_helper { "lengths of values array and format array must be the same" ); match $datatype { + DataType::Timestamp(TimeUnit::Microsecond, None) => { + // TimestampNTZ: operate directly on naive microsecond values + for (index, val) in iter.enumerate() { + let micros_val = val.map(|v| i64::from(v)); + let trunc_fn = ntz_trunc_fn_for_format($formats.value(index))?; + timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_fn)?; + } + Ok(builder.finish()) + } DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => { let tz: Tz = tz.parse()?; for (index, val) in iter.enumerate() { diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 52b086184b..ade2ad488e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -186,8 +186,11 @@ object CometHour extends CometExpressionSerde[Hour] { override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason) override def getSupportLevel(expr: Hour): SupportLevel = { - if (expr.child.dataType.typeName == "timestamp_ntz") { - Incompatible(Some(incompatReason)) + if (expr.child.dataType == TimestampNTZType) { + Incompatible( + Some( + "Incorrectly applies timezone conversion to TimestampNTZ inputs" + + " (https://github.com/apache/datafusion-comet/issues/3180)")) } else { Compatible() } @@ -221,7 +224,7 @@ object CometHour extends CometExpressionSerde[Hour] { object CometMinute extends CometExpressionSerde[Minute] { override def getSupportLevel(expr: 
Minute): SupportLevel = { - if (expr.child.dataType.typeName == "timestamp_ntz") { + if (expr.child.dataType == TimestampNTZType) { Incompatible( Some( "Incorrectly applies timezone conversion to TimestampNTZ inputs" + @@ -259,7 +262,7 @@ object CometMinute extends CometExpressionSerde[Minute] { object CometSecond extends CometExpressionSerde[Second] { override def getSupportLevel(expr: Second): SupportLevel = { - if (expr.child.dataType.typeName == "timestamp_ntz") { + if (expr.child.dataType == TimestampNTZType) { Incompatible( Some( "Incorrectly applies timezone conversion to TimestampNTZ inputs" + @@ -297,11 +300,9 @@ object CometSecond extends CometExpressionSerde[Second] { object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] { private def isSupportedInputType(expr: UnixTimestamp): Boolean = { - // Note: TimestampNTZType is not supported because Comet incorrectly applies - // timezone conversion to TimestampNTZ values. TimestampNTZ stores local time - // without timezone, so no conversion should be applied. expr.children.head.dataType match { case TimestampType | DateType => true + case TimestampNTZType => true case _ => false } } diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index dadfbfc93a..08fc8e32cd 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -1594,7 +1594,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // CAST from TimestampNTZType test("cast TimestampNTZType to StringType") { - castTest(generateTimestampNTZ(), DataTypes.StringType) + // TimestampNTZ is timezone-independent, so casting to string should produce + // the same result regardless of the session timezone. 
+ for (tz <- representativeTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + castTest(generateTimestampNTZ(), DataTypes.StringType) + } + } } test("cast TimestampNTZType to DateType") { diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index bcc6289cb9..a8147089d9 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -35,6 +35,10 @@ import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + /** Timezones used to verify that TimestampNTZ operations are timezone-independent. */ + private val crossTimezones = + Seq("UTC", "America/Los_Angeles", "Europe/London", "Asia/Tokyo") + test("trunc (TruncDate)") { val supportedFormats = CometTruncDate.supportedFormats val unsupportedFormats = Seq("invalid") @@ -139,17 +143,153 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } - test("unix_timestamp - timestamp_ntz input falls back to Spark") { - // TimestampNTZ is not supported because Comet incorrectly applies timezone - // conversion. TimestampNTZ stores local time without timezone, so the unix - // timestamp should just be the value divided by microseconds per second. + test("unix_timestamp - timestamp_ntz input") { + // TimestampNTZ stores local time without timezone, so the unix + // timestamp is the value divided by microseconds per second (no timezone conversion). + // Verify this produces the same result regardless of session timezone. 
val r = new Random(42) val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions()) ntzDF.createOrReplaceTempView("ntz_tbl") - checkSparkAnswerAndFallbackReason( - "SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz", - "unix_timestamp does not support input type: TimestampNTZType") + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + checkSparkAnswerAndOperator( + "SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz") + } + } + } + + test("hour/minute/second - timestamp_ntz input") { + // TimestampNTZ extracts time components directly from the stored local time, + // so the result should be the same regardless of session timezone. + // Comet currently falls back to Spark for hour/minute/second on TimestampNTZ + // inputs (https://github.com/apache/datafusion-comet/issues/3180); we verify + // correctness (matching Spark's output) in all session timezones. + val r = new Random(42) + val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) + val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions()) + ntzDF.createOrReplaceTempView("ntz_tbl") + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + checkSparkAnswer( + "SELECT ts_ntz, hour(ts_ntz), minute(ts_ntz), second(ts_ntz) from ntz_tbl order by ts_ntz") + } + } + } + + test("date_trunc - timestamp_ntz input") { + // TimestampNTZ truncation should be timezone-independent. + // Verify the result is the same regardless of session timezone. + // Catalyst wraps the NTZ child in cast(ts_ntz as timestamp), so Comet runs + // date_trunc natively only when the session timezone is UTC (see + // https://github.com/apache/datafusion-comet/issues/2649); for non-UTC + // sessions it falls back to Spark but must still produce correct results. 
+ val r = new Random(42) + val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) + // Use a reasonable date range (around year 2024) to avoid chrono-tz DST calculation + // issues with far-future dates. The default baseDate is year 3333 which is beyond + // the range where chrono-tz can reliably calculate DST transitions. + val reasonableBaseDate = + new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-06-15 12:00:00").getTime + val ntzDF = FuzzDataGenerator.generateDataFrame( + r, + spark, + ntzSchema, + 100, + DataGenOptions(baseDate = reasonableBaseDate)) + ntzDF.createOrReplaceTempView("ntz_tbl") + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + for (format <- CometTruncTimestamp.supportedFormats) { + val sql = + s"SELECT ts_ntz, date_trunc('$format', ts_ntz) from ntz_tbl order by ts_ntz" + if (tz == "UTC") { + checkSparkAnswerAndOperator(sql) + } else { + checkSparkAnswer(sql) + } + } + } + } + } + + test("date_format - timestamp_ntz input") { + // TimestampNTZ is timezone-independent, so date_format should produce the same + // formatted string regardless of session timezone. Comet currently only runs this + // natively for UTC; for non-UTC it falls back to Spark. We verify correctness + // (matching Spark's output) in all cases. 
+ val r = new Random(42) + val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) + val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions()) + ntzDF.createOrReplaceTempView("ntz_tbl") + val supportedFormats = + CometDateFormat.supportedFormats.keys.toSeq.filterNot(_.contains("'")) + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + for (format <- supportedFormats) { + if (tz == "UTC") { + checkSparkAnswerAndOperator( + s"SELECT ts_ntz, date_format(ts_ntz, '$format') from ntz_tbl order by ts_ntz") + } else { + // Non-UTC falls back to Spark but should still produce correct results + checkSparkAnswer( + s"SELECT ts_ntz, date_format(ts_ntz, '$format') from ntz_tbl order by ts_ntz") + } + } + } + } + } + + test("timestamp_ntz - cross-timezone Parquet round-trip") { + // This test verifies the key TimestampNTZ invariant: data written to a + // timestamp_ntz Parquet column under one session timezone can be read by + // another session with a different timezone and produce identical results. + // This is the defining characteristic of TimestampNTZ vs TimestampType. 
+ val writeTimezones = Seq("America/Los_Angeles", "Asia/Tokyo", "UTC") + val readTimezones = Seq("Europe/London", "America/New_York", "UTC", "Pacific/Auckland") + + for (writeTz <- writeTimezones) { + withTempDir { dir => + // Write data with one session timezone + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> writeTz) { + val data = Seq( + Row("2024-01-15T08:30:00"), + Row("2024-07-04T23:59:59.999999"), + Row("1970-01-01T00:00:00"), + Row("2024-03-10T02:30:00"), // DST spring-forward time in US + Row("2024-11-03T01:30:00"), // DST fall-back time in US + Row(null)) + val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, true))) + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .selectExpr("CAST(ts_str AS TIMESTAMP_NTZ) AS ts_ntz") + .write + .mode(SaveMode.Overwrite) + .parquet(dir.toString) + } + + // Read with different session timezones and verify results are identical + for (readTz <- readTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> readTz) { + spark.read.parquet(dir.toString).createOrReplaceTempView("ntz_cross_tz") + // Casts and unix_timestamp are supported natively for NTZ in any session TZ + checkSparkAnswerAndOperator( + "SELECT ts_ntz, CAST(ts_ntz AS STRING) FROM ntz_cross_tz ORDER BY ts_ntz") + checkSparkAnswerAndOperator( + "SELECT ts_ntz, CAST(ts_ntz AS DATE) FROM ntz_cross_tz ORDER BY ts_ntz") + checkSparkAnswerAndOperator( + "SELECT ts_ntz, unix_timestamp(ts_ntz) FROM ntz_cross_tz ORDER BY ts_ntz") + // hour/minute/second fall back for NTZ (issue #3180); date_trunc falls + // back when the session timezone is non-UTC (issue #2649). Verify + // correctness only. 
+ checkSparkAnswer( + "SELECT ts_ntz, hour(ts_ntz), minute(ts_ntz), second(ts_ntz) FROM ntz_cross_tz ORDER BY ts_ntz") + checkSparkAnswer( + "SELECT ts_ntz, date_trunc('HOUR', ts_ntz) FROM ntz_cross_tz ORDER BY ts_ntz") + } + } + } + } } test("unix_timestamp - string input falls back to Spark") { @@ -555,11 +695,7 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH .createDataFrame(spark.sparkContext.parallelize(data), schema) .createOrReplaceTempView("dst_tbl") - // We `allowIncompatible` here because casts involving TimestampNTZ are marked - // as Incompatible (due to incorrect behaviour when casting from a string) - withSQLConf( - SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Europe/London", - "spark.comet.expression.Cast.allowIncompatible" -> "true") { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Europe/London") { checkSparkAnswerAndOperator( "SELECT ts_ntz, CAST(ts_ntz AS TIMESTAMP) FROM dst_tbl ORDER BY ts_ntz") }