@@ -386,7 +386,9 @@ class CometParquetWriterSuite extends CometTestBase {
386386 CometConf .COMET_EXEC_ENABLED .key -> " true" ,
387387 CometConf .getOperatorAllowIncompatConfigKey(classOf [DataWritingCommandExec ]) -> " true" ) {
388388 withTempPath { path =>
389- spark.range(100 ).repartition(10 ).where(" id = 50" ).write.parquet(path.toString)
389+ checkCometNativeWriter {
390+ spark.range(100 ).repartition(10 ).where(" id = 50" ).write.parquet(path.toString)
391+ }
390392 val partFiles = path
391393 .listFiles()
392394 .filter(f => f.isFile && ! f.getName.startsWith(" ." ) && ! f.getName.startsWith(" _" ))
@@ -403,7 +405,9 @@ class CometParquetWriterSuite extends CometTestBase {
403405 CometConf .getOperatorAllowIncompatConfigKey(classOf [DataWritingCommandExec ]) -> " true" ) {
404406 withTable(" t1" , " t2" ) {
405407 sql(" CREATE TABLE t1(i CHAR(5), c VARCHAR(4)) USING parquet" )
406- sql(" CREATE TABLE t2 USING parquet AS SELECT * FROM t1" )
408+ checkCometNativeWriter {
409+ sql(" CREATE TABLE t2 USING parquet AS SELECT * FROM t1" )
410+ }
407411 checkAnswer(
408412 sql(" desc t2" ).selectExpr(" data_type" ).where(" data_type like '%char%'" ),
409413 Seq (Row (" char(5)" ), Row (" varchar(4)" )))
@@ -420,7 +424,9 @@ class CometParquetWriterSuite extends CometTestBase {
420424 withTable(" t1" , " t2" ) {
421425 sql(" CREATE TABLE t1(col CHAR(5)) USING parquet" )
422426 withSQLConf(SQLConf .CHAR_AS_VARCHAR .key -> " true" ) {
423- sql(" CREATE TABLE t2 USING parquet AS SELECT * FROM t1" )
427+ checkCometNativeWriter {
428+ sql(" CREATE TABLE t2 USING parquet AS SELECT * FROM t1" )
429+ }
424430 checkAnswer(
425431 sql(" desc t2" ).selectExpr(" data_type" ).where(" data_type like '%char%'" ),
426432 Seq (Row (" varchar(5)" )))
@@ -439,11 +445,17 @@ class CometParquetWriterSuite extends CometTestBase {
439445 val path = dir.toURI.getPath
440446 withTable(" tab1" , " tab2" ) {
441447 sql(s """ create table tab1 (a int) using parquet location ' $path' """ )
442- sql(" insert into tab1 values(1)" )
448+ checkCometNativeWriter {
449+ sql(" insert into tab1 values(1)" )
450+ }
443451 checkAnswer(sql(" select * from tab1" ), Seq (Row (1 )))
444452 sql(" create table tab2 (a int) using parquet" )
445- sql(" insert into tab2 values(2)" )
446- sql(s """ insert overwrite local directory ' $path' using parquet select * from tab2 """ )
453+ checkCometNativeWriter {
454+ sql(" insert into tab2 values(2)" )
455+ }
456+ checkCometNativeWriter {
457+ sql(s """ insert overwrite local directory ' $path' using parquet select * from tab2 """ )
458+ }
447459 sql(" refresh table tab1" )
448460 checkAnswer(sql(" select * from tab1" ), Seq (Row (2 )))
449461 }
@@ -459,7 +471,9 @@ class CometParquetWriterSuite extends CometTestBase {
459471 CometConf .getOperatorAllowIncompatConfigKey(classOf [DataWritingCommandExec ]) -> " true" ) {
460472 withTable(" t" ) {
461473 sql(" create table t(i boolean, s bigint) using parquet" )
462- sql(" insert into t(i) values(true)" )
474+ checkCometNativeWriter {
475+ sql(" insert into t(i) values(true)" )
476+ }
463477 checkAnswer(spark.table(" t" ), Row (true , null ))
464478 }
465479 }
@@ -474,7 +488,9 @@ class CometParquetWriterSuite extends CometTestBase {
474488 withTable(" t" ) {
475489 sql(" create table t(i boolean) using parquet" )
476490 sql(" alter table t add column s string default concat('abc', 'def')" )
477- sql(" insert into t values(true, default)" )
491+ checkCometNativeWriter {
492+ sql(" insert into t values(true, default)" )
493+ }
478494 checkAnswer(spark.table(" t" ), Row (true , " abcdef" ))
479495 }
480496 }
@@ -488,9 +504,13 @@ class CometParquetWriterSuite extends CometTestBase {
488504 CometConf .getOperatorAllowIncompatConfigKey(classOf [DataWritingCommandExec ]) -> " true" ) {
489505 withTable(" t1" , " t2" ) {
490506 sql(" create table t1(i boolean, s bigint default 42) using parquet" )
491- sql(" insert into t1 values (true, 41), (false, default)" )
507+ checkCometNativeWriter {
508+ sql(" insert into t1 values (true, 41), (false, default)" )
509+ }
492510 sql(" create table t2(i boolean, s bigint) using parquet" )
493- sql(" insert into t2 select * from t1 order by s" )
511+ checkCometNativeWriter {
512+ sql(" insert into t2 select * from t1 order by s" )
513+ }
494514 checkAnswer(spark.table(" t2" ), Seq (Row (true , 41 ), Row (false , 42 )))
495515 }
496516 }
@@ -505,10 +525,14 @@ class CometParquetWriterSuite extends CometTestBase {
505525 withTable(" tbl" , " tbl2" ) {
506526 withView(" view1" ) {
507527 val df = spark.range(10 ).toDF(" id" )
508- df.write.format(" parquet" ).saveAsTable(" tbl" )
528+ checkCometNativeWriter {
529+ df.write.format(" parquet" ).saveAsTable(" tbl" )
530+ }
509531 spark.sql(" CREATE VIEW view1 AS SELECT id FROM tbl" )
510532 spark.sql(" CREATE TABLE tbl2(ID long) USING parquet" )
511- spark.sql(" INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1" )
533+ checkCometNativeWriter {
534+ spark.sql(" INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1" )
535+ }
512536 checkAnswer(spark.table(" tbl2" ), (0 until 10 ).map(Row (_)))
513537 }
514538 }
@@ -522,11 +546,13 @@ class CometParquetWriterSuite extends CometTestBase {
522546 CometConf .COMET_EXEC_ENABLED .key -> " true" ,
523547 CometConf .getOperatorAllowIncompatConfigKey(classOf [DataWritingCommandExec ]) -> " true" ) {
524548 withTempPath { dir =>
525- spark
526- .range(1 )
527- .selectExpr(" current_timestamp() as ts" )
528- .write
529- .parquet(dir.toString + " /spark" )
549+ checkCometNativeWriter {
550+ spark
551+ .range(1 )
552+ .selectExpr(" current_timestamp() as ts" )
553+ .write
554+ .parquet(dir.toString + " /spark" )
555+ }
530556 val result = spark.read.parquet(dir.toString + " /spark" ).collect()
531557 assert(result.length == 1 )
532558 }
@@ -542,8 +568,12 @@ class CometParquetWriterSuite extends CometTestBase {
542568 withTable(" tab1" , " tab2" ) {
543569 sql(""" CREATE TABLE tab1 (s struct<a: string, b: string>) USING parquet""" )
544570 sql(""" CREATE TABLE tab2 (s struct<c: string, d: string>) USING parquet""" )
545- sql(" INSERT INTO tab1 VALUES (named_struct('a', 'x', 'b', 'y'))" )
546- sql(" INSERT INTO tab2 SELECT * FROM tab1" )
571+ checkCometNativeWriter {
572+ sql(" INSERT INTO tab1 VALUES (named_struct('a', 'x', 'b', 'y'))" )
573+ }
574+ checkCometNativeWriter {
575+ sql(" INSERT INTO tab2 SELECT * FROM tab1" )
576+ }
547577 checkAnswer(spark.table(" tab2" ), Row (Row (" x" , " y" )))
548578 }
549579 }
@@ -556,7 +586,9 @@ class CometParquetWriterSuite extends CometTestBase {
556586 CometConf .COMET_EXEC_ENABLED .key -> " true" ,
557587 CometConf .getOperatorAllowIncompatConfigKey(classOf [DataWritingCommandExec ]) -> " true" ) {
558588 withTempPath { dir =>
559- spark.range(1 ).repartition(1 ).write.parquet(dir.getAbsolutePath)
589+ checkCometNativeWriter {
590+ spark.range(1 ).repartition(1 ).write.parquet(dir.getAbsolutePath)
591+ }
560592 val files = dir.listFiles().filter(_.getName.endsWith(" .parquet" ))
561593 assert(files.nonEmpty, " Expected parquet files to be written" )
562594 }
@@ -571,7 +603,9 @@ class CometParquetWriterSuite extends CometTestBase {
571603 CometConf .getOperatorAllowIncompatConfigKey(classOf [DataWritingCommandExec ]) -> " true" ) {
572604 withTempDir { dir =>
573605 val path = dir.getCanonicalPath
574- spark.range(10 ).repartition(10 ).write.mode(" overwrite" ).parquet(path)
606+ checkCometNativeWriter {
607+ spark.range(10 ).repartition(10 ).write.mode(" overwrite" ).parquet(path)
608+ }
575609 val files = new File (path).listFiles().filter(_.getName.startsWith(" part-" ))
576610 assert(files.length > 0 , " Expected part files to be written" )
577611 }
@@ -587,7 +621,9 @@ class CometParquetWriterSuite extends CometTestBase {
587621 CometConf .COMET_EXEC_ENABLED .key -> " true" ,
588622 CometConf .getOperatorAllowIncompatConfigKey(classOf [DataWritingCommandExec ]) -> " true" ) {
589623 withTable(" t" ) {
590- sql(" CREATE TABLE t USING parquet AS SELECT 1 AS c UNION ALL SELECT 2" )
624+ checkCometNativeWriter {
625+ sql(" CREATE TABLE t USING parquet AS SELECT 1 AS c UNION ALL SELECT 2" )
626+ }
591627 checkAnswer(spark.table(" t" ), Seq (Row (1 ), Row (2 )))
592628 }
593629 }
@@ -603,7 +639,9 @@ class CometParquetWriterSuite extends CometTestBase {
603639 withTable(" t1" , " t2" ) {
604640 sql(" CREATE TABLE t1(a INT) USING parquet" )
605641 sql(" CREATE TABLE t2(a INT) USING parquet" )
606- sql(" FROM (SELECT 1 AS a) src INSERT INTO t1 SELECT a INSERT INTO t2 SELECT a" )
642+ checkCometNativeWriter {
643+ sql(" FROM (SELECT 1 AS a) src INSERT INTO t1 SELECT a INSERT INTO t2 SELECT a" )
644+ }
607645 checkAnswer(spark.table(" t1" ), Row (1 ))
608646 checkAnswer(spark.table(" t2" ), Row (1 ))
609647 }
@@ -699,6 +737,11 @@ class CometParquetWriterSuite extends CometTestBase {
699737 s " Expected exactly one CometNativeWriteExec in the plan, but found $nativeWriteCount: \n ${plan.treeString}" )
700738 }
701739
740+ private def checkCometNativeWriter (op : => Unit ): Unit = {
741+ val plan = captureWritePlan(_ => op, " " )
742+ assertHasCometNativeWriteExec(plan)
743+ }
744+
702745 private def writeWithCometNativeWriteExec (
703746 inputPath : String ,
704747 outputPath : String ,
0 commit comments