32 changes: 32 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -97,6 +97,32 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {

  private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

  /**
   * Revert any `CometShuffleExchangeExec` with `CometColumnarShuffle` that is sandwiched between
   * two non-Comet operators back to the original Spark `ShuffleExchangeExec`. Columnar shuffle
   * converts row-based input to Arrow batches for the shuffle read side; if neither the parent
   * nor the child is a Comet plan that can consume columnar output, that conversion is pure
   * overhead (row->arrow->shuffle->arrow->row vs. row->shuffle->row).
   */
  private def revertRedundantColumnarShuffle(plan: SparkPlan): SparkPlan = {
Contributor:
Thanks for the optimization @andygrove.
I believe this is the first time a `CometShuffleExchangeExec` is reverted back to a plain `ShuffleExchangeExec`.

The two shuffle paths use different memory systems:

  • Comet columnar shuffle uses Comet's own memory pool (off-heap).
  • Spark's vanilla shuffle uses the JVM execution memory pool, with spills managed by ExternalSorter.

Users who have tuned their clusters for Comet (e.g. with a smaller JVM heap) could see unexpected spills after this change, which shifts shuffle memory pressure back to the JVM.
Additionally, Comet's Arrow IPC columnar format typically compresses better than Spark's row-based UnsafeRowSerializer path, so shuffle I/O may also increase.
It would be good to document or log when a shuffle is reverted so users can correlate any unexpected behavior with this optimization.
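A sketch of what that logging might look like inside `revertRedundantColumnarShuffle` (hypothetical, not code from this PR; it assumes access to Spark's `Logging.logWarning` via `Rule`, and reuses the PR's `isRedundantShuffle` predicate and `originalPlan` field):

```scala
// Hypothetical sketch: log each reverted shuffle so users can correlate
// unexpected spills or shuffle I/O changes with this optimization.
plan.transform {
  case op if !op.isInstanceOf[CometPlan] && op.children.exists(isRedundantShuffle) =>
    op.withNewChildren(op.children.map {
      case s: CometShuffleExchangeExec if isRedundantShuffle(s) =>
        logWarning(
          s"Reverting redundant CometColumnarShuffle under ${op.nodeName} to Spark " +
            "ShuffleExchangeExec; shuffle memory moves from Comet's off-heap pool " +
            "to the JVM execution memory pool")
        s.originalPlan.withNewChildren(Seq(s.child)).asInstanceOf[SparkPlan]
      case other => other
    })
}
```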

Member Author:

Thanks for the feedback @karuppayya. I'm assuming the cost of doing two transitions (r2c then c2r) would outweigh the benefits of using Comet shuffle? I agree that it would be worth adding documentation.

Contributor:

I agree that this optimization will improve performance and compute efficiency.

My main concern is determining the best recommendation for users to tune memory, particularly since they cannot explicitly disable it.

Also, could this be a separate rule in its own right, applied only in org.apache.comet.CometSparkSessionExtensions.CometExecColumnar#postColumnarTransitions?
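For illustration, the revert could be factored out roughly like this (a hypothetical sketch; the rule name is invented, and registration in `postColumnarTransitions` is assumed rather than shown):

```scala
// Hypothetical standalone rule wrapping the revert logic from this PR,
// so it could be registered only in postColumnarTransitions.
case class EliminateRedundantColumnarShuffle() extends Rule[SparkPlan] {

  // A CometColumnarShuffle whose child is a non-Comet plan only converts
  // rows to Arrow and back with no Comet operator to benefit.
  private def isRedundantShuffle(child: SparkPlan): Boolean = child match {
    case s: CometShuffleExchangeExec =>
      s.shuffleType == CometColumnarShuffle && !s.child.isInstanceOf[CometPlan]
    case _ => false
  }

  override def apply(plan: SparkPlan): SparkPlan = plan.transform {
    case op if !op.isInstanceOf[CometPlan] && op.children.exists(isRedundantShuffle) =>
      op.withNewChildren(op.children.map {
        case s: CometShuffleExchangeExec if isRedundantShuffle(s) =>
          s.originalPlan.withNewChildren(Seq(s.child)).asInstanceOf[SparkPlan]
        case other => other
      })
  }
}
```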

    def isRedundantShuffle(child: SparkPlan): Boolean = child match {
      case s: CometShuffleExchangeExec =>
        s.shuffleType == CometColumnarShuffle && !s.child.isInstanceOf[CometPlan]
      case _ => false
    }

    plan.transform {
      case op if !op.isInstanceOf[CometPlan] && op.children.exists(isRedundantShuffle) =>
        val newChildren = op.children.map {
          case s: CometShuffleExchangeExec if isRedundantShuffle(s) =>
            s.originalPlan.withNewChildren(Seq(s.child)).asInstanceOf[SparkPlan]
          case other => other
        }
        op.withNewChildren(newChildren)
    }
  }

  private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
    plan.transformUp { case s: ShuffleExchangeExec =>
      CometShuffleExchangeExec.shuffleSupported(s) match {
@@ -409,6 +435,12 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
case CometScanWrapper(_, s) => s
}

    // Revert CometColumnarShuffle to Spark's ShuffleExchangeExec when both the parent and
    // the child are non-Comet (JVM) operators. In that case the Comet shuffle only adds
    // row->arrow and arrow->row conversion overhead, with no Comet operator on either side
    // to benefit from columnar output. See https://github.com/apache/datafusion-comet/issues/4004.
    newPlan = revertRedundantColumnarShuffle(newPlan)

    // Set up logical links
    newPlan = newPlan.transform {
      case op: CometExec =>
@@ -7,46 +7,43 @@ TakeOrderedAndProject
: : +- BroadcastHashJoin
: : :- Filter
: : : +- HashAggregate
: : : +- CometNativeColumnarToRow
: : : +- CometColumnarExchange
: : : +- HashAggregate
: : : +- Project
: : : +- BroadcastHashJoin
: : : :- Filter
: : : : +- ColumnarToRow
: : : : +- Scan parquet spark_catalog.default.store_returns [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : : +- SubqueryBroadcast
: : : : +- BroadcastExchange
: : : : +- CometNativeColumnarToRow
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- Exchange
: : : +- HashAggregate
: : : +- Project
: : : +- BroadcastHashJoin
: : : :- Filter
: : : : +- ColumnarToRow
: : : : +- Scan parquet spark_catalog.default.store_returns [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : : +- SubqueryBroadcast
: : : : +- BroadcastExchange
: : : : +- CometNativeColumnarToRow
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- Filter
: : +- HashAggregate
: : +- CometNativeColumnarToRow
: : +- CometColumnarExchange
: : +- Exchange
: : +- HashAggregate
: : +- HashAggregate
: : +- HashAggregate
: : +- CometNativeColumnarToRow
: : +- CometColumnarExchange
: : +- HashAggregate
: : +- Project
: : +- BroadcastHashJoin
: : :- Filter
: : : +- ColumnarToRow
: : : +- Scan parquet spark_catalog.default.store_returns [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : +- ReusedSubquery
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- Exchange
: : +- HashAggregate
: : +- Project
: : +- BroadcastHashJoin
: : :- Filter
: : : +- ColumnarToRow
: : : +- Scan parquet spark_catalog.default.store_returns [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : +- ReusedSubquery
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- BroadcastExchange
: +- CometNativeColumnarToRow
: +- CometProject
@@ -58,4 +55,4 @@ TakeOrderedAndProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer

Comet accelerated 18 out of 49 eligible operators (36%). Final plan contains 10 transitions between Spark and Comet.
Comet accelerated 15 out of 49 eligible operators (30%). Final plan contains 7 transitions between Spark and Comet.
@@ -1,67 +1,66 @@
TakeOrderedAndProject
+- HashAggregate
+- CometNativeColumnarToRow
+- CometColumnarExchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- Project
: +- BroadcastHashJoin
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin
: : : :- BroadcastHashJoin
: : : : :- CometNativeColumnarToRow
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.customer
: : : : +- BroadcastExchange
: : : : +- Project
: : : : +- BroadcastHashJoin
: : : : :- ColumnarToRow
: : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : : : +- SubqueryBroadcast
: : : : : +- BroadcastExchange
: : : : : +- CometNativeColumnarToRow
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : +- BroadcastExchange
: : : : +- CometNativeColumnarToRow
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- Project
: : : +- BroadcastHashJoin
: : : :- ColumnarToRow
: : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : : +- ReusedSubquery
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- Project
: : +- BroadcastHashJoin
: : :- ColumnarToRow
: : : +- Scan parquet spark_catalog.default.catalog_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : +- ReusedSubquery
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- BroadcastExchange
: +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.customer_address
+- BroadcastExchange
+- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer_demographics
+- Exchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- Project
: +- BroadcastHashJoin
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin
: : : :- BroadcastHashJoin
: : : : :- CometNativeColumnarToRow
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.customer
: : : : +- BroadcastExchange
: : : : +- Project
: : : : +- BroadcastHashJoin
: : : : :- ColumnarToRow
: : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : : : +- SubqueryBroadcast
: : : : : +- BroadcastExchange
: : : : : +- CometNativeColumnarToRow
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : +- BroadcastExchange
: : : : +- CometNativeColumnarToRow
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- Project
: : : +- BroadcastHashJoin
: : : :- ColumnarToRow
: : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : : +- ReusedSubquery
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- Project
: : +- BroadcastHashJoin
: : :- ColumnarToRow
: : : +- Scan parquet spark_catalog.default.catalog_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : +- ReusedSubquery
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- BroadcastExchange
: +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.customer_address
+- BroadcastExchange
+- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer_demographics

Comet accelerated 21 out of 54 eligible operators (38%). Final plan contains 11 transitions between Spark and Comet.
Comet accelerated 20 out of 54 eligible operators (37%). Final plan contains 10 transitions between Spark and Comet.