11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -93,6 +93,17 @@ object CometConf extends ShimCometConf
     .booleanConf
     .createWithEnvVarOrDefault("ENABLE_COMET", true)
 
+  val COMET_USE_PLANNER: ConfigEntry[Boolean] = conf("spark.comet.planner.enabled")
+    .category(CATEGORY_EXEC)
+    .doc(
+      "When true, Comet registers the single-rule CometPlanner in place of the legacy " +
+        "CometScanRule + CometExecRule pair. Defaults to true; set to false to fall back " +
+        "to the legacy path. The choice is evaluated once at session extension injection, " +
+        "so changing the value after session creation does not switch rules. The selected " +
+        "rules assert on this flag at entry to surface configuration drift early.")
+    .booleanConf
+    .createWithDefault(true)
+
   val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled")
     .category(CATEGORY_SCAN)
     .doc(
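Usage note (not part of the diff): because the rule choice is evaluated once at session extension injection, the flag has to be set before the session is created. A minimal sketch, assuming the standard Comet plugin setup:

import org.apache.spark.sql.SparkSession

// Roll back to the legacy CometScanRule + CometExecRule pair. Setting this key on an
// already-created session has no effect: the rule choice is fixed at extension injection.
val spark = SparkSession
  .builder()
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.planner.enabled", "false")
  .getOrCreate()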
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf._
+import org.apache.comet.planner.CometPlanner
 import org.apache.comet.rules.{CometExecRule, CometPlanAdaptiveDynamicPruningFilters, CometReuseSubquery, CometScanRule, CometSpark34AqeDppFallbackRule, EliminateRedundantTransitions}
 import org.apache.comet.shims.ShimCometSparkSessionExtensions
 
@@ -93,22 +94,34 @@ class CometSparkSessionExtensions
     // Registered before CometScanRule/CometExecRule so tags are in place when conversion runs.
     // No-op on Spark 3.5+; see CometSpark34AqeDppFallbackRule's class docstring.
     injectPreSpark35QueryStagePrepRuleShim(extensions, CometSpark34AqeDppFallbackRule)
-    extensions.injectQueryStagePrepRule { session => CometScanRule(session) }
-    extensions.injectQueryStagePrepRule { session => CometExecRule(session) }
+    if (CometConf.COMET_USE_PLANNER.get()) {
+      extensions.injectQueryStagePrepRule { session => CometPlanner(session) }
+    } else {
+      extensions.injectQueryStagePrepRule { session => CometScanRule(session) }
+      extensions.injectQueryStagePrepRule { session => CometExecRule(session) }
+    }
     injectQueryStageOptimizerRuleShim(extensions, CometPlanAdaptiveDynamicPruningFilters)
     injectQueryStageOptimizerRuleShim(extensions, CometReuseSubquery)
   }
 
   case class CometScanColumnar(session: SparkSession) extends ColumnarRule {
-    override def preColumnarTransitions: Rule[SparkPlan] = CometScanRule(session)
+    override def preColumnarTransitions: Rule[SparkPlan] =
+      if (CometConf.COMET_USE_PLANNER.get()) CometPlanner(session)
+      else CometScanRule(session)
   }
 
   case class CometExecColumnar(session: SparkSession) extends ColumnarRule {
-    override def preColumnarTransitions: Rule[SparkPlan] = CometExecRule(session)
+    override def preColumnarTransitions: Rule[SparkPlan] =
+      if (CometConf.COMET_USE_PLANNER.get()) NoopRule else CometExecRule(session)
 
     override def postColumnarTransitions: Rule[SparkPlan] =
       EliminateRedundantTransitions(session)
   }
+
+  /** Identity rule used when CometPlanner already ran via CometScanColumnar. */
+  private object NoopRule extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = plan
+  }
 }
 
 object CometSparkSessionExtensions extends Logging {
359 changes: 359 additions & 0 deletions spark/src/main/scala/org/apache/comet/planner/CometPlanner.scala

Large diffs are not rendered by default.
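Because CometPlanner.scala is not rendered, the entry guard its config doc describes ("the selected rules assert on this flag at entry") is not visible here. A minimal sketch of that contract, assuming the usual Rule shape; this is not the PR's code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

import org.apache.comet.CometConf

// Sketch only: fail fast if the flag flips after injection, since the
// registered rule set cannot change for the life of the session.
case class CometPlannerGuardSketch(session: SparkSession) extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    assert(
      CometConf.COMET_USE_PLANNER.get(),
      "CometPlanner is registered but spark.comet.planner.enabled is now false; " +
        "the rule choice was fixed at session extension injection")
    plan // the real rule would run its planning phases here
  }
}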

101 changes: 101 additions & 0 deletions spark/src/main/scala/org/apache/comet/planner/PlanningContext.scala
@@ -0,0 +1,101 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet.planner

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.planner.phases.Phase1LikelyComet

/**
 * State threaded through CometPlanner's three phases. Holds session-scoped data (configs, the
 * active SparkSession), a few plan-wide flags computed once at the top of `apply` so per-node
 * phases do not re-walk the whole tree, plus the broadcast consumer index computed in Phase 1
 * and consulted during Phase 2's broadcast decision.
 *
 * Constructed once per CometPlanner.apply invocation. Not shared across invocations because
 * AQE re-entries reset session state.
 */
case class PlanningContext(
    session: SparkSession,
    conf: SQLConf,
    broadcastConsumers: BroadcastConsumerIndex,
    hasInputFileExpressions: Boolean)

/**
 * Maps each `BroadcastExchangeExec` in the plan to whether a likely-Comet consumer (a
 * `BroadcastHashJoinExec` that would itself be LIKELY_COMET) references it. Built once during
 * Phase 1 by walking the plan looking for joins that reference broadcast outputs.
 *
 * Replaces the convertNode retry-on-self-with-new-children pattern in the old CometExecRule:
 * instead of converting children then re-asking whether to convert the parent, Phase 2 just
 * reads the index.
 */
trait BroadcastConsumerIndex {
  def isConsumedByCometCandidate(broadcast: BroadcastExchangeExec): Boolean
}

object BroadcastConsumerIndex extends Logging {

  /** Empty index. Used as a starting placeholder or in tests. */
  val Empty: BroadcastConsumerIndex = new BroadcastConsumerIndex {
    override def isConsumedByCometCandidate(broadcast: BroadcastExchangeExec): Boolean = false
  }

  /**
   * Walks `plan` looking for `BroadcastHashJoinExec` nodes that would themselves be
   * LIKELY_COMET under the current configuration. For each such join, records every
   * `BroadcastExchangeExec` it references as a consumer. Handles the AQE wrappers
   * (`BroadcastQueryStageExec`, `ReusedExchangeExec`) that hide the raw broadcast between
   * planning and execution.
   */
  def build(plan: SparkPlan, conf: SQLConf): BroadcastConsumerIndex = {
    val consumed = new java.util.IdentityHashMap[BroadcastExchangeExec, java.lang.Boolean]()
    plan.foreach {
      case bhj: BroadcastHashJoinExec if Phase1LikelyComet.isLikelyComet(bhj, conf) =>
        bhj.children.foreach(indexBroadcast(_, consumed))
      case _ =>
    }
    logDebug(s"BroadcastConsumerIndex built size=${consumed.size}")
    new BroadcastConsumerIndex {
      override def isConsumedByCometCandidate(broadcast: BroadcastExchangeExec): Boolean =
        consumed.containsKey(broadcast)
    }
  }

  private def indexBroadcast(
      node: SparkPlan,
      consumed: java.util.IdentityHashMap[BroadcastExchangeExec, java.lang.Boolean]): Unit =
    node match {
      case b: BroadcastExchangeExec =>
        consumed.put(b, java.lang.Boolean.TRUE)
      case BroadcastQueryStageExec(_, b: BroadcastExchangeExec, _) =>
        consumed.put(b, java.lang.Boolean.TRUE)
      case BroadcastQueryStageExec(_, ReusedExchangeExec(_, b: BroadcastExchangeExec), _) =>
        consumed.put(b, java.lang.Boolean.TRUE)
      case ReusedExchangeExec(_, b: BroadcastExchangeExec) =>
        consumed.put(b, java.lang.Boolean.TRUE)
      case _ =>
    }
}
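Usage sketch (not part of the diff; CometPlanner's call sites are not rendered above, so the helper names and the placeholder flag below are assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec

// Phase 1: build the context once per rule invocation.
def buildContext(session: SparkSession, plan: SparkPlan): PlanningContext =
  PlanningContext(
    session,
    session.sessionState.conf,
    BroadcastConsumerIndex.build(plan, session.sessionState.conf),
    hasInputFileExpressions = false) // the real rule computes this flag with one plan walk

// Phase 2: read the index instead of converting children and re-asking the parent.
def broadcastHasCometConsumer(ctx: PlanningContext, b: BroadcastExchangeExec): Boolean =
  ctx.broadcastConsumers.isConsumedByCometCandidate(b)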
90 changes: 90 additions & 0 deletions spark/src/main/scala/org/apache/comet/planner/gates/S2CGate.scala
@@ -0,0 +1,90 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet.planner.gates

import scala.collection.mutable.ListBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.comet.CometSparkToColumnarExec
import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.CometConf
import org.apache.comet.CometConf._

/**
 * Decides whether a non-Comet leaf should be wrapped in `CometSparkToColumnarExec` to bridge
 * row-at-a-time data into a Comet-consuming parent. Ports
 * `CometExecRule.shouldApplySparkToColumnar` and `isSparkToArrowEnabled` into a single entry
 * point used by Phase 2. Demand gating (parent must be LIKELY_COMET) is applied by Phase 2
 * itself, not here.
 */
object S2CGate extends Logging {

  def shouldApply(op: SparkPlan, conf: SQLConf): Boolean = {
    val fallbackReasons = new ListBuffer[String]()
    if (!CometSparkToColumnarExec.isSchemaSupported(op.schema, fallbackReasons)) {
      logDebug(
        s"S2CGate reject schemaUnsupported node=${op.getClass.getSimpleName} " +
          s"id=${op.id} reasons=${fallbackReasons.mkString(" | ")}")
      return false
    }

    op match {
      case scan: FileSourceScanExec =>
        scan.relation.fileFormat match {
          case _: CSVFileFormat => CometConf.COMET_CONVERT_FROM_CSV_ENABLED.get(conf)
          case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
          case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
          case _ => isSparkToArrowEnabled(op, conf)
        }
      case scan: BatchScanExec =>
        scan.scan match {
          case _: CSVScan => CometConf.COMET_CONVERT_FROM_CSV_ENABLED.get(conf)
          case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
          case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
          case _ => isSparkToArrowEnabled(op, conf)
        }
      case _: LeafExecNode =>
        isSparkToArrowEnabled(op, conf)
      case _ =>
        // Matches the old rule's conservative behavior. Non-leaf intermediate operators are
        // not wrapped in CometSparkToColumnarExec today.
        false
    }
  }

  private def isSparkToArrowEnabled(op: SparkPlan, conf: SQLConf): Boolean = {
    COMET_SPARK_TO_ARROW_ENABLED.get(conf) && {
      // Derive the operator name from the class name, not op.nodeName. Some operators
      // override nodeName (e.g. `InMemoryTableScanExec` returns `"Scan <cachedName>"`),
      // which would never match the "InMemoryTableScan" entry in the allowlist.
      val derivedName = op.getClass.getSimpleName.replaceAll("Exec$", "")
      COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.get(conf).contains(derivedName)
    }
  }
}
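Usage sketch (not part of the diff): the demand gating the docstring defers to Phase 2. The maybeBridge helper and parentLikelyComet parameter are illustrative assumptions, as is CometSparkToColumnarExec taking a single child plan:

import org.apache.spark.sql.comet.CometSparkToColumnarExec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

// Wrap a non-Comet leaf only when a likely-Comet parent will consume its batches:
// S2CGate answers "can this leaf be bridged"; the caller supplies the demand check.
def maybeBridge(leaf: SparkPlan, parentLikelyComet: Boolean, conf: SQLConf): SparkPlan =
  if (parentLikelyComet && S2CGate.shouldApply(leaf, conf)) CometSparkToColumnarExec(leaf)
  else leaf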