Skip to content

Commit 3481d43

Browse files
committed
enable_spark_tests_comet_native_writer_fix_spark_rebase_main
1 parent 5064d58 commit 3481d43

1 file changed

Lines changed: 233 additions & 3 deletions

File tree

spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala

Lines changed: 233 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,15 @@
2020
package org.apache.comet.parquet
2121

2222
import java.io.File
23-
2423
import scala.util.Random
25-
2624
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
2725
import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometNativeWriteExec, CometScanExec}
2826
import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, SparkPlan}
2927
import org.apache.spark.sql.execution.command.DataWritingCommandExec
3028
import org.apache.spark.sql.internal.SQLConf
3129
import org.apache.spark.sql.types.StructType
32-
3330
import org.apache.comet.CometConf
31+
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
3432
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOptions}
3533

3634
class CometParquetWriterSuite extends CometTestBase {
@@ -377,6 +375,238 @@ class CometParquetWriterSuite extends CometTestBase {
377375
}
378376
}
379377

378+
// NATIVE COMET WRITER TESTS WHICH FAIL IN SPARK
379+
// https://github.com/apache/datafusion-comet/issues/3417
380+
ignore("Spark compat: empty file should be skipped while write to file") {
381+
withSQLConf(
382+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
383+
CometConf.COMET_EXEC_ENABLED.key -> "true",
384+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
385+
withTempPath { path =>
386+
spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
387+
val partFiles = path
388+
.listFiles()
389+
.filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
390+
assert(partFiles.length === 2)
391+
}
392+
}
393+
}
394+
395+
// https://github.com/apache/datafusion-comet/issues/3418
396+
ignore("Spark compat: SPARK-33901 ctas should not change table's schema") {
397+
withSQLConf(
398+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
399+
CometConf.COMET_EXEC_ENABLED.key -> "true",
400+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
401+
withTable("t1", "t2") {
402+
sql("CREATE TABLE t1(i CHAR(5), c VARCHAR(4)) USING parquet")
403+
sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
404+
checkAnswer(
405+
sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
406+
Seq(Row("char(5)"), Row("varchar(4)")))
407+
}
408+
}
409+
}
410+
411+
// https://github.com/apache/datafusion-comet/issues/3419
412+
ignore("Spark compat: SPARK-37160 CREATE TABLE AS SELECT with CHAR_AS_VARCHAR") {
413+
withSQLConf(
414+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
415+
CometConf.COMET_EXEC_ENABLED.key -> "true",
416+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
417+
withTable("t1", "t2") {
418+
sql("CREATE TABLE t1(col CHAR(5)) USING parquet")
419+
withSQLConf(SQLConf.CHAR_AS_VARCHAR.key -> "true") {
420+
sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
421+
checkAnswer(
422+
sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
423+
Seq(Row("varchar(5)")))
424+
}
425+
}
426+
}
427+
}
428+
429+
// https://github.com/apache/datafusion-comet/issues/3420
430+
ignore("Spark compat: SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY") {
431+
withSQLConf(
432+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
433+
CometConf.COMET_EXEC_ENABLED.key -> "true",
434+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
435+
withTempPath { dir =>
436+
val path = dir.toURI.getPath
437+
withTable("tab1", "tab2") {
438+
sql(s"""create table tab1 (a int) using parquet location '$path'""")
439+
sql("insert into tab1 values(1)")
440+
checkAnswer(sql("select * from tab1"), Seq(Row(1)))
441+
sql("create table tab2 (a int) using parquet")
442+
sql("insert into tab2 values(2)")
443+
sql(s"""insert overwrite local directory '$path' using parquet select * from tab2""")
444+
sql("refresh table tab1")
445+
checkAnswer(sql("select * from tab1"), Seq(Row(2)))
446+
}
447+
}
448+
}
449+
}
450+
451+
// https://github.com/apache/datafusion-comet/issues/3421
452+
ignore("Spark compat: SPARK-38336 INSERT INTO with default columns positive tests") {
453+
withSQLConf(
454+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
455+
CometConf.COMET_EXEC_ENABLED.key -> "true",
456+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
457+
withTable("t") {
458+
sql("create table t(i boolean, s bigint) using parquet")
459+
sql("insert into t(i) values(true)")
460+
checkAnswer(spark.table("t"), Row(true, null))
461+
}
462+
}
463+
}
464+
465+
// https://github.com/apache/datafusion-comet/issues/3422
466+
ignore("Spark compat: SPARK-38811 INSERT INTO on ALTER TABLE ADD COLUMNS positive tests") {
467+
withSQLConf(
468+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
469+
CometConf.COMET_EXEC_ENABLED.key -> "true",
470+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
471+
withTable("t") {
472+
sql("create table t(i boolean) using parquet")
473+
sql("alter table t add column s string default concat('abc', 'def')")
474+
sql("insert into t values(true, default)")
475+
checkAnswer(spark.table("t"), Row(true, "abcdef"))
476+
}
477+
}
478+
}
479+
480+
// https://github.com/apache/datafusion-comet/issues/3423
481+
ignore("Spark compat: SPARK-43071 INSERT INTO from non-projection queries") {
482+
withSQLConf(
483+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
484+
CometConf.COMET_EXEC_ENABLED.key -> "true",
485+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
486+
withTable("t1", "t2") {
487+
sql("create table t1(i boolean, s bigint default 42) using parquet")
488+
sql("insert into t1 values (true, 41), (false, default)")
489+
sql("create table t2(i boolean, s bigint) using parquet")
490+
sql("insert into t2 select * from t1 order by s")
491+
checkAnswer(spark.table("t2"), Seq(Row(true, 41), Row(false, 42)))
492+
}
493+
}
494+
}
495+
496+
// https://github.com/apache/datafusion-comet/issues/3424
497+
ignore("Spark compat: Insert overwrite table command should output correct schema basic") {
498+
withSQLConf(
499+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
500+
CometConf.COMET_EXEC_ENABLED.key -> "true",
501+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
502+
withTable("tbl", "tbl2") {
503+
withView("view1") {
504+
val df = spark.range(10).toDF("id")
505+
df.write.format("parquet").saveAsTable("tbl")
506+
spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
507+
spark.sql("CREATE TABLE tbl2(ID long) USING parquet")
508+
spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
509+
checkAnswer(spark.table("tbl2"), (0 until 10).map(Row(_)))
510+
}
511+
}
512+
}
513+
}
514+
515+
// https://github.com/apache/datafusion-comet/issues/3425
516+
ignore("Spark compat: parquet timestamp conversion") {
517+
withSQLConf(
518+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
519+
CometConf.COMET_EXEC_ENABLED.key -> "true",
520+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
521+
withTempPath { dir =>
522+
spark
523+
.range(1)
524+
.selectExpr("current_timestamp() as ts")
525+
.write
526+
.parquet(dir.toString + "/spark")
527+
val result = spark.read.parquet(dir.toString + "/spark").collect()
528+
assert(result.length == 1)
529+
}
530+
}
531+
}
532+
533+
// https://github.com/apache/datafusion-comet/issues/3426
534+
ignore("Spark compat: INSERT INTO TABLE - complex type but different names") {
535+
withSQLConf(
536+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
537+
CometConf.COMET_EXEC_ENABLED.key -> "true",
538+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
539+
withTable("tab1", "tab2") {
540+
sql("""CREATE TABLE tab1 (s struct<a: string, b: string>) USING parquet""")
541+
sql("""CREATE TABLE tab2 (s struct<c: string, d: string>) USING parquet""")
542+
sql("INSERT INTO tab1 VALUES (named_struct('a', 'x', 'b', 'y'))")
543+
sql("INSERT INTO tab2 SELECT * FROM tab1")
544+
checkAnswer(spark.table("tab2"), Row(Row("x", "y")))
545+
}
546+
}
547+
}
548+
549+
// https://github.com/apache/datafusion-comet/issues/3427
550+
ignore("Spark compat: Write Spark version into Parquet metadata") {
551+
withSQLConf(
552+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
553+
CometConf.COMET_EXEC_ENABLED.key -> "true",
554+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
555+
withTempPath { dir =>
556+
spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
557+
val files = dir.listFiles().filter(_.getName.endsWith(".parquet"))
558+
assert(files.nonEmpty, "Expected parquet files to be written")
559+
}
560+
}
561+
}
562+
563+
// https://github.com/apache/datafusion-comet/issues/3428
564+
ignore("Spark compat: write path implements onTaskCommit API correctly") {
565+
withSQLConf(
566+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
567+
CometConf.COMET_EXEC_ENABLED.key -> "true",
568+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
569+
withTempDir { dir =>
570+
val path = dir.getCanonicalPath
571+
spark.range(10).repartition(10).write.mode("overwrite").parquet(path)
572+
val files = new File(path).listFiles().filter(_.getName.startsWith("part-"))
573+
assert(files.length > 0, "Expected part files to be written")
574+
}
575+
}
576+
}
577+
578+
// COMET NATIVE WRITER Spark 4.x test failures
579+
// https://github.com/apache/datafusion-comet/issues/3429
580+
ignore("Spark compat: ctas with union") {
581+
assume(isSpark40Plus)
582+
withSQLConf(
583+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
584+
CometConf.COMET_EXEC_ENABLED.key -> "true",
585+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
586+
withTable("t") {
587+
sql("CREATE TABLE t USING parquet AS SELECT 1 AS c UNION ALL SELECT 2")
588+
checkAnswer(spark.table("t"), Seq(Row(1), Row(2)))
589+
}
590+
}
591+
}
592+
593+
// https://github.com/apache/datafusion-comet/issues/3430
594+
ignore("Spark compat: SPARK-48817 test multi insert") {
595+
assume(isSpark40Plus)
596+
withSQLConf(
597+
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
598+
CometConf.COMET_EXEC_ENABLED.key -> "true",
599+
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
600+
withTable("t1", "t2") {
601+
sql("CREATE TABLE t1(a INT) USING parquet")
602+
sql("CREATE TABLE t2(a INT) USING parquet")
603+
sql("FROM (SELECT 1 AS a) src INSERT INTO t1 SELECT a INSERT INTO t2 SELECT a")
604+
checkAnswer(spark.table("t1"), Row(1))
605+
checkAnswer(spark.table("t2"), Row(1))
606+
}
607+
}
608+
}
609+
380610
private def createTestData(inputDir: File): String = {
381611
val inputPath = new File(inputDir, "input.parquet").getAbsolutePath
382612
val schema = FuzzDataGenerator.generateSchema(

0 commit comments

Comments
 (0)