@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.HadoopFsRelation
3838import org .apache .spark .sql .execution .datasources .parquet .ParquetUtils
3939import org .apache .spark .sql .execution .datasources .v2 .BatchScanExec
4040import org .apache .spark .sql .execution .datasources .v2 .csv .CSVScan
41+ import org .apache .spark .sql .execution .datasources .v2 .parquet .ParquetScan
4142import org .apache .spark .sql .internal .SQLConf
4243import org .apache .spark .sql .types ._
4344
@@ -47,7 +48,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, wi
4748import org .apache .comet .DataTypeSupport .isComplexType
4849import org .apache .comet .iceberg .{CometIcebergNativeScanMetadata , IcebergReflection }
4950import org .apache .comet .objectstore .NativeConfig
50- import org .apache .comet .parquet .{Native , SupportsComet }
51+ import org .apache .comet .parquet .{CometParquetScan , Native , SupportsComet }
5152import org .apache .comet .parquet .CometParquetUtils .{encryptionEnabled , isEncryptionConfigSupported }
5253import org .apache .comet .serde .operator .CometNativeScan
5354import org .apache .comet .shims .{CometTypeShim , ShimFileFormat , ShimSubqueryBroadcast }
@@ -60,6 +61,8 @@ case class CometScanRule(session: SparkSession)
6061 with CometTypeShim
6162 with ShimSubqueryBroadcast {
6263
64+ import CometScanRule ._
65+
6366 private lazy val showTransformations = CometConf .COMET_EXPLAIN_TRANSFORMATIONS .get()
6467
6568 override def apply (plan : SparkPlan ): SparkPlan = {
@@ -173,6 +176,8 @@ case class CometScanRule(session: SparkSession)
173176 nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
174177 case SCAN_NATIVE_ICEBERG_COMPAT =>
175178 nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
179+ case SCAN_NATIVE_COMET =>
180+ nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
176181 }
177182
178183 case _ =>
@@ -226,9 +231,47 @@ case class CometScanRule(session: SparkSession)
226231 Some (CometScanExec (scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT ))
227232 }
228233
234+ private def nativeCometScan (
235+ session : SparkSession ,
236+ scanExec : FileSourceScanExec ,
237+ r : HadoopFsRelation ,
238+ hadoopConf : Configuration ): Option [SparkPlan ] = {
239+ if (! isSchemaSupported(scanExec, SCAN_NATIVE_COMET , r)) {
240+ return None
241+ }
242+ Some (CometScanExec (scanExec, session, SCAN_NATIVE_COMET ))
243+ }
244+
229245 private def transformV2Scan (scanExec : BatchScanExec ): SparkPlan = {
230246
231247 scanExec.scan match {
248+ case scan : ParquetScan if COMET_NATIVE_SCAN_IMPL .get() == SCAN_NATIVE_COMET =>
249+ val fallbackReasons = new ListBuffer [String ]()
250+ val schemaSupported =
251+ CometBatchScanExec .isSchemaSupported(scan.readDataSchema, fallbackReasons)
252+ if (! schemaSupported) {
253+ fallbackReasons += s " Schema ${scan.readDataSchema} is not supported "
254+ }
255+
256+ val partitionSchemaSupported =
257+ CometBatchScanExec .isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
258+ if (! partitionSchemaSupported) {
259+ fallbackReasons += s " Partition schema ${scan.readPartitionSchema} is not supported "
260+ }
261+
262+ if (scan.pushedAggregate.nonEmpty) {
263+ fallbackReasons += " Comet does not support pushed aggregate"
264+ }
265+
266+ if (schemaSupported && partitionSchemaSupported && scan.pushedAggregate.isEmpty) {
267+ val cometScan = CometParquetScan (session, scanExec.scan.asInstanceOf [ParquetScan ])
268+ CometBatchScanExec (
269+ scanExec.copy(scan = cometScan),
270+ runtimeFilters = scanExec.runtimeFilters)
271+ } else {
272+ withInfos(scanExec, fallbackReasons.toSet)
273+ }
274+
232275 case scan : CSVScan if COMET_CSV_V2_NATIVE_ENABLED .get() =>
233276 val fallbackReasons = new ListBuffer [String ]()
234277 val schemaSupported =
@@ -643,6 +686,48 @@ case class CometScanRule(session: SparkSession)
643686 }
644687 }
645688
689+ private def selectScan (
690+ scanExec : FileSourceScanExec ,
691+ partitionSchema : StructType ,
692+ hadoopConf : Configuration ): String = {
693+
694+ val fallbackReasons = new ListBuffer [String ]()
695+
696+ // native_iceberg_compat only supports local filesystem and S3
697+ if (scanExec.relation.inputFiles
698+ .forall(path => path.startsWith(" file://" ) || path.startsWith(" s3a://" ))) {
699+
700+ val filePath = scanExec.relation.inputFiles.headOption
701+ if (filePath.exists(_.startsWith(" s3a://" ))) {
702+ validateObjectStoreConfig(filePath.get, hadoopConf, fallbackReasons)
703+ }
704+ } else {
705+ fallbackReasons += s " $SCAN_NATIVE_ICEBERG_COMPAT only supports local filesystem and S3 "
706+ }
707+
708+ val typeChecker = CometScanTypeChecker (SCAN_NATIVE_ICEBERG_COMPAT )
709+ val schemaSupported =
710+ typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
711+ val partitionSchemaSupported =
712+ typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
713+
714+ val cometExecEnabled = COMET_EXEC_ENABLED .get()
715+ if (! cometExecEnabled) {
716+ fallbackReasons += s " $SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED .key}=true "
717+ }
718+
719+ if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
720+ fallbackReasons.isEmpty) {
721+ logInfo(s " Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT" )
722+ SCAN_NATIVE_ICEBERG_COMPAT
723+ } else {
724+ logInfo(
725+ s " Auto scan mode falling back to $SCAN_NATIVE_COMET due to " +
726+ s " ${fallbackReasons.mkString(" , " )}" )
727+ SCAN_NATIVE_COMET
728+ }
729+ }
730+
  // True when the expression contains a plan-level subquery (i.e. it is a dynamic
  // partition-pruning filter), which must be handled specially during scan planning.
 646731 private def isDynamicPruningFilter (e : Expression ): Boolean =
 647732 e.exists(_.isInstanceOf [PlanExpression [_]])
@@ -685,12 +770,16 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
685770 name : String ,
686771 fallbackReasons : ListBuffer [String ]): Boolean = {
687772 dt match {
688- case ShortType if CometConf .COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK .get() =>
773+ case ShortType
774+ if scanImpl != CometConf .SCAN_NATIVE_COMET &&
775+ CometConf .COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK .get() =>
689776 fallbackReasons += s " $scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
690777 s " Set ${CometConf .COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK .key}=false to allow " +
691778 " native execution if your data does not contain unsigned small integers. " +
692779 CometConf .COMPAT_GUIDE
693780 false
781+ case _ : StructType | _ : ArrayType | _ : MapType if scanImpl == CometConf .SCAN_NATIVE_COMET =>
782+ false
694783 case dt if isStringCollationType(dt) =>
695784 // we don't need specific support for collation in scans, but this
696785 // is a convenient place to force the whole query to fall back to Spark for now
0 commit comments