test("count distinct") {
  // Phase 1: build the fixture with vanilla Spark (Comet off) so the Parquet
  // files under /tmp/test_11 are written by plain Spark.
  withSQLConf("spark.comet.enabled" -> "false") {
    sql("drop table if exists t")
    sql("CREATE TABLE t(v VARCHAR(10), v1 VARCHAR(10), i INT) USING PARQUET")
    // The same five rows are inserted three times so count(distinct v) has
    // genuine duplicates to collapse.
    val rows = Seq(
      "('c', 'a', 1)",
      "('c1', 'a1', 1)",
      "('c2', 'a2', 2)",
      "('c3', 'a3', 2)",
      "('c4', 'a4', 2)")
    for (_ <- 1 to 3; row <- rows) {
      sql(s"INSERT INTO t VALUES $row")
    }
    sql("select * from t").repartition(3).write.mode("overwrite").parquet("/tmp/test_11")
  }

  // Phase 2: read the fixture back with Comet fully enabled and assert the
  // distinct aggregation runs on native operators end to end.
  val cometConfs = Seq(
    CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
    "spark.comet.cast.allowIncompatible" -> "true",
    "spark.sql.adaptive.enabled" -> "false",
    "spark.comet.explain.native.enabled" -> "true",
    "spark.comet.enabled" -> "true",
    // "spark.comet.exec.replaceSortMergeJoin" -> "true",
    "spark.comet.exec.shuffle.enableFastEncoding" -> "true",
    "spark.comet.exec.shuffle.enabled" -> "true",
    // "spark.comet.exec.shuffle.fallbackToColumnar" -> "true",
    "spark.comet.explainFallback.enabled" -> "true",
    CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_iceberg_compat",
    "spark.shuffle.manager" -> "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
    "spark.comet.logFallbackReasons.enabled" -> "true")
  withSQLConf(cometConfs: _*) {
    spark.read.parquet("/tmp/test_11").createOrReplaceTempView("t2")
    checkSparkAnswerAndOperator("SELECT i, sum(v1), count(distinct v) FROM t2 group by i")
  }
}
Expected only Comet native operators, but found HashAggregate.
plan: HashAggregate
+- CometColumnarToRow
+- CometColumnarExchange
+- HashAggregate
+- HashAggregate [COMET: Unsupported aggregation mode PartialMerge]
+- CometColumnarToRow
+- CometColumnarExchange
+- HashAggregate [COMET: cast(v1#146 as double) is not fully compatible with Spark (Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode.). To enable it anyway, set spark.comet.expression.Cast.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html)., Unsupported aggregate expression(s)]
+- CometColumnarToRow
+- CometScan [native_iceberg_compat] parquet
Comet falls back to Spark for simple aggregate queries using count(distinct ...).
SELECT i, sum(v1), count(distinct v) FROM t2 group by i

Reproduce: