Skip to content

Commit 416e5b8

Browse files
committed
Convert prints to logs.
1 parent 0837d9e commit 416e5b8

4 files changed

Lines changed: 93 additions & 160 deletions

File tree

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 38 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -301,100 +301,60 @@ case class CometExecRule(session: SparkSession)
301301
}
302302
}
303303

304-
// scalastyle:off println
305304
plan.transformUp { case op =>
306-
val hasSubqueryExpr = op.expressions.exists(_.exists {
307-
case _: InSubqueryExec => true
308-
case _ => false
309-
})
310-
if (hasSubqueryExpr) {
311-
println(s"[RULE-DEBUG] convertNode on ${op.getClass.getSimpleName} " +
312-
s"which HAS InSubqueryExec expressions")
313-
op.expressions.foreach { expr =>
314-
expr.foreach {
315-
case sub: InSubqueryExec =>
316-
println(s"[RULE-DEBUG] InSubqueryExec.plan: ${sub.plan.getClass.getSimpleName}")
317-
sub.plan match {
318-
case sb: SubqueryBroadcastExec =>
319-
println(s"[RULE-DEBUG] SubqueryBroadcast.child: " +
320-
s"${sb.child.getClass.getSimpleName}")
321-
sb.child match {
322-
case b: BroadcastExchangeExec =>
323-
println(s"[RULE-DEBUG] BroadcastExchange.child: " +
324-
s"${b.child.getClass.getSimpleName}")
325-
println(s"[RULE-DEBUG] BroadcastExchange.child is CometNative? " +
326-
s"${b.child.isInstanceOf[CometNativeExec]}")
327-
println(s"[RULE-DEBUG] BroadcastExchange.children all CometNative? " +
328-
s"${b.children.forall(_.isInstanceOf[CometNativeExec])}")
329-
case other =>
330-
println(s"[RULE-DEBUG] SubqueryBroadcast.child is: " +
331-
s"${other.getClass.getSimpleName}")
332-
}
333-
case other =>
334-
println(s"[RULE-DEBUG] sub.plan is: ${other.getClass.getSimpleName}")
335-
}
336-
case _ =>
337-
}
338-
}
339-
}
340305
val converted = convertNode(op)
341306
// Replace SubqueryBroadcastExec with CometSubqueryBroadcastExec in DPP expressions
342307
// when the broadcast child has a Comet plan underneath. This enables exchange reuse
343308
// between the DPP subquery and the join's CometBroadcastExchangeExec because both
344309
// will have the same CometBroadcastExchangeExec type and canonical form.
345310
convertSubqueryBroadcasts(converted)
346311
}
347-
// scalastyle:on println
348312
}
349313

