Skip to content

Commit 212ebef

Browse files
committed
Minor refactor for readability. spotless:apply
1 parent f0720f5 commit 212ebef

2 files changed

Lines changed: 13 additions & 43 deletions

File tree

spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala

Lines changed: 0 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -152,47 +152,6 @@ private[spark] class CometExecRDD(
152152

153153
object CometExecRDD {
154154

155-
/**
156-
* Creates an RDD for standalone Iceberg scan (no parent native operators).
157-
*/
158-
def apply(
159-
sc: SparkContext,
160-
commonData: Array[Byte],
161-
perPartitionData: Array[Array[Byte]],
162-
numOutputCols: Int,
163-
nativeMetrics: CometMetricNode): CometExecRDD = {
164-
165-
// Standalone mode needs a placeholder plan for PlanDataInjector to fill in.
166-
// PlanDataInjector correlates common/partition data by key (metadata_location for Iceberg).
167-
val common = OperatorOuterClass.IcebergScanCommon.parseFrom(commonData)
168-
val metadataLocation = common.getMetadataLocation
169-
170-
val placeholderCommon = OperatorOuterClass.IcebergScanCommon
171-
.newBuilder()
172-
.setMetadataLocation(metadataLocation)
173-
.build()
174-
val placeholderScan = OperatorOuterClass.IcebergScan
175-
.newBuilder()
176-
.setCommon(placeholderCommon)
177-
.build()
178-
val placeholderPlan = OperatorOuterClass.Operator
179-
.newBuilder()
180-
.setIcebergScan(placeholderScan)
181-
.build()
182-
.toByteArray
183-
184-
new CometExecRDD(
185-
sc,
186-
inputRDDs = Seq.empty,
187-
commonByKey = Map(metadataLocation -> commonData),
188-
perPartitionByKey = Map(metadataLocation -> perPartitionData),
189-
serializedPlan = placeholderPlan,
190-
defaultNumPartitions = perPartitionData.length,
191-
numOutputCols = numOutputCols,
192-
nativeMetrics = nativeMetrics,
193-
subqueries = Seq.empty)
194-
}
195-
196155
/**
197156
* Creates an RDD for native execution with optional per-partition planning data.
198157
*/

spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala

Lines changed: 13 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -237,7 +237,8 @@ case class CometIcebergNativeScanExec(
237237
if (originalPlan == null) {
238238
Seq.empty
239239
} else {
240-
// Trigger serializedPartitionData to ensure Iceberg planning has run and metrics are populated
240+
// Trigger serializedPartitionData to ensure Iceberg planning has run and
241+
// metrics are populated
241242
val _ = serializedPartitionData
242243

243244
originalPlan.metrics
@@ -294,7 +295,17 @@ case class CometIcebergNativeScanExec(
294295
/** Executes using CometExecRDD - planning data is computed lazily on first access. */
295296
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
296297
val nativeMetrics = CometMetricNode.fromCometPlan(this)
297-
CometExecRDD(sparkContext, commonData, perPartitionData, output.length, nativeMetrics)
298+
val serializedPlan = CometExec.serializeNativePlan(nativeOp)
299+
CometExecRDD(
300+
sparkContext,
301+
inputRDDs = Seq.empty,
302+
commonByKey = Map(metadataLocation -> commonData),
303+
perPartitionByKey = Map(metadataLocation -> perPartitionData),
304+
serializedPlan = serializedPlan,
305+
numPartitions = perPartitionData.length,
306+
numOutputCols = output.length,
307+
nativeMetrics = nativeMetrics,
308+
subqueries = Seq.empty)
298309
}
299310

300311
/**

0 commit comments

Comments (0)