File tree Expand file tree Collapse file tree
spark/src/main/scala/org/apache/spark/sql/comet Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -55,7 +55,7 @@ private[spark] class CometExecPartition(
5555 */
5656private [spark] class CometExecRDD (
5757 sc : SparkContext ,
58- inputRDDs : Seq [RDD [ColumnarBatch ]],
58+ var inputRDDs : Seq [RDD [ColumnarBatch ]],
5959 commonByKey : Map [String , Array [Byte ]],
6060 @ transient perPartitionByKey : Map [String , Array [Array [Byte ]]],
6161 serializedPlan : Array [Byte ],
@@ -135,14 +135,19 @@ private[spark] class CometExecRDD(
135135
136136 // Duplicates logic from Spark's ZippedPartitionsBaseRDD.getPreferredLocations
137137 override def getPreferredLocations (split : Partition ): Seq [String ] = {
138- if (inputRDDs.isEmpty) return Nil
138+ if (inputRDDs == null || inputRDDs .isEmpty) return Nil
139139
140140 val idx = split.index
141141 val prefs = inputRDDs.map(rdd => rdd.preferredLocations(rdd.partitions(idx)))
142142 // Prefer nodes where all inputs are local; fall back to any input's preferred location
143143 val intersection = prefs.reduce((a, b) => a.intersect(b))
144144 if (intersection.nonEmpty) intersection else prefs.flatten.distinct
145145 }
146+
147+ override def clearDependencies (): Unit = {
148+ super .clearDependencies()
149+ inputRDDs = null
150+ }
146151}
147152
148153object CometExecRDD {
You can’t perform that action at this time.
0 commit comments