Skip to content

Commit d9e8ff0

Browse files
authored
fix: use UTC for Arrow schema timezone in SparkToColumnar conversions (#3878)
1 parent 9a51e1b commit d9e8ff0

4 files changed

Lines changed: 74 additions & 14 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 9 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -222,31 +222,28 @@ object CometConf extends ShimCometConf {
222222

223223
val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] =
224224
conf("spark.comet.convert.parquet.enabled")
225-
.category(CATEGORY_TESTING)
225+
.category(CATEGORY_EXEC)
226226
.doc(
227227
"When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to " +
228-
"Arrow format. This is an experimental feature and has known issues with " +
229-
"non-UTC timezones.")
228+
"Arrow format.")
230229
.booleanConf
231230
.createWithDefault(false)
232231

233232
val COMET_CONVERT_FROM_JSON_ENABLED: ConfigEntry[Boolean] =
234233
conf("spark.comet.convert.json.enabled")
235-
.category(CATEGORY_TESTING)
234+
.category(CATEGORY_EXEC)
236235
.doc(
237236
"When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to " +
238-
"Arrow format. This is an experimental feature and has known issues with " +
239-
"non-UTC timezones.")
237+
"Arrow format.")
240238
.booleanConf
241239
.createWithDefault(false)
242240

243241
val COMET_CONVERT_FROM_CSV_ENABLED: ConfigEntry[Boolean] =
244242
conf("spark.comet.convert.csv.enabled")
245-
.category(CATEGORY_TESTING)
243+
.category(CATEGORY_EXEC)
246244
.doc(
247245
"When enabled, data from Spark (non-native) CSV v1 and v2 scans will be converted to " +
248-
"Arrow format. This is an experimental feature and has known issues with " +
249-
"non-UTC timezones.")
246+
"Arrow format.")
250247
.booleanConf
251248
.createWithDefault(false)
252249

@@ -743,17 +740,17 @@ object CometConf extends ShimCometConf {
743740

744741
val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
745742
conf("spark.comet.sparkToColumnar.enabled")
746-
.category(CATEGORY_TESTING)
743+
.category(CATEGORY_EXEC)
747744
.doc("Whether to enable Spark to Arrow columnar conversion. When this is turned on, " +
748745
"Comet will convert operators in " +
749746
"`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before " +
750-
"processing. This is an experimental feature and has known issues with non-UTC timezones.")
747+
"processing.")
751748
.booleanConf
752749
.createWithDefault(false)
753750

754751
val COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] =
755752
conf("spark.comet.sparkToColumnar.supportedOperatorList")
756-
.category(CATEGORY_TESTING)
753+
.category(CATEGORY_EXEC)
757754
.doc("A comma-separated list of operators that will be converted to Arrow columnar " +
758755
s"format when `${COMET_SPARK_TO_ARROW_ENABLED.key}` is true.")
759756
.stringConf

spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,8 @@ case class CometLocalTableScanExec(
6666
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
6767
val numInputRows = longMetric("numOutputRows")
6868
val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
69-
val timeZoneId = conf.sessionLocalTimeZone
69+
// Use UTC to match native side expectations. See CometSparkToColumnarExec.
70+
val timeZoneId = "UTC"
7071
rdd.mapPartitionsInternal { sparkBatches =>
7172
val context = TaskContext.get()
7273
val batches = CometArrowConverters.rowToArrowBatchIter(

spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,12 @@ case class CometSparkToColumnarExec(child: SparkPlan)
9797
val numOutputBatches = longMetric("numOutputBatches")
9898
val conversionTime = longMetric("conversionTime")
9999
val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
100-
val timeZoneId = conf.sessionLocalTimeZone
100+
// Use UTC for Arrow schema timezone to match the native side, which always
101+
// deserializes Timestamp as Timestamp(Microsecond, Some("UTC")). Spark's internal
102+
// timestamp representation is always UTC microseconds, so the timezone here is
103+
// purely schema metadata. Using session timezone would cause Arrow RowConverter
104+
// schema mismatch errors in non-UTC sessions. See COMET-2720.
105+
val timeZoneId = "UTC"
101106
val schema = child.schema
102107

103108
if (child.supportsColumnar) {

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 57 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2209,6 +2209,63 @@ class CometExecSuite extends CometTestBase {
22092209
}
22102210
}
22112211

2212+
test("LocalTableScanExec with timestamps in non-UTC timezone") {
2213+
withSQLConf(
2214+
CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
2215+
SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
2216+
val df = Seq(
2217+
(1, java.sql.Timestamp.valueOf("2024-01-15 10:30:00")),
2218+
(2, java.sql.Timestamp.valueOf("2024-06-15 14:00:00")),
2219+
(3, java.sql.Timestamp.valueOf("2024-12-25 08:00:00")))
2220+
.toDF("id", "ts")
2221+
.orderBy("ts")
2222+
checkSparkAnswerAndOperator(df)
2223+
}
2224+
}
2225+
2226+
test("SparkToColumnar with timestamps in non-UTC timezone") {
2227+
withTempDir { dir =>
2228+
val path = new java.io.File(dir, "data").getAbsolutePath
2229+
Seq(
2230+
(1, java.sql.Timestamp.valueOf("2024-01-15 10:30:00")),
2231+
(2, java.sql.Timestamp.valueOf("2024-06-15 14:00:00")),
2232+
(3, java.sql.Timestamp.valueOf("2024-12-25 08:00:00")))
2233+
.toDF("id", "ts")
2234+
.write
2235+
.parquet(path)
2236+
withSQLConf(
2237+
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
2238+
CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true",
2239+
CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true",
2240+
SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
2241+
val df = spark.read.parquet(path).orderBy("ts")
2242+
checkSparkAnswerAndOperator(df)
2243+
}
2244+
}
2245+
}
2246+
2247+
test("sort on timestamps with non-UTC timezone via LocalTableScan") {
2248+
// When session timezone is non-UTC, CometLocalTableScanExec and
2249+
// CometSparkToColumnarExec must use UTC for the Arrow schema timezone
2250+
// to match the native side's expectations. Without this, the native
2251+
// ScanExec sees a timezone mismatch and performs an unnecessary cast.
2252+
// The cast is currently a no-op (Arrow timestamps with timezone are
2253+
// always UTC microseconds), but using UTC avoids the overhead and
2254+
// keeps schemas consistent throughout the native plan.
2255+
withSQLConf(
2256+
CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
2257+
SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
2258+
val df = Seq(
2259+
(1, java.sql.Timestamp.valueOf("2024-01-15 10:30:00")),
2260+
(2, java.sql.Timestamp.valueOf("2024-06-15 14:00:00")),
2261+
(3, java.sql.Timestamp.valueOf("2024-12-25 08:00:00")))
2262+
.toDF("id", "ts")
2263+
.repartition(2)
2264+
.orderBy("ts")
2265+
checkSparkAnswer(df)
2266+
}
2267+
}
2268+
22122269
test("Native_datafusion reports correct files and bytes scanned") {
22132270
val inputFiles = 2
22142271

0 commit comments

Comments
 (0)