Skip to content

Commit 2559ca0

Browse files
committed
fix: cast to and from timestamp_ntz
1 parent a2a3dd3 commit 2559ca0

7 files changed

Lines changed: 364 additions & 78 deletions

File tree

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::conversion_funcs::temporal::{
3535
is_df_cast_from_timestamp_spark_compatible,
3636
};
3737
use crate::conversion_funcs::utils::spark_cast_postprocess;
38-
use crate::utils::array_with_timezone;
38+
use crate::utils::{array_with_timezone, cast_timestamp_to_ntz, timestamp_ntz_to_timestamp};
3939
use crate::EvalMode::Legacy;
4040
use crate::{cast_whole_num_to_binary, BinaryOutputStyle};
4141
use crate::{EvalMode, SparkError};
@@ -441,6 +441,18 @@ pub(crate) fn cast_array(
441441
(Float32 | Float64, Timestamp(_, tz)) => cast_float_to_timestamp(&array, tz, eval_mode),
442442
(Boolean, Timestamp(_, tz)) => cast_boolean_to_timestamp(&array, tz),
443443
(Decimal128(_, scale), Timestamp(_, tz)) => cast_decimal_to_timestamp(&array, tz, *scale),
444+
// NTZ → TIMESTAMP: interpret NTZ local-epoch value as session-TZ local time, convert to UTC.
445+
// Must come before the is_datafusion_spark_compatible fallthrough which would
446+
// incorrectly copy raw μs without any timezone conversion.
447+
(Timestamp(_, None), Timestamp(_, Some(target_tz))) => Ok(timestamp_ntz_to_timestamp(
448+
array,
449+
&cast_options.timezone,
450+
Some(target_tz.as_ref()),
451+
)?),
452+
// TIMESTAMP → NTZ: shift UTC epoch to local time in session TZ, store as local epoch.
453+
(Timestamp(_, Some(_)), Timestamp(_, None)) => {
454+
Ok(cast_timestamp_to_ntz(array, &cast_options.timezone)?)
455+
}
444456
_ if cast_options.is_adapting_schema
445457
|| is_datafusion_spark_compatible(&from_type, to_type) =>
446458
{

native/spark-expr/src/conversion_funcs/string.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1516,7 +1516,7 @@ fn extract_offset_suffix(value: &str) -> Option<(&str, timezone::Tz)> {
15161516

15171517
type TimestampParsePattern<T> = (&'static Regex, fn(&str, &T) -> SparkResult<Option<i64>>);
15181518

1519-
static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}$").unwrap());
1519+
static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,6}$").unwrap());
15201520
static RE_MONTH: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}$").unwrap());
15211521
static RE_DAY: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}$").unwrap());
15221522
static RE_HOUR: LazyLock<Regex> =
@@ -1802,6 +1802,9 @@ mod tests {
18021802
Some("T2"),
18031803
Some("0100-01-01T12:34:56.123456"),
18041804
Some("10000-01-01T12:34:56.123456"),
1805+
// 7-digit year-only strings must return null (Spark returns null for these)
1806+
Some("0119704"),
1807+
Some("2024001"),
18051808
]));
18061809
let tz = &timezone::Tz::from_str("UTC").unwrap();
18071810

@@ -1826,7 +1829,10 @@ mod tests {
18261829
result.data_type(),
18271830
&DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
18281831
);
1829-
assert_eq!(result.len(), 4);
1832+
assert_eq!(result.len(), 6);
1833+
// 7-digit year-only strings must be null
1834+
assert!(result.is_null(4), "0119704 should be null");
1835+
assert!(result.is_null(5), "2024001 should be null");
18301836
}
18311837

18321838
#[test]

native/spark-expr/src/conversion_funcs/temporal.rs

Lines changed: 89 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::utils::resolve_local_datetime;
1819
use crate::{timezone, SparkCastOptions, SparkResult};
1920
use arrow::array::{ArrayRef, AsArray, TimestampMicrosecondBuilder};
2021
use arrow::datatypes::{DataType, Date32Type};
21-
use chrono::{NaiveDate, TimeZone};
22+
use chrono::NaiveDate;
2223
use std::str::FromStr;
2324
use std::sync::Arc;
2425

