Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions dev/diffs/4.1.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -2732,27 +2732,29 @@ index 3e7d26f74bd..04cfdf075ab 100644
assert(collect(initialExecutedPlan) {
case i: InMemoryTableScanLike => i
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
index 47b935a2880..15010242a3b 100644
index 47b935a2880..3e9b87f5c32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
@@ -22,6 +22,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.IgnoreComet
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -212,7 +213,8 @@ class CachedBatchSerializerNoUnwrapSuite extends QueryTest
classOf[DefaultCachedBatchSerializerNoUnwrap].getName)
@@ -230,9 +230,16 @@ class CachedBatchSerializerNoUnwrapSuite extends QueryTest
assert(cachedPlans.length == 2)
cachedPlans.foreach {
cachedPlan =>
- assert(cachedPlan.isInstanceOf[WholeStageCodegenExec])
- assert(cachedPlan.asInstanceOf[WholeStageCodegenExec]
- .child.isInstanceOf[ColumnarToRowExec])
+ // Comet replaces ColumnarToRowExec with its own columnar-to-row operator
+ // (CometNativeColumnarToRow / CometColumnarToRow). Accept either the
+ // Spark shape or the equivalent Comet shape, since the important property
+ // under this serializer is that the row conversion is not unwrapped.
+ val isSparkShape =
+ cachedPlan.isInstanceOf[WholeStageCodegenExec] &&
+ cachedPlan.asInstanceOf[WholeStageCodegenExec]
+ .child.isInstanceOf[ColumnarToRowExec]
+ val isCometShape = cachedPlan.getClass.getName.startsWith("org.apache.spark.sql.comet.")
+ assert(isSparkShape || isCometShape, s"unexpected cached plan:\n$cachedPlan")
}
}
}

- test("Do not unwrap ColumnarToRowExec") {
+ test("Do not unwrap ColumnarToRowExec",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4137")) {
withTempPath { workDir =>
val workDirPath = workDir.getAbsolutePath
val input = Seq(100, 200).toDF("count")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index 269990d7d14..140ee4112b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
Expand Down
Loading