Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 10 additions & 18 deletions dev/diffs/4.0.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1309,24 +1309,23 @@ index 0df7f806272..92390bd819f 100644

test("non-matching optional group") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 2e33f6505ab..949fdea0003 100644
index 2e33f6505ab..54f5081e10a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -23,10 +23,12 @@ import org.apache.spark.SparkRuntimeException
@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort, Union}
+import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
import org.apache.spark.sql.execution.datasources.FileScanRDD
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -1529,6 +1531,18 @@ class SubquerySuite extends QueryTest
@@ -1529,6 +1530,18 @@ class SubquerySuite extends QueryTest
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(
_.files.forall(_.urlEncodedPath.contains("p=0"))))
Expand All @@ -1345,7 +1344,7 @@ index 2e33f6505ab..949fdea0003 100644
case _ => false
})
}
@@ -2094,7 +2108,7 @@ class SubquerySuite extends QueryTest
@@ -2094,7 +2107,7 @@ class SubquerySuite extends QueryTest

df.collect()
val exchanges = collect(df.queryExecution.executedPlan) {
Expand All @@ -1354,13 +1353,7 @@ index 2e33f6505ab..949fdea0003 100644
}
assert(exchanges.size === 1)
}
@@ -2674,22 +2688,31 @@ class SubquerySuite extends QueryTest
}
}

- test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery") {
+ test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
@@ -2678,18 +2691,25 @@ class SubquerySuite extends QueryTest
def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = {
val df = sql(query)
checkAnswer(df, answer)
Expand All @@ -1369,6 +1362,7 @@ index 2e33f6505ab..949fdea0003 100644
+ val dataSourceScanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanLike => f
+ case c: CometScanExec => c
+ case n: CometNativeScanExec => n
}
sparkContext.listenerBus.waitUntilEmpty()
- assert(fileSourceScanExec.size === 1)
Expand All @@ -1378,13 +1372,11 @@ index 2e33f6505ab..949fdea0003 100644
+ assert(dataSourceScanExec.size === 1)
+ val scalarSubquery = dataSourceScanExec.head match {
+ case f: FileSourceScanLike =>
+ f.dataFilters.flatMap(_.collect {
+ case s: ScalarSubquery => s
+ })
+ f.dataFilters.flatMap(_.collect { case s: ScalarSubquery => s })
+ case c: CometScanExec =>
+ c.dataFilters.flatMap(_.collect {
+ case s: ScalarSubquery => s
+ })
+ c.dataFilters.flatMap(_.collect { case s: ScalarSubquery => s })
+ case n: CometNativeScanExec =>
+ n.dataFilters.flatMap(_.collect { case s: ScalarSubquery => s })
+ }
assert(scalarSubquery.length === 1)
assert(scalarSubquery.head.plan.isInstanceOf[ReusedSubqueryExec])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,45 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.CometConf._
import org.apache.comet.rules.{CometExecRule, CometScanRule, EliminateRedundantTransitions}
import org.apache.comet.rules.{CometExecRule, CometReuseSubquery, CometScanRule, EliminateRedundantTransitions}
import org.apache.comet.shims.ShimCometSparkSessionExtensions

/**
* CometDriverPlugin will register an instance of this class with Spark.
*
* This class is responsible for injecting Comet rules and extensions into Spark.
* Comet rules are injected into Spark's rule pipeline at several extension points. The execution
* order differs between AQE and non-AQE paths:
*
* Non-AQE (QueryExecution.preparations):
* {{{
* 1. PlanDynamicPruningFilters -- Spark creates DPP filters
* 2. PlanSubqueries -- Spark creates SubqueryExec for scalar subqueries
* 3. EnsureRequirements -- Spark inserts shuffles/sorts
* 4. ApplyColumnarRulesAndInsertTransitions:
* a. preColumnarTransitions: CometScanRule, CometExecRule (replace Spark -> Comet nodes)
* b. insertTransitions: ColumnarToRow/RowToColumnar added
* c. postColumnarTransitions: EliminateRedundantTransitions
* 5. ReuseExchangeAndSubquery -- Spark deduplicates subqueries (sees Comet nodes)
* }}}
*
* AQE (AdaptiveSparkPlanExec):
* {{{
* Initial plan:
* queryStagePreparationRules: CometScanRule, CometExecRule (replace Spark -> Comet nodes)
*
* Per stage (optimizeQueryStage + postStageCreationRules):
* 1. queryStageOptimizerRules: ReuseAdaptiveSubquery, CometReuseSubquery
* 2. postStageCreationRules -> ApplyColumnarRulesAndInsertTransitions:
* a. preColumnarTransitions: CometScanRule, CometExecRule (no-ops, already converted)
* b. insertTransitions
* c. postColumnarTransitions: EliminateRedundantTransitions
* }}}
*
* CometReuseSubquery is needed in AQE because Spark's ReuseAdaptiveSubquery may run before
* Comet's node replacements in the initial plan construction, and the replacements can disrupt
* subquery reuse that was already applied. The shim-based registration
* (injectQueryStageOptimizerRuleShim) handles API availability: Spark 3.5+ has
* injectQueryStageOptimizerRule, Spark 3.4 does not (no-op).
*/
class CometSparkSessionExtensions
extends (SparkSessionExtensions => Unit)
Expand All @@ -49,6 +81,7 @@ class CometSparkSessionExtensions
extensions.injectColumnar { session => CometExecColumnar(session) }
extensions.injectQueryStagePrepRule { session => CometScanRule(session) }
extensions.injectQueryStagePrepRule { session => CometExecRule(session) }
injectQueryStageOptimizerRuleShim(extensions, CometReuseSubquery)
}