@@ -38,37 +39,49 @@ pub(crate) fn cast_date_to_timestamp(
3839
cast_options: &SparkCastOptions,
3940
target_tz: &Option<Arc<str>>,
4041
) -> SparkResult<ArrayRef> {
41-
let tz_str = if cast_options.timezone.is_empty() {
42-
"UTC"
43-
} else {
44-
cast_options.timezone.as_str()
45-
};
46-
// safe to unwrap since we are falling back to UTC above
47-
let tz = timezone::Tz::from_str(tz_str)?;
48-
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
4942
let date_array = array_ref.as_primitive::<Date32Type>();
50-
5143
let mut builder = TimestampMicrosecondBuilder::with_capacity(date_array.len());
5244

53-
for date in date_array.iter() {
54-
match date {
55-
Some(date) => {
56-
// safe to unwrap since chrono's range ( 262,143 yrs) is higher than
57-
// number of years possible with days as i32 (~ 6 mil yrs)
58-
// convert date in session timezone to timestamp in UTC
59-
let naive_date = epoch + chrono::Duration::days(date as i64);
60-
let local_midnight = naive_date.and_hms_opt(0, 0, 0).unwrap();
61-
let local_midnight_in_microsec = tz
62-
.from_local_datetime(&local_midnight)
63-
// return earliest possible time (edge case with spring / fall DST changes)
64-
.earliest()
65-
.map(|dt| dt.timestamp_micros())
66-
// in case there is an issue with DST and returns None , we fall back to UTC
67-
.unwrap_or((date as i64) * 86_400 * 1_000_000);
68-
builder.append_value(local_midnight_in_microsec);
45+
if target_tz.is_none() {
46+
// TIMESTAMP_NTZ: pure day arithmetic, no session-TZ offset.
47+
// Matches Spark: daysToMicros(d, ZoneOffset.UTC)
48+
for date in date_array.iter() {
49+
match date {
50+
Some(d) => builder.append_value((d as i64) * 86_400 * 1_000_000),
51+
None => builder.append_null(),
6952
}
70-
None => {
71-
builder.append_null();
53+
}
54+
} else {
55+
// TIMESTAMP: midnight in session TZ → UTC epoch μs
56+
let tz_str = if cast_options.timezone.is_empty() {
57+
"UTC"
58+
} else {
59+
cast_options.timezone.as_str()
60+
};
61+
// safe to unwrap since we are falling back to UTC above
62+
let tz = timezone::Tz::from_str(tz_str)?;
63+
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
64+
for date in date_array.iter() {
65+
match date {
66+
Some(d) => {
67+
// safe to unwrap since chrono's range ( 262,143 yrs) is higher than
68+
// number of years possible with days as i32 (~ 6 mil yrs)
69+
// convert date in session timezone to timestamp in UTC
70+
let naive_date = epoch + chrono::Duration::days(d as i64);
71+
let local_midnight = naive_date.and_hms_opt(0, 0, 0).unwrap();
72+
// Use resolve_local_datetime to correctly handle DST transitions:
73+
// - Single: normal case, uses the given offset
74+
// - Ambiguous (fall back): uses the earlier/DST occurrence, matching Spark
75+
// - None (spring forward gap at midnight, e.g. America/Sao_Paulo): uses the
76+
// pre-transition offset to compute the correct UTC time, matching Spark's
77+
// LocalDate.atStartOfDay(zoneId) behaviour.
78+
let local_midnight_in_microsec =
79+
resolve_local_datetime(&tz, local_midnight).timestamp_micros();
80+
builder.append_value(local_midnight_in_microsec);
81+
}
82+
None => {
83+
builder.append_null();
84+
}
7285
}
7386
}
7487
}
@@ -142,4 +155,52 @@ mod tests {
142155
assert_eq!(ts.value(2), dst_date + seven_hours_ts);
143156
assert!(ts.is_null(3));
144157
}
158+
159+
#[test]
160+
fn test_cast_date_to_timestamp_ntz() {
161+
use crate::EvalMode;
162+
use arrow::array::Date32Array;
163+
use arrow::array::{Array, ArrayRef};
164+
use arrow::datatypes::TimestampMicrosecondType;
165+
166+
// For NTZ, result is always days * 86_400_000_000 regardless of session TZ
167+
let dates: ArrayRef = Arc::new(Date32Array::from(vec![
168+
Some(0), // 1970-01-01
169+
Some(1), // 1970-01-02
170+
Some(-1), // 1969-12-31
171+
Some(19723), // 2024-01-01
172+
None,
173+
]));
174+
175+
// NTZ target: no timezone annotation
176+
let ntz_target: Option<Arc<str>> = None;
177+
178+
// session TZ should be ignored for NTZ
179+
for tz in &[
180+
"UTC",
181+
"America/Los_Angeles",
182+
"America/New_York",
183+
"Asia/Kolkata",
184+
] {
185+
let result = cast_date_to_timestamp(
186+
&dates,
187+
&SparkCastOptions::new(EvalMode::Legacy, tz, false),
188+
&ntz_target,
189+
)
190+
.unwrap();
191+
let ts = result.as_primitive::<TimestampMicrosecondType>();
192+
// values are pure arithmetic regardless of session TZ
193+
assert_eq!(ts.value(0), 0, "epoch, tz={tz}");
194+
assert_eq!(ts.value(1), 86_400_000_000i64, "day+1, tz={tz}");
195+
assert_eq!(ts.value(2), -86_400_000_000i64, "day-1, tz={tz}");
196+
assert_eq!(
197+
ts.value(3),
198+
19723i64 * 86_400_000_000i64,
199+
"2024-01-01, tz={tz}"
200+
);
201+
assert!(ts.is_null(4), "null, tz={tz}");
202+
// output array has no timezone annotation
203+
assert_eq!(ts.timezone(), None, "no tz annotation, tz={tz}");
204+
}
205+
}
145206
}

native/spark-expr/src/utils.rs

Lines changed: 97 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ pub fn array_with_timezone(
7676
assert!(!timezone.is_empty());
7777
match to_type {
7878
Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
79-
Some(DataType::Timestamp(_, Some(_))) => {
80-
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
81-
}
79+
// Pass through for Timestamp(_, Some(_)) targets: the cast_array dispatch arm
80+
// handles NTZ → TIMESTAMP conversion with correct "UTC" output annotation.
81+
Some(DataType::Timestamp(_, Some(_))) => Ok(array),
8282
Some(DataType::Timestamp(TimeUnit::Microsecond, None)) => {
8383
// Convert from Timestamp(Millisecond, None) to Timestamp(Microsecond, None)
8484
let millis_array = as_primitive_array::<TimestampMillisecondType>(&array);
@@ -100,9 +100,9 @@ pub fn array_with_timezone(
100100
assert!(!timezone.is_empty());
101101
match to_type {
102102
Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
103-
Some(DataType::Timestamp(_, Some(_))) => {
104-
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
105-
}
103+
// Pass through for Timestamp(_, Some(_)) targets: the cast_array dispatch arm
104+
// handles NTZ → TIMESTAMP conversion with correct "UTC" output annotation.
105+
Some(DataType::Timestamp(_, Some(_))) => Ok(array),
106106
_ => {
107107
// Not supported
108108
Err(ArrowError::CastError(format!(
@@ -117,9 +117,9 @@ pub fn array_with_timezone(
117117
assert!(!timezone.is_empty());
118118
match to_type {
119119
Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
120-
Some(DataType::Timestamp(_, Some(_))) => {
121-
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
122-
}
120+
// Pass through for Timestamp(_, Some(_)) targets: the cast_array dispatch arm
121+
// handles NTZ → TIMESTAMP conversion with correct "UTC" output annotation.
122+
Some(DataType::Timestamp(_, Some(_))) => Ok(array),
123123
_ => {
124124
// Not supported
125125
Err(ArrowError::CastError(format!(
@@ -179,7 +179,7 @@ fn datetime_cast_err(value: i64) -> ArrowError {
179179
/// Parameters:
180180
/// tz - timezone used to interpret local_datetime
181181
/// local_datetime - a naive local datetime to resolve
182-
fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz> {
182+
pub(crate) fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz> {
183183
match tz.from_local_datetime(&local_datetime) {
184184
LocalResult::Single(dt) => dt,
185185
LocalResult::Ambiguous(dt, _) => dt,
@@ -210,7 +210,7 @@ fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz
210210
/// array - input array of timestamp without timezone
211211
/// tz - timezone of the values in the input array
212212
/// to_timezone - timezone to change the input values to
213-
fn timestamp_ntz_to_timestamp(
213+
pub(crate) fn timestamp_ntz_to_timestamp(
214214
array: ArrayRef,
215215
tz: &str,
216216
to_timezone: Option<&str>,
@@ -259,6 +259,41 @@ fn timestamp_ntz_to_timestamp(
259259
}
260260
}
261261

262+
/// Converts a `Timestamp(Microsecond, Some(_))` array to `Timestamp(Microsecond, None)`
263+
/// (TIMESTAMP_NTZ) by interpreting the UTC epoch value in the given session timezone and
264+
/// storing the resulting local datetime as epoch-relative microseconds without a TZ annotation.
265+
///
266+
/// Matches Spark: `convertTz(ts, ZoneOffset.UTC, zoneId)`
267+
pub(crate) fn cast_timestamp_to_ntz(
268+
array: ArrayRef,
269+
timezone: &str,
270+
) -> Result<ArrayRef, ArrowError> {
271+
assert!(!timezone.is_empty());
272+
let tz: Tz = timezone.parse()?;
273+
match array.data_type() {
274+
DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => {
275+
let array = as_primitive_array::<TimestampMicrosecondType>(&array);
276+
let result: PrimitiveArray<TimestampMicrosecondType> = array.try_unary(|value| {
277+
as_datetime::<TimestampMicrosecondType>(value)
278+
.ok_or_else(|| datetime_cast_err(value))
279+
.map(|utc_naive| {
280+
// Convert UTC naive datetime → local datetime in session TZ
281+
let local_dt = tz.from_utc_datetime(&utc_naive);
282+
// Re-encode as epoch-relative μs treating local time as UTC anchor.
283+
// This produces the NTZ representation (no offset applied).
284+
local_dt.naive_local().and_utc().timestamp_micros()
285+
})
286+
})?;
287+
// No timezone annotation on output = TIMESTAMP_NTZ
288+
Ok(Arc::new(result))
289+
}
290+
_ => Err(ArrowError::CastError(format!(
291+
"cast_timestamp_to_ntz: unexpected input type {:?}",
292+
array.data_type()
293+
))),
294+
}
295+
}
296+
262297
/// This takes for special pre-casting cases of Spark. E.g., Timestamp to String.
263298
fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result<ArrayRef, ArrowError> {
264299
assert!(!timezone.is_empty());
@@ -401,4 +436,55 @@ mod tests {
401436
micros_for("2024-10-27 00:30:00")
402437
);
403438
}
439+
440+
// Helper: build a Timestamp(Microsecond, Some(tz)) array from a UTC datetime string
441+
fn ts_with_tz(utc_datetime: &str, tz: &str) -> ArrayRef {
442+
let dt = NaiveDateTime::parse_from_str(utc_datetime, "%Y-%m-%d %H:%M:%S").unwrap();
443+
let ts = dt.and_utc().timestamp_micros();
444+
Arc::new(TimestampMicrosecondArray::from(vec![ts]).with_timezone(tz.to_string()))
445+
}
446+
447+
#[test]
448+
fn test_cast_timestamp_to_ntz_utc() {
449+
// In UTC, local time == UTC time, so NTZ value == UTC epoch value
450+
let input = ts_with_tz("2024-01-15 10:30:00", "UTC");
451+
let result = cast_timestamp_to_ntz(input, "UTC").unwrap();
452+
let out = as_primitive_array::<TimestampMicrosecondType>(&result);
453+
// Expected NTZ value: epoch μs for "2024-01-15 10:30:00" as if it were UTC
454+
let expected = NaiveDateTime::parse_from_str("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S")
455+
.unwrap()
456+
.and_utc()
457+
.timestamp_micros();
458+
assert_eq!(out.value(0), expected);
459+
assert_eq!(out.timezone(), None); // no TZ annotation = NTZ
460+
}
461+
462+
#[test]
463+
fn test_cast_timestamp_to_ntz_offset_timezone() {
464+
// UTC epoch for "2024-01-15 15:30:00 UTC" cast to NTZ with session TZ = America/New_York (UTC-5)
465+
// Local time in NY = 10:30:00 → NTZ should store epoch μs for "2024-01-15 10:30:00"
466+
let input = ts_with_tz("2024-01-15 15:30:00", "UTC");
467+
let result = cast_timestamp_to_ntz(input, "America/New_York").unwrap();
468+
let out = as_primitive_array::<TimestampMicrosecondType>(&result);
469+
let expected = NaiveDateTime::parse_from_str("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S")
470+
.unwrap()
471+
.and_utc()
472+
.timestamp_micros();
473+
assert_eq!(out.value(0), expected);
474+
assert_eq!(out.timezone(), None);
475+
}
476+
477+
#[test]
478+
fn test_cast_timestamp_to_ntz_dst() {
479+
// During DST: UTC epoch for "2024-07-04 16:30:00 UTC", session TZ = America/New_York (UTC-4 in summer)
480+
// Local time in NY = 12:30:00 → NTZ stores epoch μs for "2024-07-04 12:30:00"
481+
let input = ts_with_tz("2024-07-04 16:30:00", "UTC");
482+
let result = cast_timestamp_to_ntz(input, "America/New_York").unwrap();
483+
let out = as_primitive_array::<TimestampMicrosecondType>(&result);
484+
let expected = NaiveDateTime::parse_from_str("2024-07-04 12:30:00", "%Y-%m-%d %H:%M:%S")
485+
.unwrap()
486+
.and_utc()
487+
.timestamp_micros();
488+
assert_eq!(out.value(0), expected);
489+
}
404490
}

0 commit comments

Comments
 (0)