|
20 | 20 | package org.apache.comet.parquet |
21 | 21 |
|
22 | 22 | import java.io.File |
23 | | - |
24 | 23 | import scala.util.Random |
25 | | - |
26 | 24 | import org.apache.spark.sql.{CometTestBase, DataFrame, Row} |
27 | 25 | import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometNativeWriteExec, CometScanExec} |
28 | 26 | import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, SparkPlan} |
29 | 27 | import org.apache.spark.sql.execution.command.DataWritingCommandExec |
30 | 28 | import org.apache.spark.sql.internal.SQLConf |
31 | 29 | import org.apache.spark.sql.types.StructType |
32 | | - |
33 | 30 | import org.apache.comet.CometConf |
| 31 | +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus |
34 | 32 | import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOptions} |
35 | 33 |
|
36 | 34 | class CometParquetWriterSuite extends CometTestBase { |
@@ -377,6 +375,238 @@ class CometParquetWriterSuite extends CometTestBase { |
377 | 375 | } |
378 | 376 | } |
379 | 377 |
|
| 378 | + // Native Comet writer tests, ported from Spark, that currently fail with the native writer; each is ignored and tracked by the linked issue
| 379 | + // https://github.com/apache/datafusion-comet/issues/3417 |
| 380 | + ignore("Spark compat: empty file should be skipped while write to file") { |
| 381 | + withSQLConf( |
| 382 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 383 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 384 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 385 | + withTempPath { path => |
| 386 | + spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString) |
| 387 | + val partFiles = path |
| 388 | + .listFiles() |
| 389 | + .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_")) |
| 390 | + assert(partFiles.length === 2) |
| 391 | + } |
| 392 | + } |
| 393 | + } |
| 394 | + |
| 395 | + // https://github.com/apache/datafusion-comet/issues/3418 |
| 396 | + ignore("Spark compat: SPARK-33901 ctas should not change table's schema") { |
| 397 | + withSQLConf( |
| 398 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 399 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 400 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 401 | + withTable("t1", "t2") { |
| 402 | + sql("CREATE TABLE t1(i CHAR(5), c VARCHAR(4)) USING parquet") |
| 403 | + sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1") |
| 404 | + checkAnswer( |
| 405 | + sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"), |
| 406 | + Seq(Row("char(5)"), Row("varchar(4)"))) |
| 407 | + } |
| 408 | + } |
| 409 | + } |
| 410 | + |
| 411 | + // https://github.com/apache/datafusion-comet/issues/3419 |
| 412 | + ignore("Spark compat: SPARK-37160 CREATE TABLE AS SELECT with CHAR_AS_VARCHAR") { |
| 413 | + withSQLConf( |
| 414 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 415 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 416 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 417 | + withTable("t1", "t2") { |
| 418 | + sql("CREATE TABLE t1(col CHAR(5)) USING parquet") |
| 419 | + withSQLConf(SQLConf.CHAR_AS_VARCHAR.key -> "true") { |
| 420 | + sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1") |
| 421 | + checkAnswer( |
| 422 | + sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"), |
| 423 | + Seq(Row("varchar(5)"))) |
| 424 | + } |
| 425 | + } |
| 426 | + } |
| 427 | + } |
| 428 | + |
| 429 | + // https://github.com/apache/datafusion-comet/issues/3420 |
| 430 | + ignore("Spark compat: SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY") { |
| 431 | + withSQLConf( |
| 432 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 433 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 434 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 435 | + withTempPath { dir => |
| 436 | + val path = dir.toURI.getPath |
| 437 | + withTable("tab1", "tab2") { |
| 438 | + sql(s"""create table tab1 (a int) using parquet location '$path'""") |
| 439 | + sql("insert into tab1 values(1)") |
| 440 | + checkAnswer(sql("select * from tab1"), Seq(Row(1))) |
| 441 | + sql("create table tab2 (a int) using parquet") |
| 442 | + sql("insert into tab2 values(2)") |
| 443 | + sql(s"""insert overwrite local directory '$path' using parquet select * from tab2""") |
| 444 | + sql("refresh table tab1") |
| 445 | + checkAnswer(sql("select * from tab1"), Seq(Row(2))) |
| 446 | + } |
| 447 | + } |
| 448 | + } |
| 449 | + } |
| 450 | + |
| 451 | + // https://github.com/apache/datafusion-comet/issues/3421 |
| 452 | + ignore("Spark compat: SPARK-38336 INSERT INTO with default columns positive tests") { |
| 453 | + withSQLConf( |
| 454 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 455 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 456 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 457 | + withTable("t") { |
| 458 | + sql("create table t(i boolean, s bigint) using parquet") |
| 459 | + sql("insert into t(i) values(true)") |
| 460 | + checkAnswer(spark.table("t"), Row(true, null)) |
| 461 | + } |
| 462 | + } |
| 463 | + } |
| 464 | + |
| 465 | + // https://github.com/apache/datafusion-comet/issues/3422 |
| 466 | + ignore("Spark compat: SPARK-38811 INSERT INTO on ALTER TABLE ADD COLUMNS positive tests") { |
| 467 | + withSQLConf( |
| 468 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 469 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 470 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 471 | + withTable("t") { |
| 472 | + sql("create table t(i boolean) using parquet") |
| 473 | + sql("alter table t add column s string default concat('abc', 'def')") |
| 474 | + sql("insert into t values(true, default)") |
| 475 | + checkAnswer(spark.table("t"), Row(true, "abcdef")) |
| 476 | + } |
| 477 | + } |
| 478 | + } |
| 479 | + |
| 480 | + // https://github.com/apache/datafusion-comet/issues/3423 |
| 481 | + ignore("Spark compat: SPARK-43071 INSERT INTO from non-projection queries") { |
| 482 | + withSQLConf( |
| 483 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 484 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 485 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 486 | + withTable("t1", "t2") { |
| 487 | + sql("create table t1(i boolean, s bigint default 42) using parquet") |
| 488 | + sql("insert into t1 values (true, 41), (false, default)") |
| 489 | + sql("create table t2(i boolean, s bigint) using parquet") |
| 490 | + sql("insert into t2 select * from t1 order by s") |
| 491 | + checkAnswer(spark.table("t2"), Seq(Row(true, 41), Row(false, 42))) |
| 492 | + } |
| 493 | + } |
| 494 | + } |
| 495 | + |
| 496 | + // https://github.com/apache/datafusion-comet/issues/3424 |
| 497 | + ignore("Spark compat: Insert overwrite table command should output correct schema basic") { |
| 498 | + withSQLConf( |
| 499 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 500 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 501 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 502 | + withTable("tbl", "tbl2") { |
| 503 | + withView("view1") { |
| 504 | + val df = spark.range(10).toDF("id") |
| 505 | + df.write.format("parquet").saveAsTable("tbl") |
| 506 | + spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") |
| 507 | + spark.sql("CREATE TABLE tbl2(ID long) USING parquet") |
| 508 | + spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1") |
| 509 | + checkAnswer(spark.table("tbl2"), (0 until 10).map(Row(_))) |
| 510 | + } |
| 511 | + } |
| 512 | + } |
| 513 | + } |
| 514 | + |
| 515 | + // https://github.com/apache/datafusion-comet/issues/3425 |
| 516 | + ignore("Spark compat: parquet timestamp conversion") { |
| 517 | + withSQLConf( |
| 518 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 519 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 520 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 521 | + withTempPath { dir => |
| 522 | + spark |
| 523 | + .range(1) |
| 524 | + .selectExpr("current_timestamp() as ts") |
| 525 | + .write |
| 526 | + .parquet(dir.toString + "/spark") |
| 527 | + val result = spark.read.parquet(dir.toString + "/spark").collect() |
| 528 | + assert(result.length == 1) |
| 529 | + } |
| 530 | + } |
| 531 | + } |
| 532 | + |
| 533 | + // https://github.com/apache/datafusion-comet/issues/3426 |
| 534 | + ignore("Spark compat: INSERT INTO TABLE - complex type but different names") { |
| 535 | + withSQLConf( |
| 536 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 537 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 538 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 539 | + withTable("tab1", "tab2") { |
| 540 | + sql("""CREATE TABLE tab1 (s struct<a: string, b: string>) USING parquet""") |
| 541 | + sql("""CREATE TABLE tab2 (s struct<c: string, d: string>) USING parquet""") |
| 542 | + sql("INSERT INTO tab1 VALUES (named_struct('a', 'x', 'b', 'y'))") |
| 543 | + sql("INSERT INTO tab2 SELECT * FROM tab1") |
| 544 | + checkAnswer(spark.table("tab2"), Row(Row("x", "y"))) |
| 545 | + } |
| 546 | + } |
| 547 | + } |
| 548 | + |
| 549 | + // https://github.com/apache/datafusion-comet/issues/3427 |
| 550 | + ignore("Spark compat: Write Spark version into Parquet metadata") { |
| 551 | + withSQLConf( |
| 552 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 553 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 554 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 555 | + withTempPath { dir => |
| 556 | + spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath) |
| 557 | + val files = dir.listFiles().filter(_.getName.endsWith(".parquet")) |
| 558 | + assert(files.nonEmpty, "Expected parquet files to be written") |
| 559 | + } |
| 560 | + } |
| 561 | + } |
| 562 | + |
| 563 | + // https://github.com/apache/datafusion-comet/issues/3428 |
| 564 | + ignore("Spark compat: write path implements onTaskCommit API correctly") { |
| 565 | + withSQLConf( |
| 566 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 567 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 568 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 569 | + withTempDir { dir => |
| 570 | + val path = dir.getCanonicalPath |
| 571 | + spark.range(10).repartition(10).write.mode("overwrite").parquet(path) |
| 572 | + val files = new File(path).listFiles().filter(_.getName.startsWith("part-")) |
| 573 | + assert(files.length > 0, "Expected part files to be written") |
| 574 | + } |
| 575 | + } |
| 576 | + } |
| 577 | + |
| 578 | + // Native Comet writer tests that fail only on Spark 4.x
| 579 | + // https://github.com/apache/datafusion-comet/issues/3429 |
| 580 | + ignore("Spark compat: ctas with union") { |
| 581 | + assume(isSpark40Plus) |
| 582 | + withSQLConf( |
| 583 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 584 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 585 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 586 | + withTable("t") { |
| 587 | + sql("CREATE TABLE t USING parquet AS SELECT 1 AS c UNION ALL SELECT 2") |
| 588 | + checkAnswer(spark.table("t"), Seq(Row(1), Row(2))) |
| 589 | + } |
| 590 | + } |
| 591 | + } |
| 592 | + |
| 593 | + // https://github.com/apache/datafusion-comet/issues/3430 |
| 594 | + ignore("Spark compat: SPARK-48817 test multi insert") { |
| 595 | + assume(isSpark40Plus) |
| 596 | + withSQLConf( |
| 597 | + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", |
| 598 | + CometConf.COMET_EXEC_ENABLED.key -> "true", |
| 599 | + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { |
| 600 | + withTable("t1", "t2") { |
| 601 | + sql("CREATE TABLE t1(a INT) USING parquet") |
| 602 | + sql("CREATE TABLE t2(a INT) USING parquet") |
| 603 | + sql("FROM (SELECT 1 AS a) src INSERT INTO t1 SELECT a INSERT INTO t2 SELECT a") |
| 604 | + checkAnswer(spark.table("t1"), Row(1)) |
| 605 | + checkAnswer(spark.table("t2"), Row(1)) |
| 606 | + } |
| 607 | + } |
| 608 | + } |
| 609 | + |
380 | 610 | private def createTestData(inputDir: File): String = { |
381 | 611 | val inputPath = new File(inputDir, "input.parquet").getAbsolutePath |
382 | 612 | val schema = FuzzDataGenerator.generateSchema( |
|