Skip to content

Commit e3761e0

Browse files
authored
fix: Fall back on dynamicpruning expressions for CometIcebergNativeScan (#3335)
1 parent 6eaf948 commit e3761e0

3 files changed

Lines changed: 76 additions & 7 deletions

File tree

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
327327
case _
328328
if scanExec.scan.getClass.getName ==
329329
"org.apache.iceberg.spark.source.SparkBatchQueryScan" =>
330+
if (scanExec.runtimeFilters.exists(isDynamicPruningFilter)) {
331+
return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
332+
}
333+
330334
val fallbackReasons = new ListBuffer[String]()
331335

332336
// Native Iceberg scan requires both configs to be enabled

spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -700,10 +700,9 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
700700
// If metadata is None, this is a programming error - metadata should have been extracted
701701
// in CometScanRule before creating CometBatchScanExec
702702
val metadata = scan.nativeIcebergScanMetadata.getOrElse {
703-
logError(
703+
throw new IllegalStateException(
704704
"Programming error: CometBatchScanExec.nativeIcebergScanMetadata is None. " +
705705
"Metadata should have been extracted in CometScanRule.")
706-
return None
707706
}
708707

709708
// Use pre-extracted metadata (no reflection needed)
@@ -979,11 +978,11 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
979978
}
980979
} catch {
981980
case e: Exception =>
982-
val msg =
983-
"Iceberg reflection failure: Failed to extract FileScanTasks from Iceberg scan RDD: " +
984-
s"${e.getMessage}"
985-
logError(msg, e)
986-
return None
981+
// CometScanRule already validated this scan should use native execution.
982+
// Failure here is a programming error, not a graceful fallback scenario.
983+
throw new IllegalStateException(
984+
s"Native Iceberg scan serialization failed unexpectedly: ${e.getMessage}",
985+
e)
987986
}
988987

989988
// Log deduplication summary

spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2294,4 +2294,70 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
22942294
deleteRecursively(dir)
22952295
}
22962296
}
2297+
2298+
test("runtime filtering - join with dynamic partition pruning") {
2299+
assume(icebergAvailable, "Iceberg not available")
2300+
withTempIcebergDir { warehouseDir =>
2301+
val dimDir = new File(warehouseDir, "dim_parquet")
2302+
withSQLConf(
2303+
"spark.sql.catalog.runtime_cat" -> "org.apache.iceberg.spark.SparkCatalog",
2304+
"spark.sql.catalog.runtime_cat.type" -> "hadoop",
2305+
"spark.sql.catalog.runtime_cat.warehouse" -> warehouseDir.getAbsolutePath,
2306+
CometConf.COMET_ENABLED.key -> "true",
2307+
CometConf.COMET_EXEC_ENABLED.key -> "true",
2308+
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
2309+
2310+
// Create partitioned Iceberg table (fact table)
2311+
spark.sql("""
2312+
CREATE TABLE runtime_cat.db.fact_table (
2313+
id BIGINT,
2314+
data STRING,
2315+
date DATE
2316+
) USING iceberg
2317+
PARTITIONED BY (date)
2318+
""")
2319+
2320+
// Insert data across multiple partitions
2321+
spark.sql("""
2322+
INSERT INTO runtime_cat.db.fact_table VALUES
2323+
(1, 'a', DATE '1970-01-01'),
2324+
(2, 'b', DATE '1970-01-02'),
2325+
(3, 'c', DATE '1970-01-02'),
2326+
(4, 'd', DATE '1970-01-03')
2327+
""")
2328+
2329+
// Create dimension table (Parquet) in temp directory
2330+
spark
2331+
.createDataFrame(Seq((1L, java.sql.Date.valueOf("1970-01-02"))))
2332+
.toDF("id", "date")
2333+
.write
2334+
.parquet(dimDir.getAbsolutePath)
2335+
spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("dim")
2336+
2337+
// This join should trigger dynamic partition pruning
2338+
val query =
2339+
"""SELECT f.* FROM runtime_cat.db.fact_table f
2340+
|JOIN dim d ON f.date = d.date AND d.id = 1
2341+
|ORDER BY f.id""".stripMargin
2342+
2343+
// Verify the initial plan contains dynamic pruning expression
2344+
val df = spark.sql(query)
2345+
val initialPlan = df.queryExecution.executedPlan
2346+
val planStr = initialPlan.toString
2347+
assert(
2348+
planStr.contains("dynamicpruning"),
2349+
s"Expected dynamic pruning in plan but got:\n$planStr")
2350+
2351+
// Check results match Spark
2352+
// Note: AQE re-plans after subquery executes, converting dynamicpruningexpression(...)
2353+
// to dynamicpruningexpression(true), which allows native Iceberg scan to proceed.
2354+
// This is correct behavior - no actual subquery to wait for after AQE re-planning.
2355+
// However, the rest of the plan still contains non-native operators because CometExecRule
2356+
// doesn't run again.
2357+
checkSparkAnswer(df)
2358+
2359+
spark.sql("DROP TABLE runtime_cat.db.fact_table")
2360+
}
2361+
}
2362+
}
22972363
}

0 commit comments

Comments
 (0)