350314
/**
351315
* Replace SubqueryBroadcastExec with CometSubqueryBroadcastExec in a node's expressions.
352316
*
353-
* When CometExecRule converts BroadcastExchangeExec to CometBroadcastExchangeExec on the
354-
* join side, the DPP subquery still references the original BroadcastExchangeExec.
355-
* ReuseExchangeAndSubquery (which runs after Comet rules) can't match them because they
356-
* have different types. By replacing SubqueryBroadcastExec with CometSubqueryBroadcastExec
357-
* (which wraps a CometBroadcastExchangeExec), both sides have the same exchange type and
358-
* reuse works.
317+
* When CometExecRule converts BroadcastExchangeExec to CometBroadcastExchangeExec on the join
318+
* side, the DPP subquery still references the original BroadcastExchangeExec.
319+
* ReuseExchangeAndSubquery (which runs after Comet rules) can't match them because they have
320+
* different types. By replacing SubqueryBroadcastExec with CometSubqueryBroadcastExec (which
321+
* wraps a CometBroadcastExchangeExec), both sides have the same exchange type and reuse works.
359322
*
360-
* The BroadcastExchangeExec in the subquery has a CometNativeColumnarToRowExec child
361-
* (inserted by ApplyColumnarRulesAndInsertTransitions because BroadcastExchangeExec expects
362-
* row input). We strip this transition and create CometBroadcastExchangeExec with the
363-
* underlying Comet plan directly.
323+
* The BroadcastExchangeExec in the subquery has a CometNativeColumnarToRowExec child (inserted
324+
* by ApplyColumnarRulesAndInsertTransitions because BroadcastExchangeExec expects row input).
325+
* We strip this transition and create CometBroadcastExchangeExec with the underlying Comet plan
326+
* directly.
364327
*/
365328
private def convertSubqueryBroadcasts(plan: SparkPlan): SparkPlan = {
366-
plan.transformExpressionsUp {
367-
case inSub: InSubqueryExec =>
368-
inSub.plan match {
369-
case sub: SubqueryBroadcastExec =>
370-
sub.child match {
371-
case b: BroadcastExchangeExec =>
372-
// The BroadcastExchangeExec child is CometNativeColumnarToRowExec wrapping
373-
// a Comet plan. Strip the row transition to get the columnar Comet plan.
374-
val cometChild = b.child match {
375-
case c2r: CometNativeColumnarToRowExec => c2r.child
376-
case other => other
377-
}
378-
if (cometChild.isInstanceOf[CometNativeExec]) {
379-
// scalastyle:off println
380-
println(s"[RULE-DEBUG] Converting SubqueryBroadcastExec to " +
381-
s"CometSubqueryBroadcastExec, cometChild=${cometChild.getClass.getSimpleName}")
382-
// scalastyle:on println
383-
val cometBroadcast = CometBroadcastExchangeExec(
384-
b, b.output, b.mode, cometChild)
385-
val cometSub = CometSubqueryBroadcastExec(
386-
sub.name,
387-
getSubqueryBroadcastExecIndices(sub),
388-
sub.buildKeys,
389-
cometBroadcast)
390-
inSub.withNewPlan(cometSub)
391-
} else {
392-
inSub
393-
}
394-
case _ => inSub
395-
}
396-
case _ => inSub
397-
}
329+
plan.transformExpressionsUp { case inSub: InSubqueryExec =>
330+
inSub.plan match {
331+
case sub: SubqueryBroadcastExec =>
332+
sub.child match {
333+
case b: BroadcastExchangeExec =>
334+
// The BroadcastExchangeExec child is CometNativeColumnarToRowExec wrapping
335+
// a Comet plan. Strip the row transition to get the columnar Comet plan.
336+
val cometChild = b.child match {
337+
case c2r: CometNativeColumnarToRowExec => c2r.child
338+
case other => other
339+
}
340+
if (cometChild.isInstanceOf[CometNativeExec]) {
341+
logInfo(
342+
s"Converting SubqueryBroadcastExec to " +
343+
s"CometSubqueryBroadcastExec for DPP exchange reuse")
344+
val cometBroadcast = CometBroadcastExchangeExec(b, b.output, b.mode, cometChild)
345+
val cometSub = CometSubqueryBroadcastExec(
346+
sub.name,
347+
getSubqueryBroadcastExecIndices(sub),
348+
sub.buildKeys,
349+
cometBroadcast)
350+
inSub.withNewPlan(cometSub)
351+
} else {
352+
inSub
353+
}
354+
case _ => inSub
355+
}
356+
case _ => inSub
357+
}
398358
}
399359
}
400360

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -176,37 +176,20 @@ case class CometNativeScanExec(
176176
* partition's files (lazily, as tasks are scheduled).
177177
*/
178178
@transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = {
179-
// scalastyle:off println
180-
println(s"[DPP-DEBUG] serializedPartitionData: checking partitionFilters")
181-
partitionFilters.foreach {
182-
case DynamicPruningExpression(e: InSubqueryExec) =>
183-
println(s"[DPP-DEBUG] InSubqueryExec plan=${e.plan.getClass.getSimpleName} " +
184-
s"values empty=${e.values().isEmpty}")
185-
case other =>
186-
println(s"[DPP-DEBUG] filter: ${other.getClass.getSimpleName}")
187-
}
188-
// scalastyle:on println
189179
// Ensure DPP subqueries are resolved before accessing file partitions.
190180
// serializedPartitionData can be triggered from findAllPlanData (via commonData) on a
191181
// different execution path than the standard prepare() -> executeSubqueries() flow
192182
// (e.g., from a BroadcastExchangeExec thread). We must resolve DPP here explicitly.
193183
partitionFilters.foreach {
194184
case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty =>
195-
// scalastyle:off println
196-
println(s"[DPP-DEBUG] calling updateResult on InSubqueryExec " +
197-
s"plan=${e.plan.getClass.getSimpleName} id=${System.identityHashCode(e)}")
198-
// scalastyle:on println
185+
logDebug(s"Resolving DPP subquery: plan=${e.plan.getClass.getSimpleName}")
199186
try {
200187
e.updateResult()
201-
// scalastyle:off println
202-
println(s"[DPP-DEBUG] updateResult succeeded, values empty=${e.values().isEmpty}")
203-
// scalastyle:on println
188+
logDebug(s"DPP subquery resolved successfully")
204189
} catch {
205-
// scalastyle:off println
206190
case ex: Exception =>
207-
println(s"[DPP-DEBUG] updateResult FAILED: ${ex.getMessage}")
191+
logError(s"DPP subquery resolution failed: ${ex.getMessage}")
208192
throw ex
209-
// scalastyle:on println
210193
}
211194
case _ =>
212195
}
@@ -216,10 +199,9 @@ case class CometNativeScanExec(
216199
if (scan != null) {
217200
scan.partitionFilters.foreach {
218201
case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty =>
219-
// scalastyle:off println
220-
println(s"[DPP-DEBUG] also resolving scan's InSubqueryExec " +
221-
s"plan=${e.plan.getClass.getSimpleName} id=${System.identityHashCode(e)}")
222-
// scalastyle:on println
202+
logDebug(
203+
s"Resolving CometScanExec DPP subquery: " +
204+
s"plan=${e.plan.getClass.getSimpleName}")
223205
e.updateResult()
224206
case _ =>
225207
}

spark/src/main/scala/org/apache/spark/sql/comet/CometSubqueryBroadcastExec.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,17 @@ import org.apache.spark.util.io.ChunkedByteBuffer
4040
* Comet replacement for SubqueryBroadcastExec that consumes Arrow broadcast data from a
4141
* CometBroadcastExchangeExec instead of HashedRelation from BroadcastExchangeExec.
4242
*
43-
* This enables broadcast exchange reuse between DPP subqueries and broadcast hash joins
44-
* when CometExecRule converts BroadcastExchangeExec to CometBroadcastExchangeExec.
45-
* Without this, the two exchanges have different types and canonical forms, so Spark's
46-
* ReuseExchangeAndSubquery (which runs after Comet rules) cannot match them.
43+
* This enables broadcast exchange reuse between DPP subqueries and broadcast hash joins when
44+
* CometExecRule converts BroadcastExchangeExec to CometBroadcastExchangeExec. Without this, the
45+
* two exchanges have different types and canonical forms, so Spark's ReuseExchangeAndSubquery
46+
* (which runs after Comet rules) cannot match them.
4747
*
48-
* @param indices the indices of the join keys in the list of keys from the build side
49-
* @param buildKeys the join keys from the build side of the join
50-
* @param child the CometBroadcastExchangeExec (or ReusedExchangeExec after reuse)
48+
* @param indices
49+
* the indices of the join keys in the list of keys from the build side
50+
* @param buildKeys
51+
* the join keys from the build side of the join
52+
* @param child
53+
* the CometBroadcastExchangeExec (or ReusedExchangeExec after reuse)
5154
*/
5255
case class CometSubqueryBroadcastExec(
5356
name: String,
@@ -150,8 +153,7 @@ case class CometSubqueryBroadcastExec(
150153

151154
override def stringArgs: Iterator[Any] = super.stringArgs ++ Iterator(s"[id=#$id]")
152155

153-
override protected def withNewChildInternal(
154-
newChild: SparkPlan): CometSubqueryBroadcastExec =
156+
override protected def withNewChildInternal(newChild: SparkPlan): CometSubqueryBroadcastExec =
155157
copy(child = newChild)
156158
}
157159

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 38 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,7 @@ class CometExecSuite extends CometTestBase {
205205
val df = spark.sql(
206206
"select * from dpp_fact_bhj join dpp_dim_bhj on fact_date = dim_date where dim_id > 7")
207207
// Exclude ReusedExchangeExec — it appears inside the DPP subquery after exchange reuse
208-
val (_, cometPlan) = checkSparkAnswerAndOperator(
209-
df, classOf[ReusedExchangeExec])
208+
val (_, cometPlan) = checkSparkAnswerAndOperator(df, classOf[ReusedExchangeExec])
210209

211210
val nativeScans = cometPlan.collect { case s: CometNativeScanExec => s }
212211
assert(nativeScans.nonEmpty, "Expected CometNativeScanExec in plan")
@@ -265,17 +264,25 @@ class CometExecSuite extends CometTestBase {
265264
}
266265
}
267266

268-
test("DPP broadcast exchange reuse investigation") {
267+
test("non-AQE DPP with BHJ reuses broadcast exchange") {
269268
withTempDir { dir =>
270269
val path = s"${dir.getAbsolutePath}/data"
271270
withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
272-
spark.range(100).selectExpr(
273-
"id % 10 as store_id", "cast(id * 2 as int) as date_id",
274-
"cast(id * 3 as int) as product_id", "cast(id as int) as units_sold")
275-
.write.partitionBy("store_id").parquet(s"$path/fact")
276-
spark.range(10).selectExpr(
277-
"cast(id as int) as store_id", "cast(id as string) as country")
278-
.write.parquet(s"$path/dim")
271+
spark
272+
.range(100)
273+
.selectExpr(
274+
"id % 10 as store_id",
275+
"cast(id * 2 as int) as date_id",
276+
"cast(id * 3 as int) as product_id",
277+
"cast(id as int) as units_sold")
278+
.write
279+
.partitionBy("store_id")
280+
.parquet(s"$path/fact")
281+
spark
282+
.range(10)
283+
.selectExpr("cast(id as int) as store_id", "cast(id as string) as country")
284+
.write
285+
.parquet(s"$path/dim")
279286
}
280287

281288
withSQLConf(
@@ -285,53 +292,35 @@ class CometExecSuite extends CometTestBase {
285292
spark.read.parquet(s"$path/fact").createOrReplaceTempView("fact_reuse")
286293
spark.read.parquet(s"$path/dim").createOrReplaceTempView("dim_reuse")
287294

288-
val df = spark.sql(
289-
"""SELECT f.date_id, f.store_id
295+
val df = spark.sql("""SELECT f.date_id, f.store_id
290296
|FROM fact_reuse f JOIN dim_reuse d
291297
|ON f.store_id = d.store_id
292298
|WHERE d.country = 'DE'""".stripMargin)
293-
df.collect()
294-
val plan = df.queryExecution.executedPlan
295-
// scalastyle:off println
296-
println(s"[REUSE-DEBUG] Plan:\n${plan.treeString}")
297-
298-
// Walk into subquery expressions to see what's inside
299-
plan.foreach { node =>
300-
node.expressions.foreach { expr =>
301-
expr.foreach {
302-
case sub: InSubqueryExec =>
303-
println(s"[REUSE-DEBUG] Found InSubqueryExec in ${node.getClass.getSimpleName}")
304-
println(s"[REUSE-DEBUG] sub.plan class: ${sub.plan.getClass.getSimpleName}")
305-
println(s"[REUSE-DEBUG] sub.plan tree:\n${sub.plan.treeString}")
306-
sub.plan match {
307-
case sb: SubqueryBroadcastExec =>
308-
println(s"[REUSE-DEBUG] SubqueryBroadcast child: " +
309-
s"${sb.child.getClass.getSimpleName}")
310-
println(s"[REUSE-DEBUG] SubqueryBroadcast child tree:\n" +
311-
s"${sb.child.treeString}")
312-
case other =>
313-
println(s"[REUSE-DEBUG] sub.plan is: ${other.getClass.getSimpleName}")
314-
}
315-
case _ =>
316-
}
317-
}
318-
}
299+
val (_, cometPlan) = checkSparkAnswer(df)
319300

320-
val reused = collectWithSubqueries(plan) {
321-
case e: ReusedExchangeExec => e
301+
// DPP subquery should use CometSubqueryBroadcastExec (not SubqueryBroadcastExec)
302+
val cometSubqueries = collectWithSubqueries(cometPlan) {
303+
case s: CometSubqueryBroadcastExec => s
322304
}
323-
println(s"[REUSE-DEBUG] ReusedExchangeExec count: ${reused.size}")
305+
assert(
306+
cometSubqueries.nonEmpty,
307+
"Expected CometSubqueryBroadcastExec in plan for exchange reuse")
324308

325-
val broadcasts = collectWithSubqueries(plan) {
326-
case e: BroadcastExchangeExec => ("BroadcastExchangeExec", e: SparkPlan)
327-
case e: CometBroadcastExchangeExec => ("CometBroadcastExchangeExec", e: SparkPlan)
309+
// Broadcast exchange should be reused — only one CometBroadcastExchangeExec,
310+
// the other replaced by ReusedExchangeExec
311+
val reused = collectWithSubqueries(cometPlan) { case e: ReusedExchangeExec =>
312+
e
328313
}
329-
println(s"[REUSE-DEBUG] Broadcast exchange count: ${broadcasts.size}")
330-
broadcasts.foreach { case (typ, e) =>
331-
println(s"[REUSE-DEBUG] $typ hash=${e.canonicalized.hashCode()}")
332-
println(s"[REUSE-DEBUG] $typ child: ${e.children.map(_.getClass.getSimpleName)}")
314+
assert(
315+
reused.nonEmpty,
316+
s"Expected ReusedExchangeExec for broadcast exchange reuse:\n${cometPlan.treeString}")
317+
318+
val broadcasts = collectWithSubqueries(cometPlan) { case e: CometBroadcastExchangeExec =>
319+
e
333320
}
334-
// scalastyle:on println
321+
assert(
322+
broadcasts.size == 1,
323+
s"Expected exactly 1 CometBroadcastExchangeExec (other reused):\n${cometPlan.treeString}")
335324
}
336325
}
337326
}

0 commit comments

Comments
 (0)