
Commit 6b8931a

andygrove and claude committed
fix: [iceberg] Keep deep copy for Iceberg Java integration scan path
CometScanWrapper unconditionally set isFfiSafe=true, which told native ScanExec to skip deep copies for all scans. This is correct for CometScanExec (native_iceberg_compat), which now uses immutable Arrow readers, but incorrect for CometBatchScanExec (Iceberg Java integration via SupportsComet), which still uses mutable buffers.

Make isFfiSafe conditional on the scan type: true for CometScanExec, false for CometBatchScanExec.

Also remove the stale hasScanUsingMutableBuffers check for CometScanExec, since PR apache#3411 replaced mutable buffers with immutable Arrow readers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4fe6452 commit 6b8931a

4 files changed

Lines changed: 12 additions & 10 deletions
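Why the FFI-safe flag matters: when a producer recycles a mutable buffer between batches, handing that buffer across the Arrow FFI boundary without a defensive copy lets the next read overwrite data the consumer still holds. The minimal Scala sketch below is illustrative only (MutableBatchReader is a hypothetical class, not a Comet or Iceberg API); it shows the reuse hazard that the deep copy guards against.

object FfiSafetySketch {
  // Simulates a scan that recycles one mutable buffer for every batch,
  // i.e. a source that is NOT FFI safe.
  final class MutableBatchReader {
    private val buffer = new Array[Int](4)
    def nextBatch(fill: Int): Array[Int] = {
      java.util.Arrays.fill(buffer, fill)
      buffer // the same array instance is returned for every batch
    }
  }

  def main(args: Array[String]): Unit = {
    val reader = new MutableBatchReader

    // Treating the source as FFI safe: keep a zero-copy reference to batch 1.
    val zeroCopy = reader.nextBatch(1)
    reader.nextBatch(2)
    println(zeroCopy.toSeq) // batch 1 now reads as 2, 2, 2, 2: silently overwritten

    // Treating it as unsafe: deep copy before the producer reuses the buffer.
    val copied = reader.nextBatch(1).clone()
    reader.nextBatch(2)
    println(copied.toSeq) // batch 1 still reads as 1, 1, 1, 1: preserved
  }
}

Per the commit message, CometScanExec's immutable Arrow readers avoid this reuse, so the copy can be skipped (isFfiSafe = true), while the Iceberg Java integration path behind CometBatchScanExec still recycles buffers and must keep the deep copy (isFfiSafe = false).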

spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala

Lines changed: 1 addition & 6 deletions
@@ -22,7 +22,7 @@ package org.apache.comet.rules
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.comet.{CometBatchScanExec, CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec}
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometSparkToColumnarExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
@@ -155,8 +155,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
    * with such scans because the buffers may be modified after C2R reads them.
    *
    * This includes:
-   * - CometScanExec with native_iceberg_compat and partition columns - uses
-   *   ConstantColumnReader
    * - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader
    */
  private def hasScanUsingMutableBuffers(op: SparkPlan): Boolean = {
@@ -165,9 +163,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
       case c: ReusedExchangeExec => hasScanUsingMutableBuffers(c.child)
       case _ =>
         op.exists {
-          case scan: CometScanExec =>
-            scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
-              scan.relation.partitionSchema.nonEmpty
           case scan: CometBatchScanExec => scan.scan.isInstanceOf[CometParquetScan]
           case _ => false
         }

spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala

Lines changed: 3 additions & 3 deletions
@@ -37,7 +37,7 @@ import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataTyp
 abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] {
 
   /** Whether the data produced by the Comet operator is FFI safe */
-  def isFfiSafe: Boolean = false
+  def isFfiSafe(op: T): Boolean = false
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = None
 
@@ -61,7 +61,7 @@ abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] {
     } else {
       scanBuilder.setSource(source)
     }
-    scanBuilder.setArrowFfiSafe(isFfiSafe)
+    scanBuilder.setArrowFfiSafe(isFfiSafe(op))
 
     val scanTypes = op.output.flatten { attr =>
       serializeDataType(attr.dataType)
@@ -93,7 +93,7 @@ object CometExchangeSink extends CometSink[SparkPlan] {
    *
    * Source of shuffle exchange batches is NativeBatchDecoderIterator.
    */
-  override def isFfiSafe: Boolean = true
+  override def isFfiSafe(op: SparkPlan): Boolean = true
 
   override def createExec(nativeOp: Operator, op: SparkPlan): CometNativeExec =
     CometSinkPlaceHolder(nativeOp, op, op)

spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala

Lines changed: 1 addition & 1 deletion
@@ -271,7 +271,7 @@ object CometBroadcastExchangeExec extends CometSink[BroadcastExchangeExec]
    *
    * Source of broadcast exchange batches is ArrowStreamReader.
    */
-  override def isFfiSafe: Boolean = true
+  override def isFfiSafe(op: BroadcastExchangeExec): Boolean = true
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
     CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED)

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 7 additions & 0 deletions
@@ -2072,6 +2072,13 @@ case class CometSortMergeJoinExec(
 }
 
 object CometScanWrapper extends CometSink[SparkPlan] {
+  override def isFfiSafe(op: SparkPlan): Boolean = op match {
+    // CometScanExec (native_iceberg_compat) uses immutable Arrow readers
+    case _: CometScanExec => true
+    // CometBatchScanExec (Iceberg Java integration) still uses mutable buffers
+    case _ => false
+  }
+
   override def createExec(nativeOp: Operator, op: SparkPlan): CometNativeExec = {
     CometScanWrapper(nativeOp, op)
   }
