Skip to content

Commit b10ed64

Browse files
committed
add test
1 parent bd70924 commit b10ed64

1 file changed

Lines changed: 71 additions & 0 deletions

File tree

spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ import java.nio.file.Files
2525
import org.apache.spark.sql.CometTestBase
2626
import org.apache.spark.sql.comet.CometIcebergNativeScanExec
2727
import org.apache.spark.sql.execution.SparkPlan
28+
import org.apache.spark.sql.types.{StringType, TimestampType}
2829

2930
import org.apache.comet.iceberg.RESTCatalogHelper
31+
import org.apache.comet.testing.{FuzzDataGenerator, SchemaGenOptions}
3032

3133
/**
3234
* Test suite for native Iceberg scan using FileScanTasks and iceberg-rust.
@@ -2291,6 +2293,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
22912293
}
22922294
file.delete()
22932295
}
2296+
22942297
deleteRecursively(dir)
22952298
}
22962299
}
@@ -2450,4 +2453,72 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
24502453
}
24512454
}
24522455
}
2456+
2457+
// Regression test for a user reported issue
2458+
test("double partitioning with range filter on top-level partition") {
2459+
assume(icebergAvailable, "Iceberg not available")
2460+
2461+
// Generate Iceberg table without Comet enabled
2462+
withTempIcebergDir { warehouseDir =>
2463+
withSQLConf(
2464+
"spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
2465+
"spark.sql.catalog.test_cat.type" -> "hadoop",
2466+
"spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
2467+
"spark.sql.files.maxRecordsPerFile" -> "50") {
2468+
2469+
// timestamp + geohash with multi-column partitioning
2470+
spark.sql("""
2471+
CREATE TABLE test_cat.db.geolocation_trips (
2472+
outputTimestamp TIMESTAMP,
2473+
geohash7 STRING,
2474+
tripId STRING
2475+
) USING iceberg
2476+
PARTITIONED BY (hours(outputTimestamp), truncate(3, geohash7))
2477+
TBLPROPERTIES (
2478+
'format-version' = '2',
2479+
'write.distribution-mode' = 'range',
2480+
'write.target-file-size-bytes' = '1073741824'
2481+
)
2482+
""")
2483+
val schema = FuzzDataGenerator.generateSchema(
2484+
SchemaGenOptions(primitiveTypes = Seq(TimestampType, StringType, StringType)))
2485+
2486+
val random = new scala.util.Random(42)
2487+
// Set baseDate to match our filter range (around 2024-01-01)
2488+
val options = testing.DataGenOptions(
2489+
allowNull = false,
2490+
baseDate = 1704067200000L
2491+
) // 2024-01-01 00:00:00
2492+
2493+
val df = FuzzDataGenerator
2494+
.generateDataFrame(random, spark, schema, 1000, options)
2495+
.toDF("outputTimestamp", "geohash7", "tripId")
2496+
2497+
df.writeTo("test_cat.db.geolocation_trips").append()
2498+
}
2499+
2500+
// Query using Comet native Iceberg scan
2501+
withSQLConf(
2502+
"spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
2503+
"spark.sql.catalog.test_cat.type" -> "hadoop",
2504+
"spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
2505+
CometConf.COMET_ENABLED.key -> "true",
2506+
CometConf.COMET_EXEC_ENABLED.key -> "true",
2507+
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
2508+
2509+
// Filter for a range that does not align with hour boundaries
2510+
// Partitioning is hours(outputTimestamp), so filter in middle of hours forces residual filter
2511+
val startMs = 1704067200000L + 30 * 60 * 1000L // 2024-01-01 01:30:00 (30 min into hour)
2512+
val endMs = 1704078000000L - 15 * 60 * 1000L // 2024-01-01 03:45:00 (15 min before hour)
2513+
2514+
checkIcebergNativeScan(s"""
2515+
SELECT COUNT(DISTINCT(tripId)) FROM test_cat.db.geolocation_trips
2516+
WHERE timestamp_millis($startMs) <= outputTimestamp
2517+
AND outputTimestamp < timestamp_millis($endMs)
2518+
""")
2519+
2520+
spark.sql("DROP TABLE test_cat.db.geolocation_trips")
2521+
}
2522+
}
2523+
}
24532524
}

0 commit comments

Comments
 (0)