case class CometScanColumnar(session: SparkSession) extends ColumnarRule {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.rules

import scala.collection.mutable

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}

/**
* Re-applies subquery deduplication after Comet node conversions.
*
* Spark's ReuseAdaptiveSubquery runs as a queryStageOptimizerRule before postStageCreationRules,
* which is where CometScanRule/CometExecRule replace Spark operators with Comet equivalents. The
* Comet rules copy expressions from the original Spark nodes, which can disrupt subquery reuse
* that was already applied by Spark's rule. This rule runs after Comet conversions to restore
* proper deduplication.
*
* Uses the same algorithm as Spark's ReuseExchangeAndSubquery (subquery portion): top-down
* traversal via transformAllExpressionsWithPruning, caching by canonical form.
*
* For non-AQE, Spark's ReuseExchangeAndSubquery runs after ApplyColumnarRulesAndInsertTransitions
* in QueryExecution.preparations and handles reuse correctly without this rule.
*
* @see
* ReuseExchangeAndSubquery (Spark's non-AQE subquery reuse)
* @see
* ReuseAdaptiveSubquery (Spark's AQE subquery reuse)
*/
case object CometReuseSubquery extends Rule[SparkPlan] {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should start thinking about an audit process for areas where we are replicating Spark logic, so that when we upgrade to future versions of Spark we can check that we are staying consistent. Maybe a starting point would be adding annotations to sections of code where we replicate upstream logic? No action needed on this PR ... just thinking aloud


def apply(plan: SparkPlan): SparkPlan = {
if (!conf.subqueryReuseEnabled) {
return plan
}

val cache = mutable.Map.empty[SparkPlan, BaseSubqueryExec]

plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case sub: ExecSubqueryExpression if !sub.plan.isInstanceOf[ReusedSubqueryExec] =>
val cached = cache.getOrElseUpdate(sub.plan.canonicalized, sub.plan)
if (cached.ne(sub.plan)) {
sub.withNewPlan(ReusedSubqueryExec(cached))
} else {
sub
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,30 @@ case class CometNativeScanExec(
override lazy val metadata: Map[String, String] = originalPlan.metadata

/**
* Prepare DPP subquery plans before execution.
* Prepare subquery plans before execution.
*
* For non-AQE DPP, partitionFilters contains DynamicPruningExpression(InSubqueryExec(...))
* inserted by PlanDynamicPruningFilters (which runs before Comet rules). We call
* e.plan.prepare() here so that the subquery plans are set up before execution begins.
* DPP: partitionFilters may contain DynamicPruningExpression(InSubqueryExec(...)) from
* PlanDynamicPruningFilters.
*
* Note: doPrepare() alone is NOT sufficient for DPP resolution. serializedPartitionData can be
* triggered from findAllPlanData (via commonData) on a BroadcastExchangeExec thread, outside
* the normal prepare() -> executeSubqueries() flow. The actual DPP resolution (updateResult)
* happens in serializedPartitionData below.
* Scalar subquery pushdown (SPARK-43402, Spark 4.0+): dataFilters may contain ScalarSubquery.
*
* serializedPartitionData can be triggered outside the normal prepare() -> executeSubqueries()
* flow (e.g., from a BroadcastExchangeExec thread), so we prepare subquery plans here and
* resolve them explicitly in serializedPartitionData via updateResult().
*/
override protected def doPrepare(): Unit = {
partitionFilters.foreach {
case DynamicPruningExpression(e: InSubqueryExec) =>
e.plan.prepare()
case _ =>
}
dataFilters.foreach { f =>
f.foreach {
case s: org.apache.spark.sql.execution.ScalarSubquery =>
s.plan.prepare()
case _ =>
}
}
super.doPrepare()
}

Expand Down Expand Up @@ -138,7 +145,7 @@ case class CometNativeScanExec(
//
// originalPlan.inputRDD triggers FileSourceScanExec's full scan pipeline including
// codegen on partition filter expressions. With DPP, this calls
// InSubqueryExec.doGenCode which requires the subquery to have finished but
// InSubqueryExec.doGenCode which requires the subquery to have finished - but
// outputPartitioning can be accessed before prepare() runs (e.g., by
// ValidateRequirements during plan validation).
//
Expand Down Expand Up @@ -208,8 +215,41 @@ case class CometNativeScanExec(
case _ =>
}
}
// Extract common data from nativeOp
val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray
// Resolve scalar subqueries in dataFilters and push to the native Parquet reader.
// supportedDataFilters excludes PlanExpression at planning time (unresolved), so these
// aren't in the serialized native plan yet. We resolve them here and append to the
// NativeScanCommon protobuf. Same approach as FileSourceScanLike.pushedDownFilters
// (DataSourceScanExec.scala), which resolves ScalarSubquery -> Literal at execution time.
val commonBytes = {
val base = nativeOp.getNativeScan.getCommon
val scalarSubqueryFilters = dataFilters
.filter(_.exists(_.isInstanceOf[org.apache.spark.sql.execution.ScalarSubquery]))
scalarSubqueryFilters.foreach { f =>
f.foreach {
case s: org.apache.spark.sql.execution.ScalarSubquery =>
s.updateResult()
case _ =>
}
}
val resolvedFilters = scalarSubqueryFilters
.map(_.transform { case s: org.apache.spark.sql.execution.ScalarSubquery =>
Comment thread
mbutrovich marked this conversation as resolved.
Outdated
Literal.create(s.eval(null), s.dataType)
})
if (resolvedFilters.nonEmpty) {
import org.apache.comet.serde.QueryPlanSerde.exprToProto
Comment thread
mbutrovich marked this conversation as resolved.
Outdated
val commonBuilder = base.toBuilder
for (filter <- resolvedFilters) {
exprToProto(filter, output) match {
case Some(proto) => commonBuilder.addDataFilters(proto)
case _ =>
logWarning(s"Could not serialize resolved scalar subquery filter: $filter")
}
}
commonBuilder.build().toByteArray
} else {
base.toByteArray
}
}

// Get file partitions from CometScanExec (handles bucketing, etc.)
val filePartitions = scan.getFilePartitions()
Expand Down Expand Up @@ -299,13 +339,15 @@ case class CometNativeScanExec(
case other: CometNativeScanExec =>
this.originalPlan == other.originalPlan &&
this.serializedPlanOpt == other.serializedPlanOpt &&
this.partitionFilters == other.partitionFilters
this.partitionFilters == other.partitionFilters &&
this.dataFilters == other.dataFilters
case _ =>
false
}
}

override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt)
// Keep consistent with equals(), which also compares partitionFilters and dataFilters, so
// scans that differ only in their (possibly subquery-bearing) filters hash differently.
// NOTE(review): with four arguments this `Objects.hashCode` looks like Guava's varargs
// com.google.common.base.Objects, not java.util.Objects — confirm against the file's imports.
override def hashCode(): Int =
  Objects.hashCode(originalPlan, serializedPlanOpt, partitionFilters, dataFilters)

private val driverMetricKeys =
Set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,16 @@

package org.apache.comet.shims

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}

trait ShimCometSparkSessionExtensions {

/**
* TODO: delete after dropping Spark 3.x support and directly call
* SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key
*/
protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = "spark.sql.extendedExplainProviders"

// Extended info is available only since Spark 4.0.0
// (https://issues.apache.org/jira/browse/SPARK-47289)
def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = {
try {
// Look for QueryExecution.extendedExplainInfo(scala.Function1[String, Unit], SparkPlan)
qe.getClass.getDeclaredMethod(
"extendedExplainInfo",
classOf[String => Unit],
Expand All @@ -43,4 +38,9 @@ trait ShimCometSparkSessionExtensions {
}
true
}

// injectQueryStageOptimizerRule not available on Spark 3.4
def injectQueryStageOptimizerRuleShim(
extensions: SparkSessionExtensions,
rule: Rule[SparkPlan]): Unit = {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}

trait ShimCometSparkSessionExtensions {

  // Literal config key; TODO: reference SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key directly
  // once Spark 3.x support is dropped.
  protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = "spark.sql.extendedExplainProviders"

  /**
   * Reflectively checks whether this Spark version exposes
   * QueryExecution.extendedExplainInfo(String => Unit, SparkPlan).
   * Returns false when the method cannot be found or accessed.
   */
  def supportsExtendedExplainInfo(qe: QueryExecution): Boolean =
    try {
      qe.getClass.getDeclaredMethod(
        "extendedExplainInfo",
        classOf[String => Unit],
        classOf[SparkPlan])
      true
    } catch {
      case _: NoSuchMethodException | _: SecurityException => false
    }

  /**
   * Registers a query-stage optimizer rule. injectQueryStageOptimizerRule is available
   * since Spark 3.5 (SPARK-45785); the Spark 3.4 shim provides a no-op instead.
   */
  def injectQueryStageOptimizerRuleShim(
      extensions: SparkSessionExtensions,
      rule: Rule[SparkPlan]): Unit =
    extensions.injectQueryStageOptimizerRule(_ => rule)
}
Loading
Loading