Skip to content

Commit 4cab60d

Browse files
authored
fix: Expose bucketing information from CometNativeScanExec (#3319) (#3437)
1 parent e9dafd0 commit 4cab60d

3 files changed

Lines changed: 55 additions & 106 deletions

File tree

dev/diffs/3.4.3.diff

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,7 +1150,7 @@ index 02990a7a40d..bddf5e1ccc2 100644
11501150
}
11511151
}
11521152
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
1153-
index cfc8b2cc845..c6fcfd7bd08 100644
1153+
index cfc8b2cc845..c4be7eb3731 100644
11541154
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
11551155
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
11561156
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
@@ -2385,7 +2385,7 @@ index d083cac48ff..3c11bcde807 100644
23852385
import testImplicits._
23862386

23872387
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
2388-
index 266bb343526..e58a2f49eb9 100644
2388+
index 266bb343526..f8ad838e2b2 100644
23892389
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
23902390
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
23912391
@@ -19,15 +19,18 @@ package org.apache.spark.sql.sources
@@ -2409,7 +2409,7 @@ index 266bb343526..e58a2f49eb9 100644
24092409
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
24102410
import org.apache.spark.sql.functions._
24112411
import org.apache.spark.sql.internal.SQLConf
2412-
@@ -101,12 +104,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2412+
@@ -101,12 +104,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
24132413
}
24142414
}
24152415

@@ -2419,6 +2419,7 @@ index 266bb343526..e58a2f49eb9 100644
24192419
+ val fileScan = collect(plan) {
24202420
+ case f: FileSourceScanExec => f
24212421
+ case f: CometScanExec => f
2422+
+ case f: CometNativeScanExec => f
24222423
+ }
24232424
assert(fileScan.nonEmpty, plan)
24242425
fileScan.head
@@ -2427,12 +2428,13 @@ index 266bb343526..e58a2f49eb9 100644
24272428
+ private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) match {
24282429
+ case fs: FileSourceScanExec => fs.bucketedScan
24292430
+ case bs: CometScanExec => bs.bucketedScan
2431+
+ case ns: CometNativeScanExec => ns.bucketedScan
24302432
+ }
24312433
+
24322434
// To verify if the bucket pruning works, this function checks two conditions:
24332435
// 1) Check if the pruned buckets (before filtering) are empty.
24342436
// 2) Verify the final result is the same as the expected one
2435-
@@ -155,7 +166,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2437+
@@ -155,7 +168,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
24362438
val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
24372439
.queryExecution.executedPlan
24382440
val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2442,7 +2444,7 @@ index 266bb343526..e58a2f49eb9 100644
24422444

24432445
val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
24442446
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
2445-
@@ -451,28 +463,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2447+
@@ -451,28 +465,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
24462448
val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
24472449
val executedPlan =
24482450
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2505,7 +2507,7 @@ index 266bb343526..e58a2f49eb9 100644
25052507
s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
25062508

25072509
// check the output partitioning
2508-
@@ -835,11 +873,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2510+
@@ -835,11 +875,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25092511
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
25102512

25112513
val scanDF = spark.table("bucketed_table").select("j")
@@ -2519,7 +2521,7 @@ index 266bb343526..e58a2f49eb9 100644
25192521
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
25202522
}
25212523
}
2522-
@@ -894,7 +932,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2524+
@@ -894,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25232525
}
25242526

25252527
test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
@@ -2530,7 +2532,7 @@ index 266bb343526..e58a2f49eb9 100644
25302532
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
25312533
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
25322534
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
2533-
@@ -913,7 +954,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2535+
@@ -913,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25342536
}
25352537

25362538
test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
@@ -2541,7 +2543,7 @@ index 266bb343526..e58a2f49eb9 100644
25412543
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
25422544
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
25432545

2544-
@@ -943,7 +987,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2546+
@@ -943,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25452547
}
25462548

25472549
test("bucket coalescing eliminates shuffle") {
@@ -2552,7 +2554,7 @@ index 266bb343526..e58a2f49eb9 100644
25522554
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
25532555
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
25542556
// The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
2555-
@@ -1026,15 +1073,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2557+
@@ -1026,15 +1075,26 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25562558
expectedNumShuffles: Int,
25572559
expectedCoalescedNumBuckets: Option[Int]): Unit = {
25582560
val plan = sql(query).queryExecution.executedPlan
@@ -2565,6 +2567,7 @@ index 266bb343526..e58a2f49eb9 100644
25652567
val scans = plan.collect {
25662568
case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
25672569
+ case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b
2570+
+ case b: CometNativeScanExec if b.optionalNumCoalescedBuckets.isDefined => b
25682571
}
25692572
if (expectedCoalescedNumBuckets.isDefined) {
25702573
assert(scans.length == 1)
@@ -2574,6 +2577,8 @@ index 266bb343526..e58a2f49eb9 100644
25742577
+ assert(f.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
25752578
+ case b: CometScanExec =>
25762579
+ assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
2580+
+ case b: CometNativeScanExec =>
2581+
+ assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
25772582
+ }
25782583
} else {
25792584
assert(scans.isEmpty)
@@ -2604,25 +2609,26 @@ index b5f6d2f9f68..277784a92af 100644
26042609

26052610
protected override lazy val sql = spark.sql _
26062611
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
2607-
index 1f55742cd67..42377f7cf26 100644
2612+
index 1f55742cd67..f20129d9dd8 100644
26082613
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
26092614
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
26102615
@@ -20,6 +20,7 @@ package org.apache.spark.sql.sources
26112616
import org.apache.spark.sql.QueryTest
26122617
import org.apache.spark.sql.catalyst.expressions.AttributeReference
26132618
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
2614-
+import org.apache.spark.sql.comet.CometScanExec
2619+
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
26152620
import org.apache.spark.sql.execution.FileSourceScanExec
26162621
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
26172622
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
2618-
@@ -71,7 +72,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
2623+
@@ -71,7 +72,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
26192624

26202625
def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = {
26212626
val plan = sql(query).queryExecution.executedPlan
26222627
- val bucketedScan = collect(plan) { case s: FileSourceScanExec if s.bucketedScan => s }
26232628
+ val bucketedScan = collect(plan) {
26242629
+ case s: FileSourceScanExec if s.bucketedScan => s
26252630
+ case s: CometScanExec if s.bucketedScan => s
2631+
+ case s: CometNativeScanExec if s.bucketedScan => s
26262632
+ }
26272633
assert(bucketedScan.length == expectedNumBucketedScan)
26282634
}

0 commit comments

Comments
 (0)