diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index d3f51dfbe2..585b5d6f2b 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -93,6 +93,17 @@ object CometConf extends ShimCometConf { .booleanConf .createWithEnvVarOrDefault("ENABLE_COMET", true) + val COMET_USE_PLANNER: ConfigEntry[Boolean] = conf("spark.comet.planner.enabled") + .category(CATEGORY_EXEC) + .doc( + "When true, Comet registers the single-rule CometPlanner in place of the legacy " + + "CometScanRule + CometExecRule pair. Set this to false to roll back to the legacy " + + "rules. The choice is evaluated once, when the session extensions are injected, so " + + "changing the value after session creation does not switch rules. The selected rules " + + "assert on this flag at entry so that configuration drift surfaces early.") + .booleanConf + .createWithDefault(true) + val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled") .category(CATEGORY_SCAN) .doc( diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 679005d9b1..0b2133fca0 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -32,7 +32,8 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf._ -import org.apache.comet.rules.{CometExecRule, CometPlanAdaptiveDynamicPruningFilters, CometReuseSubquery, CometScanRule, CometSpark34AqeDppFallbackRule, EliminateRedundantTransitions} +import org.apache.comet.planner.CometPlanner +import org.apache.comet.rules.{CometExecRule, CometPlanAdaptiveDynamicPruningFilters, CometReuseSubquery, CometScanRule, CometShuffleRule, CometSpark34AqeDppFallbackRule, EliminateRedundantTransitions} import org.apache.comet.shims.ShimCometSparkSessionExtensions /** @@ -93,18 +94,29 @@ class CometSparkSessionExtensions // Registered before CometScanRule/CometExecRule so tags are in place when conversion runs. // No-op on Spark 3.5+; see CometSpark34AqeDppFallbackRule's class docstring. injectPreSpark35QueryStagePrepRuleShim(extensions, CometSpark34AqeDppFallbackRule) - extensions.injectQueryStagePrepRule { session => CometScanRule(session) } - extensions.injectQueryStagePrepRule { session => CometExecRule(session) } + if (CometConf.COMET_USE_PLANNER.get()) { + extensions.injectQueryStagePrepRule { session => CometPlanner(session) } + // Covers the legacy `exec=off + shuffle=on` mode that CometPlanner intentionally + // leaves untouched. See CometShuffleRule for the removal plan.
+ extensions.injectQueryStagePrepRule { session => CometShuffleRule(session) } + } else { + extensions.injectQueryStagePrepRule { session => CometScanRule(session) } + extensions.injectQueryStagePrepRule { session => CometExecRule(session) } + } injectQueryStageOptimizerRuleShim(extensions, CometPlanAdaptiveDynamicPruningFilters) injectQueryStageOptimizerRuleShim(extensions, CometReuseSubquery) } case class CometScanColumnar(session: SparkSession) extends ColumnarRule { - override def preColumnarTransitions: Rule[SparkPlan] = CometScanRule(session) + override def preColumnarTransitions: Rule[SparkPlan] = + if (CometConf.COMET_USE_PLANNER.get()) CometPlanner(session) + else CometScanRule(session) } case class CometExecColumnar(session: SparkSession) extends ColumnarRule { - override def preColumnarTransitions: Rule[SparkPlan] = CometExecRule(session) + override def preColumnarTransitions: Rule[SparkPlan] = + if (CometConf.COMET_USE_PLANNER.get()) CometShuffleRule(session) + else CometExecRule(session) override def postColumnarTransitions: Rule[SparkPlan] = EliminateRedundantTransitions(session) diff --git a/spark/src/main/scala/org/apache/comet/planner/CometPlanner.scala b/spark/src/main/scala/org/apache/comet/planner/CometPlanner.scala new file mode 100644 index 0000000000..a3fbac4150 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/CometPlanner.scala @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{InputFileBlockLength, InputFileBlockStart, InputFileName} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometExec, CometNativeExec} +import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec + +import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.isCometLoaded +import org.apache.comet.planner.phases.{NormalizePrePass, Phase1LikelyComet, Phase2Decision, Phase3Emit, SubqueryBroadcastRewrite} +import org.apache.comet.planner.tags.CometTags +import org.apache.comet.rules.RewriteJoin + +/** + * Single-pass compiler from Spark physical plans to Comet-accelerated plans. Replaces the + * two-rule split of CometScanRule and CometExecRule with a structured pipeline: + * + * 1. Pre-pass: NaN/zero normalization on float comparisons, RewriteJoin (SMJ to SHJ/BHJ). 2. + * Phase 1 (LIKELY_COMET annotator): predict per node whether serde supports it in isolation. 
+ * Also index BroadcastExchangeExec to likely Comet consumers so Phase 2 can judge broadcast + * demand without retrying. 3. Phase 2 (DECISION annotator): per-operator demand-aware rules + * decide convert / passthrough / fallback. Catches cases like "columnar shuffle with JVM + * aggregate on both sides" upstream, replacing the revertRedundantColumnarShuffle post-pass. + * 4. Phase 3 (emitter): bottom-up transform that builds protobuf and constructs + * CometNativeExec subtrees. Sets logical link at construction. Tags emitted roots with + * COMET_CONVERTED for AQE short-circuit. 5. Post-pass: SubqueryBroadcast rewrite in a single + * expression walk. + * + * Idempotency: AQE re-runs the rule on each stage as it materializes. The top-level `apply` + * checks the root node's `COMET_CONVERTED` tag and short-circuits to `convertBlocks` when set, so + * we don't re-classify a tree we already emitted. + * + * Assertions guard invariants the rule relies on for correctness; if they trip, something further + * up the pipeline is violating a contract. + */ +case class CometPlanner(session: SparkSession) extends Rule[SparkPlan] with Logging { + + override def apply(plan: SparkPlan): SparkPlan = { + if (!isCometLoaded(conf)) { + logDebug("CometPlanner skip: Comet extension not loaded") + return plan + } + assert( + CometConf.COMET_USE_PLANNER.get(conf), + s"CometPlanner ran while ${CometConf.COMET_USE_PLANNER.key}=false. The legacy " + + "CometScanRule + CometExecRule should be the sole rules on this path. Either " + + "COMET_USE_PLANNER was flipped after session creation or CometPlanner was registered " + + "by mistake.") + + // Comet exec globally disabled OR root already converted (AQE re-entry): skip phase 1/2/3 + // but still run convertBlocks. AQE re-planning can graft a previously-emitted CometNativeExec + // subtree (via LogicalQueryStage) into a freshly Spark-planned outer plan; that subtree's + // top node may have been an interior node (no SerializedPlan) under the prior root. Without + // a fresh convertBlocks pass it would crash at execution with "should not be executed + // directly without a serialized plan". + val skipPhases = !CometConf.COMET_EXEC_ENABLED.get(conf) || + plan.getTagValue(CometTags.COMET_CONVERTED).isDefined + if (skipPhases) { + return convertBlocks(plan) + } + + val prepared = prePass(plan) + + // Pre-compute plan-wide flags once so per-node phases do not re-walk the tree. Any + // input_file_name / input_file_block_start / input_file_block_length reference anywhere in + // the plan disqualifies all native DataFusion scans; see V1ScanGate. 
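+ // For example, a query like `SELECT input_file_name(), c1 FROM t` disqualifies every native
+ // DataFusion scan in this plan, including scans in subtrees that never reference the expression.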
+ val hasInputFileExpressions = prepared.exists(node => + node.expressions.exists(_.exists { + case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true + case _ => false + })) + + val broadcastConsumers = BroadcastConsumerIndex.build(prepared, conf) + + val context = PlanningContext( + session = session, + conf = conf, + broadcastConsumers = broadcastConsumers, + hasInputFileExpressions = hasInputFileExpressions) + + val annotated1 = phase1LikelyComet(prepared, context) + val annotated2 = phase2Decision(annotated1, context) + val emitted = phase3Emit(annotated2, context) + val reverted = revertOrphanedBroadcasts(emitted) + val cleaned = cleanupLogicalLinks(reverted) + val blocked = convertBlocks(cleaned) + val finalPlan = postPass(blocked, context) + + val nativeCount = countNative(finalPlan) + logDebug(s"CometPlanner: planId=${plan.id} nativeExecs=$nativeCount") + checkPostEmitInvariants(finalPlan) + finalPlan + } + + /** + * Walk the emitted plan top-down and serialize each native-subtree root. A Comet native + * operator delegates its entire block (itself plus all CometNativeExec descendants) to a single + * JNI call at execution time. The block root holds the serialized protobuf; children of a + * serialized root don't carry their own serialized plans because they're embedded in the root's + * protobuf. Without this step, block roots carry `SerializedPlan(None)` and trip the "should + * not be executed directly" guard in `CometNativeExec.doExecuteColumnar`. + * + * Block boundary rules (copied from `CometExecRule`): + * - first CometNativeExec seen is a block root. `convertBlock()` serializes its subtree. + * - subsequent CometNativeExec descendants inside that subtree are non-roots. + * - hitting a leaf CometNativeExec (e.g. a scan) resets `firstNativeOp` so a sibling subtree + * starts a new block. + * - hitting a non-CometNativeExec node ALSO resets (e.g. JVM-orchestrated + * `CometShuffleExchangeExec` / `CometBroadcastExchangeExec` / `CometCollectLimitExec` after + * placeholder unwrap). Without this the native subtree below a shuffle / broadcast never + * gets its root serialized. + * - `CometNativeWriteExec` resets too: it serializes its own nativeOp on demand, so its + * CometNativeExec children start fresh blocks. + */ + private def convertBlocks(plan: SparkPlan): SparkPlan = { + var firstNativeOp = true + val out = plan.transformDown { + case op: CometNativeExec => + val rewritten = if (firstNativeOp) { + firstNativeOp = false + op.convertBlock() + } else { + op + } + if (op.children.isEmpty) { + firstNativeOp = true + } + if (op.getClass.getSimpleName == "CometNativeWriteExec") { + firstNativeOp = true + } + rewritten + case op => + firstNativeOp = true + op + } + out + } + + /** + * Recovery pass for the classic "Phase 1 predicts parent is convertible, Phase 2 converts the + * broadcast child on that prediction, Phase 3 then fails to emit the parent (serde.convert + * returns None or the BHJ rejects complex join keys)" scenario. The resulting plan is Spark BHJ + * + CometBroadcastExchange, which crashes at runtime because Spark BHJ calls + * `buildSide.executeBroadcast` and Comet broadcast exchange is not a `BroadcastExchangeLike` + * that Spark can read directly. + * + * Walk the plan and for any non-native-compatible parent that holds a + * `CometBroadcastExchangeExec` child, substitute the original Spark `BroadcastExchangeExec` + * (preserving the already- converted Comet subtree under it). 
Spark's + * `ApplyColumnarRulesAndInsertTransitions` inserts `CometColumnarToRow` at the columnar-to-row + * boundary so execution works. + * + * Shuffle doesn't need the equivalent revert because a Spark parent with a Comet columnar + * shuffle child is handled naturally by Spark's transition insertion. + */ + private def revertOrphanedBroadcasts(plan: SparkPlan): SparkPlan = { + if (CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.get()) { + return plan + } + var reverted = 0 + val out = plan.transformUp { + case parent if !isNativeCompatible(parent) && hasCometBroadcastChild(parent) => + val newChildren = parent.children.map { + case c: CometBroadcastExchangeExec => + reverted += 1 + BroadcastExchangeExec(c.mode, c.child) + case other => other + } + parent.withNewChildren(newChildren) + case op => op + } + if (reverted > 0) logDebug(s"CometPlanner: reverted $reverted orphaned broadcasts") + out + } + + private def isNativeCompatible(node: SparkPlan): Boolean = + node.isInstanceOf[CometNativeExec] || node.getTagValue(CometTags.NATIVE_OP).isDefined + + private def hasCometBroadcastChild(parent: SparkPlan): Boolean = + parent.children.exists(_.isInstanceOf[CometBroadcastExchangeExec]) + + /** + * Reconcile each Comet operator's `logicalLink` with its `originalPlan.logicalLink`. For every + * `CometExec` / `CometShuffleExchangeExec` / `CometBroadcastExchangeExec`: + * - if `originalPlan.logicalLink.isDefined`, copy that link onto the Comet operator. + * - if `originalPlan.logicalLink.isEmpty`, explicitly unset both `LOGICAL_PLAN_TAG` and + * `LOGICAL_PLAN_INHERITED_TAG` on the Comet operator. + * + * The unset branch is the load-bearing one. Spark's `SparkPlan.setLogicalLink` recurses into + * children, writing `LOGICAL_PLAN_INHERITED_TAG` on every descendant that lacks its own + * `LOGICAL_PLAN_TAG` (recursion stops at descendants that already have a tag of their own). + * Phase 3 sets logical links bottom-up while emitting, so when the parent join emits and calls + * `setLogicalLink`, propagation reaches a `CometShuffleExchangeExec` whose source Spark + * exchange had no logical link of its own. The exchange now carries an inherited link that + * points at the parent join's logical node rather than its own (stage-boundary) logical node. + * + * Why it matters: AQE's `AdaptiveSparkPlanExec.replaceWithQueryStagesInLogicalPlan` walks the + * current physical plan and, for each materialized query stage, locates a physical match for + * the stage's logical node via `physicalNode.logicalLink.exists(logicalNode.eq)`. A stale + * inherited link makes `collectFirst` pick the wrong physical node (typically a Comet operator + * far above the stage) and that whole subtree becomes a `LogicalQueryStage`. On re-planning via + * `LogicalQueryStageStrategy`, the captured physical subtree is returned verbatim, so the + * already-Comet ancestor survives a re-plan that would otherwise produce a fresh Spark plan. + * + * Concrete failure mode (regression test: `CometExecSuite."CometBroadcastExchange could be + * converted to rows using CometColumnarToRow"`): second `df.collect` with + * `COMET_EXEC_ENABLED=false` is supposed to produce Spark BHJ over a reused materialized + * `CometBroadcastExchange` query stage. 
Without this pass, the inner `CometSortMergeJoin` from + * the first collect survives via `LogicalQueryStage` (the stale inherited link routes the wrap + * to the SMJ instead of the shuffle stage), and the outer broadcast gets re-planned as a fresh + * Spark `BroadcastExchange` instead of reusing the Comet one. Mirrors the post-conversion "Set + * up logical links" pass in legacy `CometExecRule` (rules/CometExecRule.scala). Run after Phase + * 3 emit, before convertBlocks. + */ + private def cleanupLogicalLinks(plan: SparkPlan): SparkPlan = { + var unset = 0 + var set = 0 + val out = plan.transform { + case op: CometExec => + if (op.originalPlan.logicalLink.isEmpty) { + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) + unset += 1 + } else { + op.originalPlan.logicalLink.foreach(op.setLogicalLink) + set += 1 + } + op + case op: CometShuffleExchangeExec => + if (op.originalPlan.logicalLink.isEmpty) { + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) + unset += 1 + } else { + op.originalPlan.logicalLink.foreach(op.setLogicalLink) + set += 1 + } + op + case op: CometBroadcastExchangeExec => + if (op.originalPlan.logicalLink.isEmpty) { + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) + unset += 1 + } else { + op.originalPlan.logicalLink.foreach(op.setLogicalLink) + set += 1 + } + op + } + if (set + unset > 0) logDebug(s"CometPlanner: cleanupLogicalLinks set=$set unset=$unset") + out + } + + /** + * Pre-pass: expression-level rewrites that change operator structure. Float NaN/zero + * normalization around comparison operators (arrow-rs doesn't normalize) and SMJ-to-SHJ/BHJ + * join rewriting. Both must run before classification because they change the set of node types + * the later phases see. + */ + private def prePass(plan: SparkPlan): SparkPlan = { + val normalized = NormalizePrePass(plan) + if (CometConf.COMET_REPLACE_SMJ.get()) { + normalized.transformUp { case p => RewriteJoin.rewrite(p) } + } else { + normalized + } + } + + /** + * Phase 1: tag each node with LIKELY_COMET based on serde's expression-sensitive support check. + * Ignores child gating. Also builds the broadcast consumer index. + */ + private def phase1LikelyComet(plan: SparkPlan, ctx: PlanningContext): SparkPlan = + Phase1LikelyComet(plan, ctx.conf) + + /** + * Phase 2: per-operator demand rules produce a DECISION tag (Convert / Passthrough / Fallback). + * Top-down so the parent's LIKELY_COMET is known when each node is visited. + */ + private def phase2Decision(plan: SparkPlan, ctx: PlanningContext): SparkPlan = + Phase2Decision(plan, ctx) + + /** + * Phase 3: emit protobuf and construct CometNativeExec subtrees for nodes tagged + * DECISION=Convert. Wires children; sets logical link and COMET_CONVERTED tag at construction. + */ + private def phase3Emit(plan: SparkPlan, ctx: PlanningContext): SparkPlan = + Phase3Emit(ctx.session)(plan) + + /** + * Post-pass: single expression walk via transformAllExpressions rewriting SubqueryBroadcastExec + * to CometSubqueryBroadcastExec (non-AQE) and wrapping SAB in + * CometSubqueryAdaptiveBroadcastExec (AQE 3.5+). Replaces the per-node invocation inside the + * old CometExecRule's transformUp. 
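+ * For example (illustrative), a non-AQE DPP filter of the shape
+ * `DynamicPruningExpression(InSubqueryExec(_, SubqueryBroadcastExec(...), _))` leaves this pass
+ * as `DynamicPruningExpression(InSubqueryExec(_, CometSubqueryBroadcastExec(...), _))`.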
+ */ + private def postPass(plan: SparkPlan, ctx: PlanningContext): SparkPlan = + SubqueryBroadcastRewrite(plan) + + private def countNative(plan: SparkPlan): Int = { + var n = 0 + plan.foreach { + case _: CometNativeExec => n += 1 + case _ => + } + n + } + + /** + * Post-emission invariants. Any violation means a prior phase produced an inconsistent plan + * that would either crash at execution or silently produce wrong results. Fail loud during the + * pre-cutover debugging phase; these become `logWarn` with fallback after stabilization. + */ + private def checkPostEmitInvariants(plan: SparkPlan): Unit = { + plan.foreach { node => + assert( + !node.getClass.getName.contains("CometBatchScanExec"), + "CometBatchScanExec found in emitted plan. CometPlanner should emit " + + s"CometIcebergNativeScanExec / CometCsvNativeScanExec directly. node=$node") + assert( + !node.getClass.getName.endsWith(".CometSinkPlaceHolder") && + !node.getClass.getName.endsWith(".CometScanWrapper"), + s"Placeholder wrapper (${node.getClass.getSimpleName}) survived Phase 3. Every serde " + + "that returns a wrapper should be unwrapped via NATIVE_OP tag inside runSerde.") + // A CometNativeExec that is not COMET_CONVERTED shouldn't exist post-Phase-3 because + // Phase 3 sets the tag at every emission site. If one appears without the tag, some + // emission path forgot to tag and AQE re-entries will try to re-convert it. + node match { + case n: CometNativeExec => + assert( + n.getTagValue(CometTags.COMET_CONVERTED).isDefined, + s"CometNativeExec missing COMET_CONVERTED tag. node=$n") + case _ => + } + } + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/PlanningContext.scala b/spark/src/main/scala/org/apache/comet/planner/PlanningContext.scala new file mode 100644 index 0000000000..4ee979e896 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/PlanningContext.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec} +import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.planner.phases.Phase1LikelyComet + +/** + * State threaded through CometPlanner's three phases. 
Holds session-scoped data (configs, the + * active SparkSession), a few plan-wide flags computed once at the top of `apply` so per-node + * phases do not re-walk the whole tree, plus the broadcast consumer index computed in Phase 1 and + * consulted during Phase 2's broadcast decision. + * + * Constructed once per CometPlanner.apply invocation. Not shared across invocations because AQE + * re-entries reset session state. + */ +case class PlanningContext( + session: SparkSession, + conf: SQLConf, + broadcastConsumers: BroadcastConsumerIndex, + hasInputFileExpressions: Boolean) + +/** + * Index of BroadcastExchangeExec instances to whether a likely-Comet consumer (a + * BroadcastHashJoinExec that would itself be LIKELY_COMET) exists in the plan. Built once during + * Phase 1 by walking the plan looking for joins that reference broadcast outputs. + * + * Replaces the convertNode retry-on-self-with-new-children pattern in the old CometExecRule: + * instead of converting children then re-asking whether to convert the parent, Phase 2 just reads + * the index. + */ +trait BroadcastConsumerIndex { + def isConsumedByCometCandidate(broadcast: BroadcastExchangeExec): Boolean +} + +object BroadcastConsumerIndex extends Logging { + + /** Empty index. Used as a starting placeholder or in tests. */ + val Empty: BroadcastConsumerIndex = new BroadcastConsumerIndex { + override def isConsumedByCometCandidate(broadcast: BroadcastExchangeExec): Boolean = false + } + + /** + * Walks `plan` looking for `BroadcastHashJoinExec` nodes that would themselves be LIKELY_COMET + * under the current configuration. For each such join, records every `BroadcastExchangeExec` it + * references as a consumer. Handles the AQE wrappers (`BroadcastQueryStageExec`, + * `ReusedExchangeExec`) that hide the raw broadcast between planning and execution. + */ + def build(plan: SparkPlan, conf: SQLConf): BroadcastConsumerIndex = { + val consumed = new java.util.IdentityHashMap[BroadcastExchangeExec, java.lang.Boolean]() + plan.foreach { + case bhj: BroadcastHashJoinExec if Phase1LikelyComet.isLikelyComet(bhj, conf) => + bhj.children.foreach(indexBroadcast(_, consumed)) + case _ => + } + logDebug(s"BroadcastConsumerIndex built size=${consumed.size}") + new BroadcastConsumerIndex { + override def isConsumedByCometCandidate(broadcast: BroadcastExchangeExec): Boolean = + consumed.containsKey(broadcast) + } + } + + private def indexBroadcast( + node: SparkPlan, + consumed: java.util.IdentityHashMap[BroadcastExchangeExec, java.lang.Boolean]): Unit = + node match { + case b: BroadcastExchangeExec => + consumed.put(b, java.lang.Boolean.TRUE) + case BroadcastQueryStageExec(_, b: BroadcastExchangeExec, _) => + consumed.put(b, java.lang.Boolean.TRUE) + case BroadcastQueryStageExec(_, ReusedExchangeExec(_, b: BroadcastExchangeExec), _) => + consumed.put(b, java.lang.Boolean.TRUE) + case ReusedExchangeExec(_, b: BroadcastExchangeExec) => + consumed.put(b, java.lang.Boolean.TRUE) + case _ => + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/gates/S2CGate.scala b/spark/src/main/scala/org/apache/comet/planner/gates/S2CGate.scala new file mode 100644 index 0000000000..baf7ce1d57 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/gates/S2CGate.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner.gates + +import scala.collection.mutable.ListBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.comet.CometSparkToColumnarExec +import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan +import org.apache.spark.sql.execution.datasources.v2.json.JsonScan +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.CometConf +import org.apache.comet.CometConf._ + +/** + * Decides whether a non-Comet leaf should be wrapped in `CometSparkToColumnarExec` to bridge + * row-at-a-time data into a Comet-consuming parent. Ports + * `CometExecRule.shouldApplySparkToColumnar` and `isSparkToArrowEnabled` into a single entry + * point used by Phase 2. Demand gating (parent must be LIKELY_COMET) is applied by Phase 2 + * itself, not here. + */ +object S2CGate extends Logging { + + def shouldApply(op: SparkPlan, conf: SQLConf): Boolean = { + val fallbackReasons = new ListBuffer[String]() + if (!CometSparkToColumnarExec.isSchemaSupported(op.schema, fallbackReasons)) { + logDebug( + s"S2CGate reject schemaUnsupported node=${op.getClass.getSimpleName} " + + s"id=${op.id} reasons=${fallbackReasons.mkString(" | ")}") + return false + } + + op match { + case scan: FileSourceScanExec => + scan.relation.fileFormat match { + case _: CSVFileFormat => CometConf.COMET_CONVERT_FROM_CSV_ENABLED.get(conf) + case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) + case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) + case _ => isSparkToArrowEnabled(op, conf) + } + case scan: BatchScanExec => + scan.scan match { + case _: CSVScan => CometConf.COMET_CONVERT_FROM_CSV_ENABLED.get(conf) + case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) + case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) + case _ => isSparkToArrowEnabled(op, conf) + } + case _: LeafExecNode => + isSparkToArrowEnabled(op, conf) + case _ => + // Matches the old rule's conservative behavior. Non-leaf intermediate operators are + // not wrapped in CometSparkToColumnarExec today. + false + } + } + + private def isSparkToArrowEnabled(op: SparkPlan, conf: SQLConf): Boolean = { + COMET_SPARK_TO_ARROW_ENABLED.get(conf) && { + // Derive the operator name from the class name, not op.nodeName. Some operators + // override nodeName (e.g. `InMemoryTableScanExec` returns `"Scan "`), + // which would never match the "InMemoryTableScan" entry in the allowlist. 
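+ // With the derivation below, e.g. InMemoryTableScanExec maps to "InMemoryTableScan" and
+ // RangeExec to "Range", matching the un-suffixed names the allowlist expects.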
+ val derivedName = op.getClass.getSimpleName.replaceAll("Exec$", "") + COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.get(conf).contains(derivedName) + } + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/gates/V1ScanGate.scala b/spark/src/main/scala/org/apache/comet/planner/gates/V1ScanGate.scala new file mode 100644 index 0000000000..6dca628407 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/gates/V1ScanGate.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner.gates + +import scala.collection.mutable.ListBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues +import org.apache.spark.sql.comet.CometScanExec +import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SubqueryAdaptiveBroadcastExec} +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.CometConf._ +import org.apache.comet.CometSparkSessionExtensions.isSpark35Plus +import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported} +import org.apache.comet.rules.CometScanTypeChecker +import org.apache.comet.serde.operator.CometNativeScan +import org.apache.comet.shims.ShimFileFormat + +sealed trait V1ScanClassification + +object V1ScanClassification { + case object Convertible extends V1ScanClassification + final case class NotConvertible(reasons: Set[String]) extends V1ScanClassification +} + +/** + * Classifies a V1 FileSourceScanExec for CometPlanner. Ports the gates from + * `CometScanRule.transformV1Scan` and `nativeDataFusionScan` into a pure ADT. Callers attach + * `withInfo` entries from the returned reasons themselves. + * + * Plan-wide checks (input_file_name / input_file_block_start / input_file_block_length) require + * the caller to pre-compute `hasInputFileExpressions` once per `CometPlanner.apply` invocation. + * V1ScanGate only receives the boolean so it does not re-walk the full plan per scan. + * + * Duplicates validation that also lives in `CometScanRule.transformV1Scan`. Both copies are live + * (one per registered rule path) until the legacy rule is deleted. 
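+ *
+ * Illustrative sketch of the intended Phase 2 call site (the real wiring lives in
+ * Phase2Decision; `scan` and `ctx` here are assumed local names):
+ * {{{
+ *   V1ScanGate.classify(scan, ctx.session, ctx.conf, ctx.hasInputFileExpressions) match {
+ *     case V1ScanClassification.Convertible => // keep the scan eligible for Phase 3 emission
+ *     case V1ScanClassification.NotConvertible(reasons) => reasons.foreach(withInfo(scan, _))
+ *   }
+ * }}}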
+ */ +object V1ScanGate extends Logging { + + def classify( + scanExec: FileSourceScanExec, + session: SparkSession, + conf: SQLConf, + hasInputFileExpressions: Boolean): V1ScanClassification = { + val reasons = new ListBuffer[String]() + + def reject(reason: String): V1ScanClassification = { + reasons += reason + logDebug(s"V1ScanGate reject scan=${scanExec.id} reason=$reason") + V1ScanClassification.NotConvertible(reasons.toSet) + } + + if (!isSpark35Plus && scanExec.partitionFilters.exists(isAqeDynamicPruningFilter)) { + return reject("AQE Dynamic Partition Pruning requires Spark 3.5+") + } + + val r = scanExec.relation match { + case rel: HadoopFsRelation => rel + case other => + return reject(s"Unsupported relation $other") + } + + if (!CometScanExec.isFileFormatSupported(r.fileFormat)) { + return reject(s"Unsupported file format ${r.fileFormat}") + } + + val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options) + + val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema) + if (possibleDefaultValues.exists(d => + d != null && (d.isInstanceOf[ArrayBasedMapData] + || d.isInstanceOf[GenericInternalRow] + || d.isInstanceOf[GenericArrayData]))) { + return reject( + "Full native scan disabled because default values for nested types are not supported") + } + + // CometNativeScan.isSupported covers COMET_EXEC_ENABLED, the AQE-DPP-on-3.4 safety net, + // ignoreCorruptFiles and ignoreMissingFiles. Today it records fallbacks via `withInfo` + // side effects on the scan. That is acceptable: those messages should reach explain + // output regardless of which rule decides to fall back. + if (!CometNativeScan.isSupported(scanExec)) { + return reject(s"$SCAN_NATIVE_DATAFUSION scan unsupported. See scan info for details.") + } + + if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) { + return reject(s"$SCAN_NATIVE_DATAFUSION does not support encryption") + } + + if (scanExec.fileConstantMetadataColumns.nonEmpty) { + return reject("Native DataFusion scan does not support metadata columns") + } + + // input_file_name, input_file_block_start and input_file_block_length read from + // InputFileBlockHolder, a thread-local that Spark's FileScanRDD populates. The native + // DataFusion scan bypasses FileScanRDD, so these expressions would see empty values. + if (hasInputFileExpressions) { + return reject( + "Native DataFusion scan is not compatible with input_file_name, " + + "input_file_block_start, or input_file_block_length") + } + + if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) { + return reject("Native DataFusion scan does not support row index generation") + } + + if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) && + ParquetUtils.hasFieldIds(scanExec.requiredSchema)) { + return reject("Native DataFusion scan does not support Parquet field ID matching") + } + + val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION) + val schemaFallback = new ListBuffer[String]() + val schemaSupported = + typeChecker.isSchemaSupported(scanExec.requiredSchema, schemaFallback) + if (!schemaSupported) { + return reject( + s"Unsupported schema ${scanExec.requiredSchema} " + + s"for $SCAN_NATIVE_DATAFUSION. 
${schemaFallback.mkString(", ")}") + } + val partitionSchemaSupported = + typeChecker.isSchemaSupported(r.partitionSchema, schemaFallback) + if (!partitionSchemaSupported) { + return reject( + s"Unsupported partitioning schema ${r.partitionSchema} " + + s"for $SCAN_NATIVE_DATAFUSION. ${schemaFallback.mkString(", ")}") + } + + V1ScanClassification.Convertible + } + + private def isAqeDynamicPruningFilter(e: Expression): Boolean = + e.exists { + case sub: InSubqueryExec => sub.plan.isInstanceOf[SubqueryAdaptiveBroadcastExec] + case _ => false + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/gates/V2ScanClassifier.scala b/spark/src/main/scala/org/apache/comet/planner/gates/V2ScanClassifier.scala new file mode 100644 index 0000000000..0e90b828e9 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/gates/V2ScanClassifier.scala @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner.gates + +import scala.collection.mutable.ListBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression} +import org.apache.spark.sql.catalyst.util.MetadataColumnHelper +import org.apache.spark.sql.comet.CometBatchScanExec +import org.apache.spark.sql.execution.{InSubqueryExec, SubqueryAdaptiveBroadcastExec} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.CometConf._ +import org.apache.comet.DataTypeSupport.isComplexType +import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection} +import org.apache.comet.objectstore.NativeConfig +import org.apache.comet.rules.{CometScanRule, CometScanTypeChecker} +import org.apache.comet.serde.operator.CometIcebergNativeScan +import org.apache.comet.shims.ShimSubqueryBroadcast + +/** + * Classifies a V2 BatchScanExec for CometPlanner. Wraps the validation and metadata extraction + * that CometScanRule.transformV2Scan runs today, with two differences: returns a pure ADT (no + * CometBatchScanExec plan wrapper), and never mutates the scan's info set. Callers attach + * withInfo entries from the returned reasons themselves. + * + * Duplicates logic that also lives in `CometScanRule.transformV2Scan`. Both copies are live (one + * per registered rule path) until the legacy rule is deleted. + */ +sealed trait V2ScanClassification + +object V2ScanClassification { + + /** + * Iceberg scan through iceberg-rust. Carries extracted metadata so Phase 3 can build protobuf + * directly without another reflection round-trip. 
+ */ + final case class IcebergConvertible(metadata: CometIcebergNativeScanMetadata) + extends V2ScanClassification + + /** Native CSV V2 scan. No metadata needed at planning time. */ + case object CsvConvertible extends V2ScanClassification + + /** Not convertible. Carries human-readable fallback reasons for `withInfo`. */ + final case class NotConvertible(reasons: Set[String]) extends V2ScanClassification +} + +object V2ScanClassifier extends Logging with ShimSubqueryBroadcast { + + def classify(scanExec: BatchScanExec, conf: SQLConf): V2ScanClassification = { + // Mirrors CometScanRule's pre-dispatch gate: any metadata column reference (e.g. Iceberg's + // `_file`, `_pos`, `_deleted`, `_spec_id`, `_partition`, `_change_type`) disqualifies the + // scan from native conversion because iceberg-rust / DataFusion's Parquet reader don't + // populate those columns. + if (hasMetadataCol(scanExec)) { + return V2ScanClassification.NotConvertible(Set("Metadata column is not supported")) + } + val result = scanExec.scan match { + case scan: CSVScan if COMET_CSV_V2_NATIVE_ENABLED.get(conf) => + classifyCsv(scan, scanExec) + + case _ + if scanExec.scan.getClass.getName == + "org.apache.iceberg.spark.source.SparkBatchQueryScan" => + classifyIceberg(scanExec, conf) + + case other => + V2ScanClassification.NotConvertible( + Set( + s"Unsupported scan: ${other.getClass.getName}. " + + "Comet Scan only supports Parquet and Iceberg Parquet file formats")) + } + result match { + case V2ScanClassification.IcebergConvertible(metadata) => + assert( + metadata.metadataLocation != null && metadata.metadataLocation.nonEmpty, + s"IcebergConvertible returned without metadataLocation scan=${scanExec.id}") + case _ => + } + result + } + + private def classifyCsv(scan: CSVScan, scanExec: BatchScanExec): V2ScanClassification = { + val fallbackReasons = new ListBuffer[String]() + val schemaSupported = + CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons) + if (!schemaSupported) { + fallbackReasons += s"Schema ${scan.readDataSchema} is not supported" + } + val partitionSchemaSupported = + CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons) + if (!partitionSchemaSupported) { + fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported" + } + val corruptedRecordsColumnName = + SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD) + val containsCorruptedRecordsColumn = + !scan.readDataSchema.fieldNames.contains(corruptedRecordsColumnName) + if (!containsCorruptedRecordsColumn) { + fallbackReasons += "Comet doesn't support the processing of corrupted records" + } + val isInferSchemaEnabled = scan.options.getBoolean("inferSchema", false) + if (isInferSchemaEnabled) { + fallbackReasons += "Comet doesn't support inferSchema=true option" + } + val delimiter = + Option(scan.options.get("delimiter")) + .orElse(Option(scan.options.get("sep"))) + .getOrElse(",") + val isSingleCharacterDelimiter = delimiter.length == 1 + if (!isSingleCharacterDelimiter) { + fallbackReasons += + s"Comet supports only single-character delimiters, but got: '$delimiter'" + } + + if (schemaSupported && partitionSchemaSupported && containsCorruptedRecordsColumn + && !isInferSchemaEnabled && isSingleCharacterDelimiter) { + V2ScanClassification.CsvConvertible + } else { + V2ScanClassification.NotConvertible(fallbackReasons.toSet) + } + } + + private def classifyIceberg(scanExec: BatchScanExec, conf: SQLConf): V2ScanClassification = { + val fallbackReasons = new 
ListBuffer[String]() + + // Iceberg metadata tables (e.g. `t.snapshots`, `t.history`, `t.manifests`) still produce a + // SparkBatchQueryScan via SparkTable.newScanBuilder, so without this early reject we would + // run through full metadata extraction and eventually fail inside + // validateIcebergFileScanTasks (metadata DataTasks don't implement ContentScanTask.file()). + // Mirrors CometScanRule.isIcebergMetadataTable. + if (isIcebergMetadataTable(scanExec)) { + fallbackReasons += "Iceberg Metadata tables are not supported" + return V2ScanClassification.NotConvertible(fallbackReasons.toSet) + } + + if (!COMET_ICEBERG_NATIVE_ENABLED.get(conf)) { + fallbackReasons += "Native Iceberg scan disabled because " + + s"${COMET_ICEBERG_NATIVE_ENABLED.key} is not enabled" + return V2ScanClassification.NotConvertible(fallbackReasons.toSet) + } + + if (!COMET_EXEC_ENABLED.get(conf)) { + fallbackReasons += "Native Iceberg scan disabled because " + + s"${COMET_EXEC_ENABLED.key} is not enabled" + return V2ScanClassification.NotConvertible(fallbackReasons.toSet) + } + + val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION) + val schemaSupported = + typeChecker.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons) + if (!schemaSupported) { + fallbackReasons += "Comet extension is not enabled for " + + s"${scanExec.scan.getClass.getSimpleName}: Schema not supported" + } + + val tableOpt = IcebergReflection.getTable(scanExec.scan) + val metadataLocationOpt = + tableOpt.flatMap(table => IcebergReflection.getMetadataLocation(table)) + + val metadataOpt = metadataLocationOpt.flatMap { metadataLocation => + try { + val session = org.apache.spark.sql.SparkSession.active + val hadoopConf = session.sessionState.newHadoopConf() + + // REST catalogs may not have the metadata file on disk; use the table location as a + // fallback so FileIO initialisation succeeds against the remote object store. + val metadataUri = new java.net.URI(metadataLocation) + val metadataFile = new java.io.File(metadataUri.getPath) + val effectiveLocation = + if (!metadataFile.exists() && metadataUri.getScheme == "file") { + tableOpt + .flatMap { table => + try { + val locationMethod = table.getClass.getMethod("location") + val tableLocation = locationMethod.invoke(table).asInstanceOf[String] + Some(tableLocation) + } catch { + case _: Exception => Some(metadataLocation) + } + } + .getOrElse(metadataLocation) + } else { + metadataLocation + } + + val effectiveUri = new java.net.URI(effectiveLocation) + val hadoopS3Options = NativeConfig.extractObjectStoreOptions(hadoopConf, effectiveUri) + val hadoopDerivedProperties = + CometIcebergNativeScan.hadoopToIcebergS3Properties(hadoopS3Options) + + // FileIO properties take precedence because they contain per-table vended credentials. 
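+ // For example (illustrative), per-table `s3.access-key-id` / `s3.secret-access-key` values
+ // vended by a REST catalog should win over static credentials derived from the Hadoop
+ // `fs.s3a.*` settings.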
+ val fileIOProperties = tableOpt + .flatMap(IcebergReflection.getFileIOProperties) + .map(CometIcebergNativeScan.filterStorageProperties) + .getOrElse(Map.empty) + + val catalogProperties = hadoopDerivedProperties ++ fileIOProperties + CometIcebergNativeScanMetadata.extract( + scanExec.scan, + effectiveLocation, + catalogProperties) + } catch { + case e: Exception => + logError(s"Failed to extract catalog properties from Iceberg scan: ${e.getMessage}", e) + None + } + } + + val metadata = metadataOpt match { + case Some(m) => m + case None => + fallbackReasons += "Failed to extract Iceberg metadata via reflection" + return V2ScanClassification.NotConvertible(fallbackReasons.toSet) + } + + val fileIOCompatible = IcebergReflection.getFileIO(metadata.table) match { + case Some(fileIO) + if fileIO.getClass.getName == "org.apache.iceberg.inmemory.InMemoryFileIO" => + fallbackReasons += "InMemoryFileIO is not supported by Comet's native reader" + false + case Some(_) => true + case None => + fallbackReasons += "Could not check FileIO compatibility" + false + } + + val formatVersionSupported = IcebergReflection.getFormatVersion(metadata.table) match { + case Some(formatVersion) => + if (formatVersion > 2) { + fallbackReasons += "Iceberg table format version " + + s"$formatVersion is not supported. " + + "Comet only supports Iceberg table format V1 and V2" + false + } else { + true + } + case None => + fallbackReasons += "Could not verify Iceberg table format version" + false + } + + val taskValidation = + try { + CometScanRule.validateIcebergFileScanTasks(metadata.tasks) + } catch { + case e: Exception => + fallbackReasons += "Iceberg reflection failure: Could not validate " + + s"FileScanTasks: ${e.getMessage}" + return V2ScanClassification.NotConvertible(fallbackReasons.toSet) + } + + val allSupportedFilesystems = if (taskValidation.unsupportedSchemes.isEmpty) { + true + } else { + fallbackReasons += "Iceberg scan contains files with unsupported filesystem " + + s"schemes: ${taskValidation.unsupportedSchemes.mkString(", ")}. " + + "Comet only supports: file, s3, s3a, gs, gcs, oss, abfss, abfs, wasbs, wasb" + false + } + + if (!taskValidation.allParquet) { + fallbackReasons += "Iceberg scan contains non-Parquet files (ORC or Avro). " + + "Comet only supports Parquet files in Iceberg tables" + } + + val partitionTypesSupported = (for { + partitionSpec <- IcebergReflection.getPartitionSpec(metadata.table) + } yield { + val unsupportedTypes = + IcebergReflection.validatePartitionTypes(partitionSpec, metadata.scanSchema) + if (unsupportedTypes.nonEmpty) { + unsupportedTypes.foreach { case (fieldName, typeStr, reason) => + fallbackReasons += + s"Partition column '$fieldName' with type $typeStr is not yet supported by " + + s"iceberg-rust: $reason" + } + false + } else { + true + } + }).getOrElse { + val msg = "Iceberg reflection failure: Could not verify partition types compatibility" + logError(msg) + fallbackReasons += msg + false + } + + val filterExpressionsOpt = IcebergReflection.getFilterExpressions(scanExec.scan) + + // IS NULL/NOT NULL on complex types fail because iceberg-rust's accessor creation only + // handles primitives. Nested field filters work because Iceberg Java pre-binds them to + // field IDs; element/key access filters don't push down to FileScanTasks. 
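+ // e.g. `address IS NOT NULL` on a struct column trips the fallback below, while a nested
+ // field filter such as `address.city = 'NYC'` stays eligible for the native scan.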
+ val complexTypePredicatesSupported = filterExpressionsOpt + .map { filters => + if (filters.isEmpty) { + true + } else { + val readSchema = scanExec.scan.readSchema() + val complexColumns = readSchema + .filter(field => isComplexType(field.dataType)) + .map(_.name) + .toSet + + val hasComplexNullCheck = filters.asScala.exists { expr => + val exprStr = expr.toString + val isNullCheck = exprStr.contains("is_null") || exprStr.contains("not_null") + if (isNullCheck) { + complexColumns.exists { colName => + exprStr.contains(s"""ref(name="$colName")""") + } + } else { + false + } + } + + if (hasComplexNullCheck) { + fallbackReasons += "IS NULL / IS NOT NULL predicates on complex type columns " + + "(struct/array/map) are not yet supported by iceberg-rust " + + "(nested field filters like address.city = 'NYC' are supported)" + false + } else { + true + } + } + } + .getOrElse { + val msg = "Iceberg reflection failure: Could not check for complex type predicates" + logError(msg) + fallbackReasons += msg + false + } + + val transformFunctionsSupported = taskValidation.nonIdentityTransform match { + case Some(transformType) => + fallbackReasons += + s"Iceberg transform function '$transformType' in residual expression " + + "is not yet supported by iceberg-rust. " + + "Only identity transforms are supported." + false + case None => true + } + + val deleteFileTypesSupported = { + var hasUnsupportedDeletes = false + try { + if (!taskValidation.deleteFiles.isEmpty) { + taskValidation.deleteFiles.asScala.foreach { deleteFile => + val equalityFieldIds = IcebergReflection.getEqualityFieldIds(deleteFile) + if (!equalityFieldIds.isEmpty) { + equalityFieldIds.asScala.foreach { fieldId => + val fieldInfo = + IcebergReflection.getFieldInfo(metadata.scanSchema, fieldId.asInstanceOf[Int]) + fieldInfo match { + case Some((fieldName, fieldType)) => + if (fieldType.contains("struct")) { + hasUnsupportedDeletes = true + fallbackReasons += + s"Equality delete on unsupported column type '$fieldName' " + + s"($fieldType) is not yet supported by iceberg-rust. " + + "Struct types in equality deletes " + + "require datum conversion support that is not yet implemented." + } + case None => + } + } + } + } + } + } catch { + case e: Exception => + hasUnsupportedDeletes = true + fallbackReasons += "Iceberg reflection failure: Could not verify delete file " + + s"types for safety: ${e.getMessage}" + } + !hasUnsupportedDeletes + } + + // CometIcebergNativeScanExec only supports InSubqueryExec for DPP. SPARK-46946 changed + // SubqueryAdaptiveBroadcastExec to indices: Seq[Int] as preparation for Null Safe Equality + // DPP; today indices is always length 1 but future versions may introduce multi-index DPP. + val dppSubqueriesSupported = { + val unsupportedSubqueries = scanExec.runtimeFilters.collect { + case DynamicPruningExpression(e) if !e.isInstanceOf[InSubqueryExec] => + e.getClass.getSimpleName + } + val multiIndexDpp = scanExec.runtimeFilters.exists { + case DynamicPruningExpression(e: InSubqueryExec) => + e.plan match { + case sab: SubqueryAdaptiveBroadcastExec => + getSubqueryBroadcastIndices(sab).length > 1 + case _ => false + } + case _ => false + } + if (unsupportedSubqueries.nonEmpty) { + fallbackReasons += + s"Unsupported DPP subquery types: ${unsupportedSubqueries.mkString(", ")}. " + + "CometIcebergNativeScanExec only supports InSubqueryExec for DPP" + false + } else if (multiIndexDpp) { + fallbackReasons += + "Multi-index DPP (indices.length > 1) is not yet supported. " + + "See SPARK-46946 for context." 
+ false + } else { + true + } + } + + if (schemaSupported && fileIOCompatible && formatVersionSupported && + taskValidation.allParquet && allSupportedFilesystems && partitionTypesSupported && + complexTypePredicatesSupported && transformFunctionsSupported && + deleteFileTypesSupported && dppSubqueriesSupported) { + V2ScanClassification.IcebergConvertible(metadata) + } else { + V2ScanClassification.NotConvertible(fallbackReasons.toSet) + } + } + + /** + * Any expression on the scan that references an `Attribute` flagged as a metadata column + * (`isMetadataCol`). Mirrors `CometScanRule.hasMetadataCol`. Iceberg surfaces row-level columns + * `_file`, `_pos`, `_deleted`, `_spec_id`, `_partition`, `_change_type`, `_change_ordinal`, + * `_commit_snapshot_id` this way; the native iceberg scan does not populate them. + */ + private def hasMetadataCol(scanExec: BatchScanExec): Boolean = { + scanExec.expressions.exists(_.exists { + case a: Attribute => a.isMetadataCol + case _ => false + }) + } + + /** + * Iceberg metadata table scans (e.g. `SELECT * FROM t.snapshots`) still present as + * `SparkBatchQueryScan` because `SparkTable.newScanBuilder` is shared between regular and + * metadata tables. Mirrors `CometScanRule.isIcebergMetadataTable`: suffix list copied from + * https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables. + */ + private def isIcebergMetadataTable(scanExec: BatchScanExec): Boolean = { + val metadataTableSuffix = Set( + "history", + "metadata_log_entries", + "snapshots", + "entries", + "files", + "manifests", + "partitions", + "position_deletes", + "all_data_files", + "all_delete_files", + "all_entries", + "all_manifests") + metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix)) + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/phases/NormalizePrePass.scala b/spark/src/main/scala/org/apache/comet/planner/phases/NormalizePrePass.scala new file mode 100644 index 0000000000..24ae0dc4a0 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/phases/NormalizePrePass.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.planner.phases + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder} +import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.types.{DoubleType, FloatType} + +/** + * Pre-pass: normalize NaN and signed zero for floating-point comparisons in ProjectExec and + * FilterExec. Spark already normalizes these via `NormalizeFloatingNumbers` for most cases, but + * skips comparison operators because Spark's comparison paths (`SQLOrderingUtil.compareFloats`) + * handle them specially. arrow-rs does not, so Comet's native execution needs the normalization + * wrapped around comparison operands. + * + * Logic copied verbatim from CometExecRule.normalizePlan / normalize / normalizeNaNAndZero so + * CometPlanner can run the rewrite without pulling in the whole old rule. When CometExecRule is + * deleted, the original copy goes with it. + */ +object NormalizePrePass extends Logging { + + def apply(plan: SparkPlan): SparkPlan = { + var rewrites = 0 + val out = plan.transformUp { + case p: ProjectExec => + val newProjectList = p.projectList.map(normalize(_).asInstanceOf[NamedExpression]) + if (newProjectList != p.projectList) rewrites += 1 + ProjectExec(newProjectList, p.child) + case f: FilterExec => + val newCondition = normalize(f.condition) + if (newCondition ne f.condition) rewrites += 1 + FilterExec(newCondition, f.child) + } + if (rewrites > 0) logDebug(s"NormalizePrePass: rewrites=$rewrites") + out + } + + private def normalize(expr: Expression): Expression = { + expr.transformUp { + case EqualTo(left, right) => + EqualTo(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case EqualNullSafe(left, right) => + EqualNullSafe(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case GreaterThan(left, right) => + GreaterThan(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case GreaterThanOrEqual(left, right) => + GreaterThanOrEqual(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case LessThan(left, right) => + LessThan(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case LessThanOrEqual(left, right) => + LessThanOrEqual(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case Divide(left, right, evalMode) => + Divide(left, normalizeNaNAndZero(right), evalMode) + case Remainder(left, right, evalMode) => + Remainder(left, normalizeNaNAndZero(right), evalMode) + } + } + + private def normalizeNaNAndZero(expr: Expression): Expression = { + expr match { + case _: KnownFloatingPointNormalized => expr + case FloatLiteral(f) if !f.equals(-0.0f) => expr + case DoubleLiteral(d) if !d.equals(-0.0d) => expr + case _ => + expr.dataType match { + case _: FloatType | _: DoubleType => + KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) + case _ => expr + } + } + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/phases/Phase1LikelyComet.scala b/spark/src/main/scala/org/apache/comet/planner/phases/Phase1LikelyComet.scala new file mode 100644 index 0000000000..82f5e2ae6d --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/phases/Phase1LikelyComet.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner.phases + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometNativeExec} +import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec +import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec} +import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, V2CommandExec} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.planner.tags.CometTags +import org.apache.comet.rules.CometExecRule +import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, Unsupported} + +/** + * Phase 1: predict whether each node's serde would support it *in isolation*, ignoring child + * gating. The resulting `LIKELY_COMET` tag is read by Phase 2 to make demand-aware decisions + * (e.g. shuffle conversion pays off only if at least one side is likelyComet). + * + * "In isolation" means: config enabled, own structural / expression / type checks pass, but no + * consideration of whether children have already been marked convertible. This breaks the + * circularity that otherwise requires a retry pass (as in the old CometExecRule broadcast + * handling). + * + * The predicate currently wraps `CometOperatorSerde.getSupportLevel` and the `enabledConfig` + * gate, with `Incompatible` treated per `COMET_EXPR_ALLOW_INCOMPATIBLE_OPERATORS` heuristics. + * + * This phase only mutates tag state on nodes. It does not change the plan tree shape. + */ +object Phase1LikelyComet extends Logging { + + def apply(plan: SparkPlan, conf: SQLConf): SparkPlan = { + var total = 0 + var likely = 0 + plan.foreach { node => + val verdict = isLikelyComet(node, conf) + node.setTagValue(CometTags.LIKELY_COMET, verdict) + total += 1 + if (verdict) likely += 1 + } + logDebug(s"Phase1: total=$total likely=$likely") + plan + } + + /** + * Returns whether `node` would be LIKELY_COMET under the current configuration. Exposed so + * other planner components (e.g. `BroadcastConsumerIndex`) can reason about a hypothetical + * node's eligibility without walking the whole plan. + */ + def isLikelyComet(node: SparkPlan, conf: SQLConf): Boolean = node match { + // Never-convertible control plan nodes. + case _: AdaptiveSparkPlanExec | _: ExecutedCommandExec | _: V2CommandExec => false + + // CometPlanner supports native_datafusion for V1 Parquet and native Iceberg / CSV for V2. 
+ // SCAN_AUTO is treated as native_datafusion. SCAN_NATIVE_ICEBERG_COMPAT is not predicted + // LIKELY_COMET here, so those scans fall back to plain Spark; for that mode, run with + // COMET_USE_PLANNER=false to use the legacy rule path that still emits CometScanExec. + // Phase 3 emission re-applies the full file-format / schema / encryption gates. + case _: FileSourceScanExec => + val impl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) + CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf) && + (impl == CometConf.SCAN_NATIVE_DATAFUSION || impl == CometConf.SCAN_AUTO) + case _: BatchScanExec => + CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf) + + case _: ShuffleExchangeExec => + // Optimistic: Phase 1 can't evaluate the shuffle serde's getSupportLevel accurately + // because `shuffleSupported` checks `isCometPlan(child)` and children haven't been + // converted yet. Emit-time guard in Phase 3 re-checks with the actual converted child. + true + + case _: BroadcastExchangeExec => + // Same as shuffle. Broadcast's convertibility also depends on child type / state after + // conversion. Phase 3 re-checks at emit time. + true + + // AQE stage re-entry: a prior CometPlanner pass converted an exchange, AQE materialized it + // and wrapped it in a query stage. Phase 3 re-emits the stage itself as a Comet-compatible + // node via `CometExchangeSink` so the parent's protobuf wiring sees it as native. + case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true + case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) => true + case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true + case BroadcastQueryStageExec(_, ReusedExchangeExec(_, _: CometBroadcastExchangeExec), _) => + true + + // An already-emitted CometNativeExec (from a prior AQE pass) stays convertible so parents + // see it as a native-compatible child. + case _: CometNativeExec => true + + // Generic exec operators dispatched through the serde map. + case op => + CometExecRule.allExecs.get(op.getClass) match { + case Some(serde) => + predictFromSerde(op, serde.asInstanceOf[CometOperatorSerde[SparkPlan]], conf) + case None => + // Fall back: a leaf we don't recognize can't convert; a non-leaf we don't recognize + // might still act as a passthrough in Phase 2 but is not itself LIKELY_COMET. + op match { + case _: LeafExecNode => false + case _ => false + } + } + } + + private def predictFromSerde( + op: SparkPlan, + serde: CometOperatorSerde[SparkPlan], + conf: SQLConf): Boolean = { + val opName = op.getClass.getSimpleName + if (!serde.enabledConfig.forall(_.get(conf))) { + val key = serde.enabledConfig.map(_.key).getOrElse("") + logDebug(s"Phase1: serde disabled by config node=$opName config=$key") + // Attach EXTENSION_INFO so the test harness's `getFallbackReasons` can surface this as + // the visible explain reason. Mirrors legacy CometExecRule.isOperatorEnabled. + withInfo( + op, + s"Native support for operator $opName is disabled. 
Set $key=true to enable it.") + return false + } + serde.getSupportLevel(op) match { + case u: Unsupported => + logDebug(s"Phase1: serde Unsupported node=$opName reason=$u") + withInfo(op, u.notes.getOrElse("")) + false + case _: Compatible => true + case i: Incompatible => + val allow = CometConf.isOperatorAllowIncompat(opName) + logDebug(s"Phase1: serde Incompatible node=$opName allow=$allow reason=$i") + if (allow) { + true + } else { + val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName) + val optionalNotes = i.notes.map(str => s" ($str)").getOrElse("") + withInfo( + op, + s"$opName is not fully compatible with Spark$optionalNotes. " + + s"To enable it anyway, set $incompatConf=true. ${CometConf.COMPAT_GUIDE}.") + false + } + } + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/phases/Phase2Decision.scala b/spark/src/main/scala/org/apache/comet/planner/phases/Phase2Decision.scala new file mode 100644 index 0000000000..94fab4b9d1 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/phases/Phase2Decision.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner.phases + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.comet.CometSparkToColumnarExec +import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} + +import org.apache.comet.CometConf +import org.apache.comet.planner.PlanningContext +import org.apache.comet.planner.gates.{S2CGate, V1ScanClassification, V1ScanGate, V2ScanClassification, V2ScanClassifier} +import org.apache.comet.planner.tags.{CometTags, PlannerDecision} +import org.apache.comet.planner.tags.PlannerDecision.{Convert, ConvertS2C, Fallback, Passthrough} + +/** + * Phase 2: decide per-node whether to convert, pass through, or fall back, using the + * `LIKELY_COMET` tags Phase 1 attached. Demand-aware rules catch cases where conversion adds + * overhead without a Comet consumer (PR #4010's columnar-shuffle-between-JVM-aggregates pattern, + * broadcast-without-Comet-consumer, spark-to-columnar-without-consumer). + * + * Walk is top-down so the parent's LIKELY_COMET is known when we visit each child. Per-node + * rules: + * + * - V1 scan: V1ScanGate decides; Convert on success, Fallback(reasons) with reasons extracted + * from the gate. + * - V2 scan: V2ScanClassifier decides; Convert on success (Iceberg metadata stashed on tag), + * Fallback(reasons) otherwise. + * - shuffle: convert iff parent LIKELY_COMET or any child LIKELY_COMET. 
+ * - broadcast: convert iff parent LIKELY_COMET (parent is typically a BHJ; output format + * incompatible with Spark broadcast). + * - spark-to-columnar leaf: convert iff parent LIKELY_COMET. + * - generic exec: convert iff LIKELY_COMET and every child LIKELY_COMET. + * - otherwise: Passthrough (children may still convert) or Fallback (with reasons). + * + * This phase only mutates tag state. Plan shape is untouched. The emitter phase rewrites. + */ +object Phase2Decision extends Logging { + + def apply(plan: SparkPlan, ctx: PlanningContext): SparkPlan = { + visit(plan, parentLikely = false, ctx) + plan + } + + private def visit(node: SparkPlan, parentLikely: Boolean, ctx: PlanningContext): Unit = { + val selfLikely = likely(node) + val decision = decide(node, parentLikely, selfLikely, ctx) + node.setTagValue(CometTags.DECISION, decision) + assert( + selfLikely == node.getTagValue(CometTags.LIKELY_COMET).getOrElse(false), + s"LIKELY_COMET tag out of sync with read value for node=$node") + node.children.foreach(visit(_, selfLikely, ctx)) + } + + private def likely(node: SparkPlan): Boolean = + node.getTagValue(CometTags.LIKELY_COMET).getOrElse(false) + + private def decide( + node: SparkPlan, + parentLikely: Boolean, + selfLikely: Boolean, + ctx: PlanningContext): PlannerDecision = node match { + case scan: FileSourceScanExec if selfLikely => + V1ScanGate.classify(scan, ctx.session, ctx.conf, ctx.hasInputFileExpressions) match { + case V1ScanClassification.Convertible => Convert + case V1ScanClassification.NotConvertible(reasons) => + logDebug(s"Phase2: V1 gate rejected scan=${scan.id} reasons=$reasons") + s2cOrFallback(scan, parentLikely, ctx, reasons) + } + + case _: FileSourceScanExec => + s2cOrFallback(node, parentLikely, ctx, Set.empty) + + case scan: BatchScanExec if selfLikely => + V2ScanClassifier.classify(scan, ctx.conf) match { + case V2ScanClassification.IcebergConvertible(metadata) => + scan.setTagValue(CometTags.ICEBERG_METADATA, metadata) + Convert + case V2ScanClassification.CsvConvertible => + Convert + case V2ScanClassification.NotConvertible(reasons) => + logDebug(s"Phase2: V2 classify=NotConvertible scan=${scan.id} reasons=$reasons") + s2cOrFallback(scan, parentLikely, ctx, reasons) + } + + case _: BatchScanExec => + s2cOrFallback(node, parentLikely, ctx, Set.empty) + + case s: ShuffleExchangeExec => + val childLikely = s.children.exists(likely) + if (selfLikely && (parentLikely || childLikely)) Convert else Passthrough + + case b: BroadcastExchangeExec => + val consumed = ctx.broadcastConsumers.isConsumedByCometCandidate(b) + val forced = CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.get(ctx.conf) + if (selfLikely && (parentLikely || consumed || forced)) Convert else Passthrough + + case _: CometSparkToColumnarExec => + if (parentLikely) Convert else Passthrough + + case op if op.isInstanceOf[LeafExecNode] => + if (selfLikely) Convert + else s2cOrFallback(op, parentLikely, ctx, Set.empty) + + case op => + // Treat S2C-eligible leaves as effectively convertible for the purpose of parent + // decisions: Phase 3 will wrap them in CometSparkToColumnarExec when the parent is + // LIKELY_COMET, so from the parent's perspective they'll present a Comet output. + // Without this, a Comet-capable parent (e.g. HashAggregate) would fall back just + // because its BatchScan child isn't natively convertible, even though S2C would + // bridge the gap. Old CometExecRule side-stepped this by running bottom-up and + // wrapping S2C leaves before visiting their parents. 
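+ // Illustrative example (plan shape hypothetical): a LIKELY_COMET HashAggregateExec whose
+ // only child is a BatchScanExec that failed Phase 1 but passes S2CGate still gets Convert
+ // here, because effectivelyLikely treats the scan as convertible; the scan itself is handled
+ // by the BatchScanExec case above and ends up ConvertS2C.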
+ def effectivelyLikely(c: SparkPlan): Boolean = + likely(c) || (c.isInstanceOf[LeafExecNode] && S2CGate.shouldApply(c, ctx.conf)) + val allChildrenLikely = op.children.nonEmpty && op.children.forall(effectivelyLikely) + if (selfLikely && allChildrenLikely) Convert + else if (op.children.exists(effectivelyLikely)) Passthrough + else Fallback(Set.empty) + } + + private def s2cOrFallback( + op: SparkPlan, + parentLikely: Boolean, + ctx: PlanningContext, + reasons: Set[String]): PlannerDecision = { + if (parentLikely && S2CGate.shouldApply(op, ctx.conf)) { + ConvertS2C + } else { + Fallback(reasons) + } + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/phases/Phase3Emit.scala b/spark/src/main/scala/org/apache/comet/planner/phases/Phase3Emit.scala new file mode 100644 index 0000000000..334dd371ec --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/phases/Phase3Emit.scala @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner.phases + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.csv.CSVOptions +import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCsvNativeScanExec, CometIcebergNativeScanExec, CometNativeExec, CometScanWrapper, CometSinkPlaceHolder, CometSparkToColumnarExec, SerializedPlan} +import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec} +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec} +import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} + +import org.apache.comet.objectstore.NativeConfig +import org.apache.comet.planner.tags.CometTags +import org.apache.comet.planner.tags.PlannerDecision.{Convert, ConvertS2C} +import org.apache.comet.rules.CometExecRule +import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass} +import org.apache.comet.serde.operator.{partition2Proto, schema2Proto, CometExchangeSink, CometNativeScan} + +/** + * Phase 3: bottom-up emit. For each node tagged `DECISION=Convert`, call the matching serde's + * `convert` to build the protobuf, then `createExec` to produce the CometNativeExec. Wires + * children via the protobuf children list. Sets the `COMET_CONVERTED` tag so AQE re-entries can + * short-circuit. 
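+ *
+ * A condensed sketch of the per-node emit contract that `runSerde` below implements (placeholder
+ * unwrapping and tag bookkeeping omitted):
+ *
+ * {{{
+ *   val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
+ *   val childOps = op.children.map(nativeOpOf)
+ *   childOps.foreach(builder.addChildren)
+ *   serde.convert(op, builder, childOps: _*) match {
+ *     case Some(nativeOp) => serde.createExec(nativeOp, op) // emitted CometNativeExec
+ *     case None => op // serde declined; keep the original Spark node
+ *   }
+ * }}}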
+ * + * Handles: + * - V1 FileSourceScanExec (native_datafusion path) via CometNativeScan serde directly. + * - V2 BatchScanExec: Iceberg (metadata pre-stashed on ICEBERG_METADATA tag by Phase 2) and CSV + * (scan.scan is a CSVScan). Neither goes through a CometBatchScanExec wrapper. Phase 3 builds + * protobuf and the final CometIcebergNativeScanExec / CometCsvNativeScanExec directly from + * the raw BatchScanExec. + * - Spark-to-columnar leaves (DECISION=ConvertS2C): wrap via CometSparkToColumnarExec serde. + * - ShuffleExchangeExec via CometShuffleExchangeExec serde. + * - BroadcastExchangeExec via CometBroadcastExchangeExec serde (when children native). + * - Generic exec operators whose children are already CometNativeExec, via the allExecs map. + */ +case class Phase3Emit(session: SparkSession) extends Logging { + + def apply(plan: SparkPlan): SparkPlan = { + plan.transformUp { + case op if decidedS2C(op) => + val out = emitS2C(op).getOrElse(op) + logEmit("S2C", op, out) + out + + case scan: FileSourceScanExec if decidedToConvert(scan) => + val out = emitV1NativeScan(scan).getOrElse(scan) + logEmit("V1Native", scan, out) + out + + case scan: BatchScanExec if decidedToConvert(scan) => + val out = emitV2Scan(scan).getOrElse(scan) + logEmit("V2Native", scan, out) + out + + case s: ShuffleExchangeExec if decidedToConvert(s) => + val out = emitShuffle(s).getOrElse(s) + logEmit("Shuffle", s, out) + out + + // AQE stage already wrapping a Comet exchange from a prior pass. Wrap as a Comet sink so + // the parent's protobuf wiring treats this stage as native-compatible. + case s: ShuffleQueryStageExec if decidedToConvert(s) && isCometShuffleStage(s) => + val out = emitExchangeSink(s).getOrElse(s) + logEmit("ShuffleStage", s, out) + out + + case b: BroadcastQueryStageExec if decidedToConvert(b) && isCometBroadcastStage(b) => + val out = emitExchangeSink(b).getOrElse(b) + logEmit("BroadcastStage", b, out) + out + + case b: BroadcastExchangeExec if decidedToConvert(b) && allChildrenNative(b) => + val out = emitBroadcast(b).getOrElse(b) + logEmit("Broadcast", b, out) + out + + case op if decidedToConvert(op) && allChildrenNative(op) => + val out = lookupSerde(op).flatMap(serde => emitGeneric(op, serde)).getOrElse(op) + logEmit("Generic", op, out) + out + + case op => + if (decidedToConvert(op)) { + logWarning( + s"Phase3: DECISION=Convert but no emitter matched node=${op.getClass.getSimpleName} " + + s"id=${op.id} allChildrenNative=${allChildrenNative(op)}") + } + op + } + } + + private def decidedToConvert(op: SparkPlan): Boolean = + op.getTagValue(CometTags.DECISION).contains(Convert) + + private def decidedS2C(op: SparkPlan): Boolean = + op.getTagValue(CometTags.DECISION).contains(ConvertS2C) + + private def isCometShuffleStage(s: ShuffleQueryStageExec): Boolean = s.plan match { + case _: CometShuffleExchangeExec => true + case ReusedExchangeExec(_, _: CometShuffleExchangeExec) => true + case _ => false + } + + private def isCometBroadcastStage(b: BroadcastQueryStageExec): Boolean = b.plan match { + case _: CometBroadcastExchangeExec => true + case ReusedExchangeExec(_, _: CometBroadcastExchangeExec) => true + case _ => false + } + + /** + * Emit a stage-wrapped Comet exchange (`ShuffleQueryStageExec` / `BroadcastQueryStageExec` + * holding a `CometShuffleExchangeExec` / `CometBroadcastExchangeExec` from a prior pass) as a + * Comet sink. 
Uses `CometExchangeSink` default `convert` which builds a Scan operator, and the + * returned wrapper is unwrapped in `runSerde`, leaving the stage with a `NATIVE_OP` tag. + */ + private def emitExchangeSink(op: SparkPlan): Option[SparkPlan] = + runSerde(op, CometExchangeSink, childOps = Seq.empty) + + /** + * A child counts as native-compatible for protobuf wiring if it is itself a CometNativeExec or + * carries a `NATIVE_OP` tag. The tag is set by `runSerde` on JVM-orchestrated operators + * (`CometCollectLimitExec`, `CometBroadcastExchangeExec`, `CometSparkToColumnarExec`, + * `CometUnionExec`, `CometTakeOrderedAndProjectExec`, `CometCoalesceExec`) where the serde + * previously wrapped them in a placeholder. Those wrappers no longer appear in the plan tree. + */ + private def isNativeCompatible(child: SparkPlan): Boolean = + child.isInstanceOf[CometNativeExec] || child.getTagValue(CometTags.NATIVE_OP).isDefined + + // Vacuous truth for leaf nodes (children.isEmpty): a leaf has no children to fail the predicate, + // so the generic emit path can wire it via its serde with no child nativeOps. Without this, + // leaves like LocalTableScanExec / RangeExec / InMemoryTableScanExec slip past the generic + // case and never get converted. + private def allChildrenNative(op: SparkPlan): Boolean = + op.children.forall(isNativeCompatible) + + /** + * Pulls the protobuf operator representing a child for parent wiring. Reads `nativeOp` from a + * `CometNativeExec` child, falls back to the `NATIVE_OP` tag for JVM-orchestrated children. + */ + private def nativeOpOf(child: SparkPlan): OperatorOuterClass.Operator = child match { + case n: CometNativeExec => n.nativeOp + case other => + other + .getTagValue(CometTags.NATIVE_OP) + .getOrElse( + throw new IllegalStateException( + s"Child not native-compatible. class=${other.getClass.getSimpleName}")) + } + + private def lookupSerde(op: SparkPlan): Option[CometOperatorSerde[SparkPlan]] = + CometExecRule.allExecs + .get(op.getClass) + .map(_.asInstanceOf[CometOperatorSerde[SparkPlan]]) + + private def emitGeneric( + op: SparkPlan, + serde: CometOperatorSerde[SparkPlan]): Option[SparkPlan] = { + assert( + allChildrenNative(op), + s"emitGeneric invoked with non-native children node=${op.getClass.getSimpleName} id=${op.id}") + runSerde(op, serde, op.children.map(nativeOpOf)) + } + + /** + * Emit a V1 native scan directly from the FileSourceScanExec, no intermediate CometScanExec + * wrapping. The serde operates on FileSourceScanExec after the type-param refactor. + */ + private def emitV1NativeScan(scanExec: FileSourceScanExec): Option[SparkPlan] = { + assert( + decidedToConvert(scanExec), + s"emitV1NativeScan invoked without DECISION=Convert scan=${scanExec.id}") + val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(scanExec.id) + val nativeOpOpt = CometNativeScan.convert(scanExec, builder) + if (nativeOpOpt.isEmpty) { + logDebug(s"Phase3: V1 serde.convert returned None scan=${scanExec.id}") + } + nativeOpOpt.map { nativeOp => + val exec = CometNativeScan.createExec(nativeOp, scanExec) + scanExec.logicalLink.foreach(exec.setLogicalLink) + exec.setTagValue(CometTags.COMET_CONVERTED, ()) + exec + } + } + + /** + * Emit a CometSparkToColumnarExec-wrapped node for a leaf that Phase 2 decided should bridge + * row-at-a-time Spark data into a Comet-consuming parent. The existing `CometSink` default + * `convert` builds a Scan operator with the node's schema. `createExec` wraps the leaf in + * `CometScanWrapper(nativeOp, CometSparkToColumnarExec(op))`. 
The CometScanWrapper indirection + * goes away when the old rule is deleted. + */ + private def emitS2C(op: SparkPlan): Option[SparkPlan] = { + assert(decidedS2C(op), s"emitS2C invoked without DECISION=ConvertS2C node=${op.id}") + val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id) + val nativeOpOpt = CometSparkToColumnarExec.convert(op, builder) + if (nativeOpOpt.isEmpty) { + logDebug(s"Phase3: S2C serde.convert returned None node=${op.id}") + } + nativeOpOpt.map { nativeOp => + val raw = CometSparkToColumnarExec.createExec(nativeOp, op) + val exec = raw match { + case CometScanWrapper(_, inner) => + inner.setTagValue(CometTags.NATIVE_OP, nativeOp) + inner + case direct => direct + } + op.logicalLink.foreach(exec.setLogicalLink) + exec.setTagValue(CometTags.COMET_CONVERTED, ()) + exec + } + } + + /** + * Emit a V2 native scan directly from the BatchScanExec. Dispatch: if Phase 2 stashed iceberg + * metadata on the `ICEBERG_METADATA` tag, emit a CometIcebergNativeScanExec. Otherwise if the + * scan is a CSVScan, emit a CometCsvNativeScanExec. Any other scan falls through. Neither path + * goes through a CometBatchScanExec wrapper. + */ + private def emitV2Scan(scanExec: BatchScanExec): Option[SparkPlan] = { + assert( + decidedToConvert(scanExec), + s"emitV2Scan invoked without DECISION=Convert scan=${scanExec.id}") + scanExec.getTagValue(CometTags.ICEBERG_METADATA) match { + case Some(metadata) => + val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(scanExec.id) + val icebergScanBuilder = OperatorOuterClass.IcebergScan.newBuilder() + val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder() + // Only metadata_location is needed at planning time. catalog_properties, + // required_schema, pools and per-partition data are populated by + // CometIcebergNativeScan.serializePartitions at execution time after DPP resolves. + commonBuilder.setMetadataLocation(metadata.metadataLocation) + icebergScanBuilder.setCommon(commonBuilder.build()) + builder.clearChildren() + val nativeOp = builder.setIcebergScan(icebergScanBuilder).build() + val exec = CometIcebergNativeScanExec( + nativeOp, + scanExec, + session, + metadata.metadataLocation, + metadata) + exec.setTagValue(CometTags.COMET_CONVERTED, ()) + // TreeNode._tags is not @transient, so a lingering metadata tag on the raw + // BatchScanExec could serialize if anything walked a mid-planner plan. Drop it so the + // heavy Iceberg fields never reach a serialized form of the original BatchScanExec. 
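+ // The emitted exec already carries `metadata` as a constructor argument, so dropping the
+ // tag from the original scan loses no information.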
+ scanExec.unsetTagValue(CometTags.ICEBERG_METADATA) + assert( + scanExec.getTagValue(CometTags.ICEBERG_METADATA).isEmpty, + s"ICEBERG_METADATA tag still present after unset scan=${scanExec.id}") + Some(exec) + + case None => + scanExec.scan match { + case csvScan: CSVScan => + Some(emitCsvScan(scanExec, csvScan)) + case other => + logWarning( + s"Phase3: V2 decided Convert but no emit path scan=${scanExec.id} " + + s"scanClass=${other.getClass.getName}") + None + } + } + } + + private def emitCsvScan(scanExec: BatchScanExec, csvScan: CSVScan): SparkPlan = { + val sessionState = session.sessionState + val options = { + val columnPruning = sessionState.conf.csvColumnPruning + val timeZone = sessionState.conf.sessionLocalTimeZone + new CSVOptions(csvScan.options.asScala.toMap, columnPruning, timeZone) + } + val filePartitions = scanExec.inputPartitions.map(_.asInstanceOf[FilePartition]) + val csvOptionsProto = csvOptions2Proto(options) + val dataSchemaProto = schema2Proto(csvScan.dataSchema.fields) + val readSchemaFieldNames = csvScan.readDataSchema.fieldNames + val projectionVector = csvScan.dataSchema.fields.zipWithIndex + .filter { case (field, _) => readSchemaFieldNames.contains(field.name) } + .map(_._2.asInstanceOf[Integer]) + val partitionSchemaProto = schema2Proto(csvScan.readPartitionSchema.fields) + val partitionsProto = filePartitions.map(partition2Proto(_, csvScan.readPartitionSchema)) + val objectStoreOptions = filePartitions.headOption + .flatMap { partitionFile => + val hadoopConf = sessionState + .newHadoopConfWithOptions(session.sparkContext.getConf.getAll.toMap) + partitionFile.files.headOption + .map(file => NativeConfig.extractObjectStoreOptions(hadoopConf, file.pathUri)) + } + .getOrElse(Map.empty) + + val csvScanBuilder = OperatorOuterClass.CsvScan.newBuilder() + csvScanBuilder.putAllObjectStoreOptions(objectStoreOptions.asJava) + csvScanBuilder.setCsvOptions(csvOptionsProto) + csvScanBuilder.addAllFilePartitions(partitionsProto.asJava) + csvScanBuilder.addAllDataSchema(dataSchemaProto.toIterable.asJava) + csvScanBuilder.addAllProjectionVector(projectionVector.toIterable.asJava) + csvScanBuilder.addAllPartitionSchema(partitionSchemaProto.toIterable.asJava) + + val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(scanExec.id) + val nativeOp = builder.setCsvScan(csvScanBuilder).build() + + val exec = + CometCsvNativeScanExec(nativeOp, scanExec.output, scanExec, SerializedPlan(None)) + scanExec.logicalLink.foreach(exec.setLogicalLink) + exec.setTagValue(CometTags.COMET_CONVERTED, ()) + exec + } + + private def csvOptions2Proto(options: CSVOptions): OperatorOuterClass.CsvOptions = { + val csvOptionsBuilder = OperatorOuterClass.CsvOptions.newBuilder() + csvOptionsBuilder.setDelimiter(options.delimiter) + csvOptionsBuilder.setHasHeader(options.headerFlag) + csvOptionsBuilder.setQuote(options.quote.toString) + csvOptionsBuilder.setEscape(options.escape.toString) + csvOptionsBuilder.setTerminator(options.lineSeparator.getOrElse("\n")) + csvOptionsBuilder.setTruncatedRows(options.multiLine) + if (options.isCommentSet) { + csvOptionsBuilder.setComment(options.comment.toString) + } + csvOptionsBuilder.build() + } + + /** + * Emit a shuffle without going through `CometShuffleExchangeExec.createExec`. That serde guards + * the native-shuffle path with `op.children.forall(_.isInstanceOf[CometNativeExec])`, which is + * too strict after we unwrapped placeholder-based children to carry `NATIVE_OP` tags instead. 
+ * Phase 3 takes the same branches the serde would (native vs columnar) and constructs + * `CometShuffleExchangeExec` directly. Protobuf comes from `serde.convert`. + */ + private def emitShuffle(s: ShuffleExchangeExec): Option[SparkPlan] = { + val supportOpt = CometShuffleExchangeExec.shuffleSupported(s) + if (supportOpt.isEmpty) { + logDebug(s"Phase3: shuffle serde rejected at emit time id=${s.id}") + return None + } + val shuffleType = supportOpt.get + val childOps = s.children.flatMap { c => + if (isNativeCompatible(c)) Some(nativeOpOf(c)) else None + } + if (childOps.size != s.children.size) { + logDebug( + s"Phase3: shuffle has mixed Comet / Spark children id=${s.id} " + + s"cometChildren=${childOps.size} totalChildren=${s.children.size}") + } + val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(s.id) + childOps.foreach(builder.addChildren) + CometShuffleExchangeExec.convert(s, builder, childOps: _*).map { nativeOp => + val exec = shuffleType match { + case CometNativeShuffle => CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle) + case CometColumnarShuffle => + CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle) + } + exec.setTagValue(CometTags.NATIVE_OP, nativeOp) + s.logicalLink.foreach(exec.setLogicalLink) + exec.setTagValue(CometTags.COMET_CONVERTED, ()) + exec + } + } + + private def emitBroadcast(b: BroadcastExchangeExec): Option[SparkPlan] = { + // Same emit-time support check as shuffle. The broadcast serde's getSupportLevel may + // reject based on child types / structure that weren't decidable at Phase 1. + if (!CometBroadcastExchangeExec.getSupportLevel(b).isInstanceOf[Compatible]) { + logDebug( + s"Phase3: broadcast serde rejected at emit time id=${b.id} " + + s"support=${CometBroadcastExchangeExec.getSupportLevel(b).getClass.getSimpleName}") + return None + } + assert(allChildrenNative(b), s"emitBroadcast invoked with non-native children id=${b.id}") + val childOps = b.children.map(nativeOpOf) + runSerde(b, CometBroadcastExchangeExec, childOps) + } + + /** + * Run a serde and, if it returned a placeholder wrapper (`CometSinkPlaceHolder` / + * `CometScanWrapper`), unwrap to the inner operator and attach the protobuf as a `NATIVE_OP` + * tag. This is the tag-based replacement for the old placeholder plan-tree wrappers: Phase 3 + * emits the JVM-orchestrated operators directly without a stripping post-pass. + * + * The placeholder classes still exist because the legacy rule path (`COMET_USE_PLANNER=false`) + * produces them. They go away when the legacy rule is deleted. 
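+ *
+ * Schematically: when a serde hands back a `CometSinkPlaceHolder` (or `CometScanWrapper`)
+ * around some `inner` operator, the plan keeps `inner` with the protobuf attached as its
+ * `NATIVE_OP` tag, not the placeholder itself.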
+ */ + private def runSerde[T <: SparkPlan]( + op: T, + serde: CometOperatorSerde[T], + childOps: Seq[OperatorOuterClass.Operator]): Option[SparkPlan] = { + val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id) + childOps.foreach(builder.addChildren) + val nativeOpOpt = serde.convert(op, builder, childOps: _*) + if (nativeOpOpt.isEmpty) { + logDebug( + s"Phase3: serde.convert returned None node=${op.getClass.getSimpleName} " + + s"id=${op.id} serde=${serde.getClass.getSimpleName}") + } + nativeOpOpt.map { nativeOp => + val raw = serde.createExec(nativeOp, op) + val exec = raw match { + case CometSinkPlaceHolder(_, _, inner) => + inner.setTagValue(CometTags.NATIVE_OP, nativeOp) + inner + case CometScanWrapper(_, inner) => + inner.setTagValue(CometTags.NATIVE_OP, nativeOp) + inner + case direct => direct + } + op.logicalLink.foreach(exec.setLogicalLink) + exec.setTagValue(CometTags.COMET_CONVERTED, ()) + exec + } + } + + private def logEmit(kind: String, in: SparkPlan, out: SparkPlan): Unit = { + if (log.isTraceEnabled && !(in eq out)) { + logTrace( + s"Phase3: emit kind=$kind inClass=${in.getClass.getSimpleName} " + + s"outClass=${out.getClass.getSimpleName} inId=${in.id} outId=${out.id}") + } + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/phases/SubqueryBroadcastRewrite.scala b/spark/src/main/scala/org/apache/comet/planner/phases/SubqueryBroadcastRewrite.scala new file mode 100644 index 0000000000..e3608a3f0c --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/phases/SubqueryBroadcastRewrite.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner.phases + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometNativeColumnarToRowExec, CometNativeExec, CometSubqueryAdaptiveBroadcastExec, CometSubqueryBroadcastExec} +import org.apache.spark.sql.execution.{InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec, SubqueryBroadcastExec} +import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec + +import org.apache.comet.CometSparkSessionExtensions.isSpark35Plus +import org.apache.comet.shims.ShimSubqueryBroadcast + +/** + * Post-pass: rewrite DPP subquery broadcasts so that exchange reuse and AQE DPP work correctly + * after Comet conversion. Runs once over the emitted plan, not at every node of the main + * transform like the old CometExecRule does. + * + * Two cases: + * + * - Non-AQE DPP: a BroadcastExchangeExec on the join side has been converted to + * CometBroadcastExchangeExec, but the DPP subquery still references the original + * BroadcastExchangeExec. ReuseExchangeAndSubquery (which runs after Comet rules) can't match + * them because their types differ. 
Replace SubqueryBroadcastExec with + * CometSubqueryBroadcastExec wrapping the converted broadcast so both sides share the same + * type. + * + * - AQE DPP (Spark 3.5+): Spark's PlanAdaptiveDynamicPruningFilters pattern-matches + * SubqueryAdaptiveBroadcastExec. When it can't find BroadcastHashJoinExec (Comet replaced it + * with CometBroadcastHashJoinExec), it replaces the DPP filter with Literal.TrueLiteral, + * disabling DPP. Wrap SABs in CometSubqueryAdaptiveBroadcastExec to hide them from Spark's + * rule. CometPlanAdaptiveDynamicPruningFilters later unwraps them with access to the + * materialized BroadcastQueryStageExec. + * + * Logic copied from CometExecRule.convertSubqueryBroadcasts so CometPlanner can run this rewrite + * without pulling in the whole old rule. Delete the original when CometExecRule is removed. + */ +object SubqueryBroadcastRewrite extends ShimSubqueryBroadcast with Logging { + + def apply(plan: SparkPlan): SparkPlan = { + var nonAqeRewrites = 0 + var aqeRewrites = 0 + val out = plan.transformAllExpressions { case inSub: InSubqueryExec => + inSub.plan match { + case sub: SubqueryBroadcastExec => + sub.child match { + case b: BroadcastExchangeExec => + val cometChild = b.child match { + case c2r: CometNativeColumnarToRowExec => c2r.child + case other => other + } + if (cometChild.isInstanceOf[CometNativeExec]) { + nonAqeRewrites += 1 + val cometBroadcast = CometBroadcastExchangeExec(b, b.output, b.mode, cometChild) + val cometSub = CometSubqueryBroadcastExec( + sub.name, + getSubqueryBroadcastExecIndices(sub), + sub.buildKeys, + cometBroadcast) + inSub.withNewPlan(cometSub) + } else { + inSub + } + case _ => inSub + } + case sab: SubqueryAdaptiveBroadcastExec if isSpark35Plus => + assert( + sab.buildKeys.nonEmpty, + s"SubqueryAdaptiveBroadcastExec '${sab.name}' has empty buildKeys") + aqeRewrites += 1 + val indices = getSubqueryBroadcastIndices(sab) + val wrapped = CometSubqueryAdaptiveBroadcastExec( + sab.name, + indices, + sab.onlyInBroadcast, + sab.buildPlan, + sab.buildKeys, + sab.child) + inSub.withNewPlan(wrapped) + case _: SubqueryAdaptiveBroadcastExec => + // Spark 3.4: no injectQueryStageOptimizerRule, leave SAB unwrapped. The + // CometSpark34AqeDppFallbackRule handles the 3.4-specific path. + inSub + case _ => inSub + } + } + if (nonAqeRewrites + aqeRewrites > 0) { + logDebug( + s"SubqueryBroadcastRewrite: nonAqeRewrites=$nonAqeRewrites aqeRewrites=$aqeRewrites") + } + out + } +} diff --git a/spark/src/main/scala/org/apache/comet/planner/tags/CometTags.scala b/spark/src/main/scala/org/apache/comet/planner/tags/CometTags.scala new file mode 100644 index 0000000000..393588f2b4 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/planner/tags/CometTags.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.planner.tags + +import org.apache.spark.sql.catalyst.trees.TreeNodeTag + +import org.apache.comet.iceberg.CometIcebergNativeScanMetadata +import org.apache.comet.serde.OperatorOuterClass + +/** + * Formal tag vocabulary used by CometPlanner to coordinate across its internal phases and across + * AQE stage re-entries. Every tag a Comet rule reads or writes should be declared here with a + * comment describing who sets it and who reads it. This replaces the ad-hoc tag strings that + * CometScanRule / CometExecRule scatter across companion objects. + */ +object CometTags { + + /** + * Set by Phase 1 (LIKELY_COMET annotator) on every node. Value `true` means "serde supports + * this op in isolation" (configs enabled, structural / expression checks pass). Ignores child + * gating so it can be computed bottom-up without the classification depending on its own + * descendants' decisions. Read by Phase 2 to judge demand-aware conversion. + */ + val LIKELY_COMET: TreeNodeTag[Boolean] = TreeNodeTag("comet.likelyComet") + + /** + * Set by Phase 2 (DECISION annotator) on every node. Encodes whether this node should be + * converted (`Convert`), kept as a Spark node with Comet-convertible children underneath + * (`Passthrough`), or fallen back to Spark entirely (`Fallback`). Read by Phase 3 emitter. + */ + val DECISION: TreeNodeTag[PlannerDecision] = TreeNodeTag("comet.decision") + + /** + * Set by Phase 3 (emitter) on every emitted CometNativeExec. Marks the subtree as already + * compiled so that re-entries (especially AQE per-stage re-planning) can skip Phase 1/2/3 via + * the top-level check in `CometPlanner.apply`. Persists across stage boundaries because + * TreeNode tags survive makeCopy. + */ + val COMET_CONVERTED: TreeNodeTag[Unit] = TreeNodeTag("comet.converted") + + /** + * Set by Phase 2 on a `BatchScanExec` that has been classified as a convertible native Iceberg + * scan. Carries the pre-extracted metadata (resolved via iceberg-java reflection + catalog + * properties) so Phase 3 can build the `CometIcebergNativeScanExec` without going through the + * `CometBatchScanExec` carrier class. Absent on non-Iceberg convertible scans (CSV) and on + * fall-back scans. + */ + val ICEBERG_METADATA: TreeNodeTag[CometIcebergNativeScanMetadata] = + TreeNodeTag("comet.icebergMetadata") + + /** + * Attached by Phase 3 to any emitted operator that is Comet-compatible but is not itself a + * `CometNativeExec` (e.g. JVM-orchestrated operators like `CometCollectLimitExec`, + * `CometBroadcastExchangeExec`, `CometSparkToColumnarExec`). Carries the protobuf `Operator` + * that a parent's serde needs to wire this node into its own protobuf tree. Replaces the old + * `CometSinkPlaceHolder` / `CometScanWrapper` nodes that previously carried the same payload as + * a plan-tree wrapper. + * + * `Phase 3` treats a child as native-compatible if it is a `CometNativeExec` OR has this tag. + */ + val NATIVE_OP: TreeNodeTag[OperatorOuterClass.Operator] = TreeNodeTag("comet.nativeOp") +} + +/** + * Outcome of Phase 2 classification for a single plan node. + */ +sealed trait PlannerDecision + +object PlannerDecision { + + /** Emit a CometNativeExec for this node in Phase 3. */ + case object Convert extends PlannerDecision + + /** + * Wrap this non-Comet leaf in CometSparkToColumnarExec to bridge row-at-a-time data into a + * Comet-consuming parent. 
Only used when the parent is LIKELY_COMET so demand exists for the + * columnar conversion. + */ + case object ConvertS2C extends PlannerDecision + + /** + * Keep the Spark node as-is; its children may still convert. Used for plan nodes Comet cannot + * replace directly (e.g. AdaptiveSparkPlanExec, some V2CommandExec cases). + */ + case object Passthrough extends PlannerDecision + + /** + * Node will not convert; record the reasons via `withInfo` for explain output. Any already- + * converted descendants remain; this node terminates the native subtree at its children. + */ + case class Fallback(reasons: Set[String]) extends PlannerDecision +} diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 06d926dfc0..545750b6dd 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -259,7 +259,7 @@ case class CometExecRule(session: SparkSession) def convertNode(op: SparkPlan): SparkPlan = op match { // Fully native scan for V1 case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION => - convertToComet(scan, CometNativeScan).getOrElse(scan) + convertToComet(scan.wrapped, CometNativeScan).getOrElse(scan) // Fully native Iceberg scan for V2 (iceberg-rust path) // Only handle scans with native metadata; other scans fall through to isCometScan @@ -535,6 +535,11 @@ case class CometExecRule(session: SparkSession) private def _apply(plan: SparkPlan): SparkPlan = { // We shouldn't transform Spark query plan if Comet is not loaded. if (!isCometLoaded(conf)) return plan + assert( + !CometConf.COMET_USE_PLANNER.get(conf), + s"CometExecRule ran while ${CometConf.COMET_USE_PLANNER.key}=true. CometPlanner should " + + "be the sole rule on this path. Either COMET_USE_PLANNER was flipped after session " + + "creation or the legacy rule was registered by mistake.") // Comet does not support structured streaming. Fall back to Spark for any plan that // belongs to a streaming query (detected via StreamSourceAwareSparkPlan.getStream). diff --git a/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala b/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala index 20207ffa5f..42c6e77a98 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala @@ -46,8 +46,9 @@ import org.apache.comet.shims.{ShimPrepareExecutedPlan, ShimSubqueryBroadcast} * CometSubqueryBroadcastExec for broadcast reuse. * * Also handles the dual-filter problem: CometNativeScanExec.partitionFilters and - * CometScanExec.partitionFilters are separate InSubqueryExec instances. Both must be converted - * because CometScanExec.dynamicallySelectedPartitions evaluates its own partitionFilters. + * originalPlan.partitionFilters are separate InSubqueryExec instances. Both must be converted + * because originalPlan.inputRDD (used for FilePartition computation) evaluates its own + * partitionFilters during FileScanRDD construction. 
* * @see * PlanAdaptiveDynamicPruningFilters (Spark's equivalent for BroadcastHashJoinExec) @@ -66,18 +67,27 @@ case object CometPlanAdaptiveDynamicPruningFilters return plan } + var hits = 0 + // TODO(#3510): CometNativeScanExec needs special handling because its makeCopy - // loses @transient scan and expression transformations. Once makeCopy is fixed - // (or CometScanExec wrapping is removed), replace both cases with a single - // plan.transformAllExpressions call matching Spark's PlanAdaptiveDynamicPruningFilters. - plan.transformUp { + // loses expression transformations on originalPlan. Once makeCopy is fixed, + // replace both cases with a single plan.transformAllExpressions call matching + // Spark's PlanAdaptiveDynamicPruningFilters. + val out = plan.transformUp { case nativeScan: CometNativeScanExec if nativeScan.partitionFilters.exists(hasCometSAB) => - logDebug("Converting AQE DPP for CometNativeScanExec") + logDebug(s"CometPlanAdaptiveDPP: MATCH CometNativeScanExec id=${nativeScan.id}") + hits += 1 convertNativeScanDPP(nativeScan, plan) case p: SparkPlan if !p.isInstanceOf[CometNativeScanExec] && hasWrappedSAB(p) => - logDebug(s"Converting AQE DPP for non-Comet node: ${p.nodeName}") + logDebug( + s"CometPlanAdaptiveDPP: MATCH non-Comet node ${p.getClass.getSimpleName} id=${p.id}") + hits += 1 convertNonCometNodeDPP(p, plan) } + if (hits > 0) { + logDebug(s"CometPlanAdaptiveDPP: applied to plan#${plan.id} hits=$hits") + } + out } private def convertNativeScanDPP( @@ -88,16 +98,18 @@ case object CometPlanAdaptiveDynamicPruningFilters if (newOuterFilters == nativeScan.partitionFilters) return nativeScan // Dual-filter invariant: CometNativeScanExec.partitionFilters and - // CometScanExec.partitionFilters are separate InSubqueryExec instances for the - // same DPP filters. Both must be converted because - // CometScanExec.dynamicallySelectedPartitions evaluates its own filters. + // originalPlan.partitionFilters are separate InSubqueryExec instances for the same DPP + // filters. Both must be converted because originalPlan.inputRDD (called from + // serializedPartitionData) evaluates its own partitionFilters during FileScanRDD + // construction. assert( - nativeScan.scan != null, - "CometNativeScanExec with DPP filters must have a non-null CometScanExec") - val newInnerFilters = nativeScan.scan.partitionFilters.map(f => convertFilter(f, stagePlan)) - val newInnerScan = nativeScan.scan.copy(partitionFilters = newInnerFilters) + nativeScan.originalPlan != null, + "CometNativeScanExec with DPP filters must have a non-null originalPlan") + val newInnerFilters = + nativeScan.originalPlan.partitionFilters.map(f => convertFilter(f, stagePlan)) + val newOriginal = nativeScan.originalPlan.copy(partitionFilters = newInnerFilters) - nativeScan.copy(partitionFilters = newOuterFilters, scan = newInnerScan) + nativeScan.copy(partitionFilters = newOuterFilters, originalPlan = newOriginal) } private def convertFilter(filter: Expression, stagePlan: SparkPlan): Expression = { @@ -113,8 +125,8 @@ case object CometPlanAdaptiveDynamicPruningFilters /** * Extracts SAB data from an InSubqueryExec's plan. 
Handles both: * - CometSubqueryAdaptiveBroadcastExec (outer partitionFilters, wrapped by CometExecRule) - * - SubqueryAdaptiveBroadcastExec (inner CometScanExec.partitionFilters, never wrapped - * because CometScanExec is @transient and not part of the plan expression tree) + * - SubqueryAdaptiveBroadcastExec (inner originalPlan.partitionFilters, never wrapped because + * originalPlan is @transient and not part of the plan expression tree) * * Either form may itself be wrapped in a `ReusedSubqueryExec` when Spark's * `ReuseAdaptiveSubquery` (which runs before our rule) dedupes identical DPP subqueries, e.g. @@ -423,9 +435,8 @@ case object CometPlanAdaptiveDynamicPruningFilters /** * Checks if an expression contains an SAB variant (wrapped or unwrapped). The outer * CometNativeScanExec.partitionFilters has CometSubqueryAdaptiveBroadcastExec (wrapped by - * CometExecRule). The inner CometScanExec.partitionFilters may have the original - * SubqueryAdaptiveBroadcastExec (unwrapped, because CometScanExec is - * @transient). + * CometExecRule). The inner originalPlan.partitionFilters may have the original + * SubqueryAdaptiveBroadcastExec (unwrapped, because originalPlan is @transient). */ private def hasCometSAB(e: Expression): Boolean = e.exists { diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 1c9ec98a7a..04a53e8683 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -75,6 +75,11 @@ case class CometScanRule(session: SparkSession) private def _apply(plan: SparkPlan): SparkPlan = { if (!isCometLoaded(conf)) return plan + assert( + !CometConf.COMET_USE_PLANNER.get(conf), + s"CometScanRule ran while ${CometConf.COMET_USE_PLANNER.key}=true. CometPlanner should " + + "be the sole rule on this path. Either COMET_USE_PLANNER was flipped after session " + + "creation or the legacy rule was registered by mistake.") // Comet does not support structured streaming. The parallel guard in // CometExecRule only stops operator wrapping, so without this check we diff --git a/spark/src/main/scala/org/apache/comet/rules/CometShuffleRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometShuffleRule.scala new file mode 100644 index 0000000000..be8e2c6402 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/rules/CometShuffleRule.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.rules + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec + +import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometShuffleEnabled} + +/** + * Handles the niche configuration where `spark.comet.exec.enabled=false` but + * `spark.comet.exec.shuffle.enabled=true`: the user wants Comet's columnar/native shuffle without + * any other Comet operator conversion. + * + * Under the legacy `CometScanRule` + `CometExecRule` pair, this case was covered inside + * `CometExecRule.apply` with a dedicated `applyCometShuffle` branch that ran when exec was + * disabled. `CometPlanner` intentionally short-circuits to `convertBlocks` when exec is disabled + * because its demand-aware Phase 1/2/3 pipeline assumes exec is on (Phase 2's shuffle rule is + * "convert iff parent or child LIKELY_COMET", which never fires with nothing else convertible). + * Rather than bake a shuffle-only escape hatch into `CometPlanner`, keep that mode in its own + * tiny rule so the planner stays focused on the full compile path. + * + * Pipeline contract: + * - With `COMET_USE_PLANNER=true` (the new path), both `CometPlanner` and this rule run as + * `queryStagePrepRule`s and as `preColumnarTransitions`. Their exec-flag gates are disjoint: + * `CometPlanner` short-circuits when exec is disabled, this rule short-circuits when exec is + * enabled. At most one does real work per invocation. + * - With `COMET_USE_PLANNER=false` (the legacy path), this rule is a no-op because + * `CometExecRule` already embeds the same logic. + * + * How to remove this rule in the future: + * 1. Drop the `exec=off + shuffle=on` mode from the docs and config. 2. Delete this file. 3. In + * `CometSparkSessionExtensions`: remove the `injectQueryStagePrepRule` entry for + * `CometShuffleRule`, and replace `CometExecColumnar.preColumnarTransitions`' planner branch + * with an identity rule (`new Rule[SparkPlan] { def apply(p) = p }`). 4. Delete + * `CometExecRule.SKIP_COMET_SHUFFLE_TAG` if its only remaining reader is + * `CometSpark34AqeDppFallbackRule` and Spark 3.4 support has been dropped. + */ +case class CometShuffleRule(session: SparkSession) extends Rule[SparkPlan] with Logging { + + override def apply(plan: SparkPlan): SparkPlan = { + if (!isCometLoaded(conf)) return plan + // Legacy rule path covers this mode via CometExecRule; skip to avoid double-wrapping. + if (!CometConf.COMET_USE_PLANNER.get(conf)) return plan + // CometPlanner already handled the exec-enabled case, including shuffle conversion in Phase 3. + if (CometConf.COMET_EXEC_ENABLED.get(conf)) return plan + if (!isCometShuffleEnabled(conf)) return plan + applyCometShuffle(plan) + } + + /** + * Single-pass wrap of Spark shuffles as `CometShuffleExchangeExec`. No child wiring, no + * protobuf tree, no logical-link reconciliation: the wrapped shuffle sits under a Spark parent, + * and Spark's `ApplyColumnarRulesAndInsertTransitions` inserts the columnar-to-row boundaries + * around it. Verbatim port of `CometExecRule.applyCometShuffle`. 
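+ *
+ * Schematically, each supported `ShuffleExchangeExec` is replaced in place by a
+ * `CometShuffleExchangeExec` of the matching shuffle type; the operators above and below it
+ * are left untouched.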
+ */ + private def applyCometShuffle(plan: SparkPlan): SparkPlan = { + plan.transformUp { + case s: ShuffleExchangeExec if shouldSkipCometShuffle(s) => + s + case s: ShuffleExchangeExec => + CometShuffleExchangeExec.shuffleSupported(s) match { + case Some(CometNativeShuffle) => + // Arrow's native kernels don't support Decimal32 / Decimal64 yet; force Decimal128 + // so the shuffle's input batches match what native execution expects. + conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true") + CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle) + case Some(CometColumnarShuffle) => + CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle) + case None => + s + } + } + } + + /** + * Written by `CometSpark34AqeDppFallbackRule` on Spark < 3.5 and (historically) by the + * revert-redundant-shuffle post-pass. Honor it so shuffles that should stay Spark-native stay + * Spark-native. + */ + private def shouldSkipCometShuffle(s: ShuffleExchangeExec): Boolean = + s.getTagValue(CometExecRule.SKIP_COMET_SHUFFLE_TAG).isDefined +} diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 066b770bbb..acfb0b655f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -23,11 +23,13 @@ import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} +import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, IsNull, Literal, PlanExpression} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues -import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec} +import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec} +import org.apache.spark.sql.comet.shims.ShimFileSourceScanExec import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SubqueryAdaptiveBroadcastExec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.ArrayType import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometConf.COMET_EXEC_ENABLED @@ -42,7 +44,7 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} /** * Validation and serde logic for `native_datafusion` scans. */ -object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { +object CometNativeScan extends CometOperatorSerde[FileSourceScanExec] with Logging { /** Determine whether the scan is supported and tag the Spark plan with any fallback reasons */ def isSupported(scanExec: FileSourceScanExec): Boolean = { @@ -95,16 +97,38 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { case _ => false } + /** + * Data filters pushed down to the native reader. Excludes DPP (subquery-bearing) filters, which + * are resolved at execution time via CometNativeScanExec.serializedPartitionData, and null + * checks on array columns (Arrow semantics differ from Spark for nested null checks). + * + * Extracted from the previous CometScanExec.supportedDataFilters so this serde can operate + * directly on FileSourceScanExec. 
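+ *
+ * For example (column names illustrative), a plain predicate such as `price > 5` stays in the
+ * pushed-down set, while `IsNotNull(tags_array)` and any subquery-bearing (DPP) filter are
+ * dropped here.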
+ */ + private def supportedDataFilters(scan: FileSourceScanExec): Seq[Expression] = + scan.dataFilters + .filterNot(isDynamicPruningFilter) + .filterNot(isNullCheckOnArrayColumn) + + private def isDynamicPruningFilter(e: Expression): Boolean = + e.exists(_.isInstanceOf[PlanExpression[_]]) + + private def isNullCheckOnArrayColumn(expr: Expression): Boolean = expr match { + case IsNotNull(child) => child.dataType.isInstanceOf[ArrayType] + case IsNull(child) => child.dataType.isInstanceOf[ArrayType] + case _ => false + } + override def enabledConfig: Option[ConfigEntry[Boolean]] = None - override def getSupportLevel(operator: CometScanExec): SupportLevel = { - // all checks happen in CometScanRule before ScanExec is converted to CometScanExec, so - // we always report compatible here because this serde object is for the converted CometScanExec + override def getSupportLevel(operator: FileSourceScanExec): SupportLevel = { + // all checks happen in CometScanRule / CometPlanner before the scan reaches this serde, so + // we always report compatible here because this serde is only invoked for validated scans. Compatible() } override def convert( - scan: CometScanExec, + scan: FileSourceScanExec, builder: Operator.Builder, childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = { val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder() @@ -127,7 +151,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(scan.conf)) { val dataFilters = new ListBuffer[Expr]() - for (filter <- scan.supportedDataFilters) { + for (filter <- supportedDataFilters(scan)) { exprToProto(filter, scan.output) match { case Some(proto) => dataFilters += proto case _ => @@ -158,10 +182,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { // Extract object store options from first file (S3 configs apply to all files in scan). // Use selectedPartitions (static) instead of getFilePartitions() because at planning time // DPP subqueries haven't been resolved yet. Object store options don't depend on DPP. 
- val firstFileUri = scan.selectedPartitions - .flatMap(_.files.headOption) - .headOption - .map(_.getPath.toUri) + val firstFileUri = ShimFileSourceScanExec.firstSelectedFileUri(scan) val partitionSchema = schema2Proto(scan.relation.partitionSchema.fields) val requiredSchema = schema2Proto(scan.requiredSchema.fields) @@ -229,7 +250,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { } - override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec = { - CometNativeScanExec(nativeOp, op.wrapped, op.session, op) + override def createExec(nativeOp: Operator, op: FileSourceScanExec): CometNativeExec = { + CometNativeScanExec(nativeOp, op, op.session) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index 45d708aaef..443d1ee269 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} -import org.apache.spark.sql.comet.shims.ShimStreamSourceAwareSparkPlan +import org.apache.spark.sql.comet.shims.{ShimFileSourceScanExec, ShimStreamSourceAwareSparkPlan} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.{ScalarSubquery => ExecScalarSubquery} import org.apache.spark.sql.execution.datasources._ @@ -70,7 +70,6 @@ case class CometNativeScanExec( disableBucketedScan: Boolean = false, originalPlan: FileSourceScanExec, override val serializedPlanOpt: SerializedPlan, - @transient scan: CometScanExec, // Lazy access to file partitions without serializing with plan sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime extends CometLeafExec with DataSourceScanExec @@ -143,25 +142,39 @@ case class CometNativeScanExec( override lazy val outputPartitioning: Partitioning = { if (bucketedScan) { originalPlan.outputPartitioning - } else { - // Use perPartitionData.length instead of originalPlan.inputRDD.getNumPartitions. - // - // originalPlan.inputRDD triggers FileSourceScanExec's full scan pipeline including - // codegen on partition filter expressions. With DPP, this calls - // InSubqueryExec.doGenCode which requires the subquery to have finished - but - // outputPartitioning can be accessed before prepare() runs (e.g., by - // ValidateRequirements during plan validation). - // - // perPartitionData goes through serializedPartitionData, which explicitly resolves - // DPP subqueries (via updateResult()) before accessing file partitions. This is the - // same pattern CometIcebergNativeScanExec uses. + } else if (hasUnresolvableDpp(partitionFilters)) { + // DPP resolution requires executing a subquery via InSubqueryExec.updateResult(). + // If any DPP filter's plan is a CometSubqueryAdaptiveBroadcastExec (AQE DPP wrapper + // that's not executable until CometPlanAdaptiveDynamicPruningFilters converts it), + // we cannot resolve DPP yet. Use a pre-DPP upper bound so EnsureRequirements + // (which calls child.outputPartitioning during AQE stage optimization) doesn't + // crash with "CSAB.doExecute" - our DPP rule runs later and does the real + // conversion. Post-DPP pruning still happens at execution time. 
// - // This is also more correct: perPartitionData.length reflects the post-DPP partition - // count, matching what CometExecRDD actually uses in doExecuteColumnar(). + // This mirrors Spark's own FileSourceScanExec, which uses UnknownPartitioning(0) + // for non-bucketed scans. We do slightly better by reporting a pre-DPP partition + // count as a cardinality hint. + UnknownPartitioning(ShimFileSourceScanExec.selectedPartitionCount(originalPlan)) + } else { + // DPP is resolvable (non-AQE DPP, or AQE DPP already converted). perPartitionData + // triggers serializedPartitionData which explicitly resolves DPP via updateResult(). + // The resulting length matches what CometExecRDD's per-partition arrays will see + // at execution time, so firstNonBroadcastPlanNumPartitions (operators.scala:515) + // stays consistent with perPartitionByKey array lengths. UnknownPartitioning(perPartitionData.length) } } + private def hasUnresolvableDpp(filters: Seq[Expression]): Boolean = + filters.exists(_.exists { + case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty => + // Any BaseSubqueryExec whose doExecute() throws (like + // CometSubqueryAdaptiveBroadcastExec before its rule runs) cannot be resolved. + // Class-name match avoids an upward import dependency on the CSAB class. + e.plan.getClass.getSimpleName == "CometSubqueryAdaptiveBroadcastExec" + case _ => false + }) + override lazy val outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering /** @@ -203,20 +216,18 @@ case class CometNativeScanExec( } case _ => } - // CometNativeScanExec.partitionFilters and CometScanExec.partitionFilters contain - // different InSubqueryExec instances. convertSubqueryBroadcasts replaced the former with - // CometSubqueryBroadcastExec, but the latter still has the original SubqueryBroadcastExec. - // Both need resolution because CometScanExec.dynamicallySelectedPartitions evaluates its - // own partitionFilters. updateResult() is a no-op if already resolved. - if (scan != null) { - scan.partitionFilters.foreach { - case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty => - logDebug( - "Resolving CometScanExec DPP subquery: " + - s"plan=${e.plan.getClass.getSimpleName}") - e.updateResult() - case _ => - } + // CometNativeScanExec.partitionFilters and originalPlan.partitionFilters contain different + // InSubqueryExec instances. convertSubqueryBroadcasts replaced the former with + // CometSubqueryBroadcastExec, but originalPlan still has the original SubqueryBroadcastExec. + // Both need resolution because originalPlan.inputRDD below evaluates its own partitionFilters + // during FileScanRDD construction. updateResult() is a no-op if already resolved. + originalPlan.partitionFilters.foreach { + case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty => + logDebug( + "Resolving originalPlan DPP subquery: " + + s"plan=${e.plan.getClass.getSimpleName}") + e.updateResult() + case _ => } // Resolve scalar subqueries in dataFilters and push to the native Parquet reader. // supportedDataFilters excludes PlanExpression at planning time (unresolved), so these @@ -253,8 +264,12 @@ case class CometNativeScanExec( } } - // Get file partitions from CometScanExec (handles bucketing, etc.) - val filePartitions = scan.getFilePartitions() + // Delegate FilePartition computation to Spark's FileSourceScanExec. Its inputRDD + // builds a FileScanRDD whose filePartitions field exposes the same partitions Comet + // would compute itself (bucketing, pruning, splitting). 
The FileScanRDD's readFile + // closure is captured but never invoked because we only consume filePartitions and + // serialize them for native execution. + val filePartitions = originalPlan.inputRDD.asInstanceOf[FileScanRDD].filePartitions // Serialize each partition's files import org.apache.comet.serde.operator.partition2Proto @@ -341,7 +356,6 @@ case class CometNativeScanExec( disableBucketedScan, canonOriginal, SerializedPlan(None), - null, // Transient scan not needed for canonicalization "" ) // sourceKey not needed for canonicalization } @@ -380,7 +394,7 @@ case class CometNativeScanExec( case Some(metric) => nativeMetrics + ("numOutputRows" -> metric) case None => nativeMetrics } - withAlias ++ scan.metrics.filterKeys(driverMetricKeys) + withAlias ++ originalPlan.metrics.filterKeys(driverMetricKeys) } /** @@ -393,8 +407,7 @@ object CometNativeScanExec { def apply( nativeOp: Operator, scanExec: FileSourceScanExec, - session: SparkSession, - scan: CometScanExec): CometNativeScanExec = { + session: SparkSession): CometNativeScanExec = { // TreeNode.mapProductIterator is protected method. def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = { val arr = Array.ofDim[B](product.productArity) @@ -444,7 +457,6 @@ object CometNativeScanExec { wrapped.disableBucketedScan, wrapped, SerializedPlan(None), - scan, sourceKey) scanExec.logicalLink.foreach(batchScanExec.setLogicalLink) batchScanExec diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index d4ee4e4ccf..6e3b56d35f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -51,6 +51,7 @@ import com.google.common.base.Objects import org.apache.comet.{CometConf, CometExplainInfo} import org.apache.comet.CometConf.{COMET_EXEC_SHUFFLE_ENABLED, COMET_SHUFFLE_MODE} import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometShuffleManagerEnabled, withInfos} +import org.apache.comet.planner.tags.CometTags import org.apache.comet.serde.{Compatible, OperatorOuterClass, QueryPlanSerde, SupportLevel, Unsupported} import org.apache.comet.serde.operator.CometSink import org.apache.comet.shims.{CometTypeShim, ShimCometShuffleExchangeExec} @@ -582,7 +583,8 @@ object CometShuffleExchangeExec } } - private def isCometPlan(op: SparkPlan): Boolean = op.isInstanceOf[CometPlan] + private def isCometPlan(op: SparkPlan): Boolean = + op.isInstanceOf[CometPlan] || op.getTagValue(CometTags.NATIVE_OP).isDefined /** * Returns true if a given spark plan is Comet shuffle operator. diff --git a/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimFileSourceScanExec.scala b/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimFileSourceScanExec.scala new file mode 100644 index 0000000000..f1bbab1c21 --- /dev/null +++ b/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimFileSourceScanExec.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import java.net.URI + +import org.apache.spark.sql.execution.FileSourceScanExec + +/** + * Spark 3.x exposes `FileSourceScanExec.selectedPartitions` as `Seq[PartitionDirectory]`. Spark + * 4.x changed it to the `ScanFileListing` trait. These helpers hide the difference. + */ +object ShimFileSourceScanExec { + + def selectedPartitionCount(scan: FileSourceScanExec): Int = + scan.selectedPartitions.length + + def firstSelectedFileUri(scan: FileSourceScanExec): Option[URI] = + scan.selectedPartitions + .flatMap(_.files.headOption) + .headOption + .map(_.getPath.toUri) +} diff --git a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimFileSourceScanExec.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimFileSourceScanExec.scala new file mode 100644 index 0000000000..10ebe7f059 --- /dev/null +++ b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimFileSourceScanExec.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import java.net.URI + +import org.apache.spark.sql.execution.FileSourceScanExec + +/** + * Spark 3.x exposes `FileSourceScanExec.selectedPartitions` as `Seq[PartitionDirectory]`. Spark + * 4.x changed it to the `ScanFileListing` trait. These helpers hide the difference. 
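+ *
+ * Hypothetical caller sketch (not part of this patch; `scan` is any FileSourceScanExec). The
+ * same call sites compile against either shim:
+ * {{{
+ *   val partitionCount: Int = ShimFileSourceScanExec.selectedPartitionCount(scan)
+ *   val firstUri: Option[java.net.URI] = ShimFileSourceScanExec.firstSelectedFileUri(scan)
+ * }}}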
+ */ +object ShimFileSourceScanExec { + + def selectedPartitionCount(scan: FileSourceScanExec): Int = + scan.selectedPartitions.partitionCount + + def firstSelectedFileUri(scan: FileSourceScanExec): Option[URI] = { + val partitions = scan.selectedPartitions.filePartitionIterator + while (partitions.hasNext) { + val files = partitions.next().files + if (files.hasNext) { + return Some(files.next().getPath.toUri) + } + } + None + } +} diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 4a02a26119..2d0860c1f0 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -2269,50 +2269,47 @@ class CometExecSuite extends CometTestBase { } test("Comet native metrics: scan") { - Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { - scanMode => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "native-scan.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = true, 10000) - withParquetTable(path.toString, "tbl") { - val df = sql("SELECT * FROM tbl") - df.collect() + Seq(CometConf.SCAN_NATIVE_DATAFUSION).foreach { scanMode => + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "native-scan.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = true, 10000) + withParquetTable(path.toString, "tbl") { + val df = sql("SELECT * FROM tbl") + df.collect() - val scan = find(df.queryExecution.executedPlan)(s => - s.isInstanceOf[CometScanExec] || s.isInstanceOf[CometNativeScanExec]) - assert(scan.isDefined, s"Expected to find a Comet scan node for $scanMode") - val metrics = scan.get.metrics + val scan = find(df.queryExecution.executedPlan)(s => + s.isInstanceOf[CometScanExec] || s.isInstanceOf[CometNativeScanExec]) + assert(scan.isDefined, s"Expected to find a Comet scan node for $scanMode") + val metrics = scan.get.metrics - assert( - metrics.contains("time_elapsed_scanning_total"), - s"[$scanMode] Missing time_elapsed_scanning_total. Available: ${metrics.keys}") - assert(metrics.contains("bytes_scanned")) - assert(metrics.contains("output_rows")) - assert(metrics.contains("time_elapsed_opening")) - assert(metrics.contains("time_elapsed_processing")) - assert(metrics.contains("time_elapsed_scanning_until_data")) - assert( - metrics("time_elapsed_scanning_total").value > 0, - s"[$scanMode] time_elapsed_scanning_total should be > 0") - assert( - metrics("bytes_scanned").value > 0, - s"[$scanMode] bytes_scanned should be > 0") - assert(metrics("output_rows").value > 0, s"[$scanMode] output_rows should be > 0") - assert( - metrics("time_elapsed_opening").value > 0, - s"[$scanMode] time_elapsed_opening should be > 0") - assert( - metrics("time_elapsed_processing").value > 0, - s"[$scanMode] time_elapsed_processing should be > 0") - assert( - metrics("time_elapsed_scanning_until_data").value > 0, - s"[$scanMode] time_elapsed_scanning_until_data should be > 0") - } + assert( + metrics.contains("time_elapsed_scanning_total"), + s"[$scanMode] Missing time_elapsed_scanning_total. 
Available: ${metrics.keys}") + assert(metrics.contains("bytes_scanned")) + assert(metrics.contains("output_rows")) + assert(metrics.contains("time_elapsed_opening")) + assert(metrics.contains("time_elapsed_processing")) + assert(metrics.contains("time_elapsed_scanning_until_data")) + assert( + metrics("time_elapsed_scanning_total").value > 0, + s"[$scanMode] time_elapsed_scanning_total should be > 0") + assert(metrics("bytes_scanned").value > 0, s"[$scanMode] bytes_scanned should be > 0") + assert(metrics("output_rows").value > 0, s"[$scanMode] output_rows should be > 0") + assert( + metrics("time_elapsed_opening").value > 0, + s"[$scanMode] time_elapsed_opening should be > 0") + assert( + metrics("time_elapsed_processing").value > 0, + s"[$scanMode] time_elapsed_processing should be > 0") + assert( + metrics("time_elapsed_scanning_until_data").value > 0, + s"[$scanMode] time_elapsed_scanning_until_data should be > 0") } } + } } } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala index b8db737a3c..22f032012c 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode} import org.apache.spark.sql.comet.CometNativeScanExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.FileScanRDD import org.apache.spark.sql.functions.{col, sum} import org.apache.comet.CometConf @@ -66,8 +67,8 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP p } assert(scans.size == 1) - // File partitions are now accessed from the scan field, not from the protobuf - val filePartitions = scans.head.scan.getFilePartitions() + val filePartitions = + scans.head.originalPlan.inputRDD.asInstanceOf[FileScanRDD].filePartitions assert(filePartitions.nonEmpty) assert( filePartitions.head.files.head.filePath.toString