
Commit f0720f5 (parent: 82e4513)

Add docs to try to address PR feedback.

1 file changed: 23 additions & 8 deletions


spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala

@@ -66,8 +66,12 @@ case class CometIcebergNativeScanExec(
   override val nodeName: String = "CometIcebergNativeScan"

   /**
-   * Prepare DPP subquery plans. Called by Spark's prepare() before doExecuteColumnar(). Only
-   * kicks off async work - doesn't wait for results (that happens in serializedPartitionData).
+   * Prepare DPP subquery plans. Called by Spark's prepare() before doExecuteColumnar().
+   *
+   * This follows Spark's convention of preparing subqueries in doPrepare() rather than
+   * doExecuteColumnar(). While the actual waiting for DPP results happens later in
+   * serializedPartitionData, calling prepare() here ensures subquery plans are set up before
+   * execution begins.
    */
   override protected def doPrepare(): Unit = {
     originalPlan.runtimeFilters.foreach {
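
The prepare-before-execute split that the new doc comment describes can be sketched generically. Note this is an illustrative sketch only: `SubqueryPlan`, `RuntimeFilter`, and `NativeScan` below are hypothetical stand-ins, not the actual Spark/Comet types.

```scala
// Hypothetical stand-ins for Spark's runtime-filter expressions and
// their subquery plans; the real types live in Spark/Comet.
trait SubqueryPlan {
  def prepare(): Unit         // start async work (e.g. a broadcast exchange)
  def awaitResult(): Seq[Any] // block until the subquery result is available
}

final case class RuntimeFilter(plan: SubqueryPlan)

final class NativeScan(runtimeFilters: Seq[RuntimeFilter]) {
  // doPrepare() only sets subqueries up; it never blocks on results.
  def doPrepare(): Unit = runtimeFilters.foreach(_.plan.prepare())

  // Waiting for DPP values is deferred to execution time via a lazy val.
  lazy val serializedPartitionData: Seq[Seq[Any]] =
    runtimeFilters.map(_.plan.awaitResult())
}
```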
@@ -79,7 +83,11 @@ case class CometIcebergNativeScanExec(
     }
   }

   /**
-   * Lazy partition serialization - computed after doPrepare() resolves DPP.
+   * Lazy partition serialization - deferred until execution time for DPP support.
+   *
+   * Entry points: This lazy val may be triggered from either doExecuteColumnar() (via
+   * commonData/perPartitionData) or capturedMetricValues (for Iceberg metrics). Lazy val
+   * semantics ensure single evaluation regardless of entry point.
    *
    * DPP (Dynamic Partition Pruning) Flow:
    *
@@ -92,10 +100,10 @@ case class CometIcebergNativeScanExec(
    * Execution time:
    * 1. Spark calls prepare() on the plan tree
    *    - doPrepare() calls e.plan.prepare() for each DPP filter
-   *    - Broadcast exchange starts materializing
+   *    - Subquery plans are set up (but not yet executed)
    *
-   * 2. Spark calls doExecuteColumnar()
-   *    - Accesses perPartitionData
+   * 2. Spark calls doExecuteColumnar() (or metrics are accessed)
+   *    - Accesses perPartitionData (or capturedMetricValues)
    *    - Forces serializedPartitionData evaluation (here)
    *    - Waits for DPP values (updateResult or reflection)
    *    - Calls serializePartitions with DPP-filtered inputRDD
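
The "single evaluation regardless of entry point" guarantee leaned on above is plain Scala lazy val semantics. A minimal self-contained demonstration (names are illustrative, not the Comet members):

```scala
// A lazy val body runs at most once, whichever entry point touches it first.
object LazyOnce {
  var evaluations = 0
  lazy val serialized: String = {
    evaluations += 1 // side effect: runs exactly once
    "partition-data"
  }
  def fromExecute(): String = serialized // stands in for doExecuteColumnar()
  def fromMetrics(): String = serialized // stands in for capturedMetricValues
}
```

Calling `LazyOnce.fromMetrics()` and then `LazyOnce.fromExecute()` (in either order) leaves `evaluations` at 1, which is why the diff can let either path safely trigger `serializedPartitionData`.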
@@ -216,13 +224,20 @@ case class CometIcebergNativeScanExec(
     }
   }

+  /**
+   * Captures Iceberg planning metrics for display in Spark UI.
+   *
+   * This lazy val intentionally triggers serializedPartitionData evaluation because Iceberg
+   * populates metrics during planning (when inputRDD is accessed). Both this and
+   * doExecuteColumnar() may trigger serializedPartitionData, but lazy val semantics ensure it's
+   * evaluated only once.
+   */
   @transient private lazy val capturedMetricValues: Seq[MetricValue] = {
     // Guard against null originalPlan (from doCanonicalize)
     if (originalPlan == null) {
       Seq.empty
     } else {
-      // Force serializedPartitionData evaluation first - this triggers serializePartitions which
-      // accesses inputRDD, which triggers Iceberg planning and populates metrics
+      // Trigger serializedPartitionData to ensure Iceberg planning has run and metrics are populated
       val _ = serializedPartitionData

       originalPlan.metrics
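
The `val _ = serializedPartitionData` line in the hunk above uses the Scala idiom of forcing a lazy val purely for its side effects. A hedged sketch of the pattern in isolation (`ForceForSideEffects` and its members are hypothetical, standing in for the Iceberg-planning side effect):

```scala
// Forcing a lazy val with `val _ = ...` runs its body for side effects
// while discarding the value itself.
object ForceForSideEffects {
  var planningRan = false
  lazy val partitionData: Seq[Int] = {
    planningRan = true // stands in for Iceberg planning populating metrics
    Seq(1, 2, 3)
  }
  def metrics: Seq[String] = {
    // Trigger "planning" before reading metric state, as in the diff.
    val _ = partitionData
    if (planningRan) Seq(s"files: ${partitionData.size}") else Seq.empty
  }
}
```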
