Skip to content

Commit 1ccfa14

Browse files
authored
perf: [iceberg] Single-pass FileScanTask validation (#3443)
1 parent 020d982 commit 1ccfa14

3 files changed

Lines changed: 210 additions & 210 deletions

File tree

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 2 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -333,32 +333,6 @@ object IcebergReflection extends Logging {
333333
}
334334
}
335335

336-
/**
337-
* Gets delete files from scan tasks.
338-
*
339-
* @param tasks
340-
* List of Iceberg FileScanTask objects
341-
* @return
342-
* List of all delete files across all tasks
343-
* @throws Exception
344-
* if reflection fails (callers must handle appropriately based on context)
345-
*/
346-
def getDeleteFiles(tasks: java.util.List[_]): java.util.List[_] = {
347-
import scala.jdk.CollectionConverters._
348-
val allDeletes = new java.util.ArrayList[Any]()
349-
350-
// scalastyle:off classforname
351-
val fileScanTaskClass = Class.forName(ClassNames.FILE_SCAN_TASK)
352-
// scalastyle:on classforname
353-
354-
tasks.asScala.foreach { task =>
355-
val deletes = getDeleteFilesFromTask(task, fileScanTaskClass)
356-
allDeletes.addAll(deletes)
357-
}
358-
359-
allDeletes
360-
}
361-
362336
/**
363337
* Gets delete files from a single FileScanTask.
364338
*
@@ -495,91 +469,6 @@ object IcebergReflection extends Logging {
495469
}
496470
}
497471

498-
/**
499-
* Validates file formats and filesystem schemes for Iceberg tasks.
500-
*
501-
* Checks that all data files and delete files are Parquet format and use filesystem schemes
502-
* supported by iceberg-rust (file, s3, s3a, gs, gcs, oss, abfss, abfs, wasbs, wasb).
503-
*
504-
* @param tasks
505-
* List of Iceberg FileScanTask objects
506-
* @return
507-
* (allParquet, unsupportedSchemes) where: - allParquet: true if all files are Parquet format
508-
* \- unsupportedSchemes: Set of unsupported filesystem schemes found (empty if all supported)
509-
*/
510-
def validateFileFormatsAndSchemes(tasks: java.util.List[_]): (Boolean, Set[String]) = {
511-
import scala.jdk.CollectionConverters._
512-
513-
// scalastyle:off classforname
514-
val contentScanTaskClass = Class.forName(ClassNames.CONTENT_SCAN_TASK)
515-
val contentFileClass = Class.forName(ClassNames.CONTENT_FILE)
516-
// scalastyle:on classforname
517-
518-
val fileMethod = contentScanTaskClass.getMethod("file")
519-
val formatMethod = contentFileClass.getMethod("format")
520-
val pathMethod = contentFileClass.getMethod("path")
521-
522-
// Filesystem schemes supported by iceberg-rust
523-
// See: iceberg-rust/crates/iceberg/src/io/storage.rs parse_scheme()
524-
val supportedSchemes =
525-
Set("file", "s3", "s3a", "gs", "gcs", "oss", "abfss", "abfs", "wasbs", "wasb")
526-
527-
var allParquet = true
528-
val unsupportedSchemes = scala.collection.mutable.Set[String]()
529-
530-
tasks.asScala.foreach { task =>
531-
val dataFile = fileMethod.invoke(task)
532-
val fileFormat = formatMethod.invoke(dataFile).toString
533-
534-
// Check file format
535-
if (fileFormat != FileFormats.PARQUET) {
536-
allParquet = false
537-
} else {
538-
// Only check filesystem schemes for Parquet files we'll actually process
539-
try {
540-
val filePath = pathMethod.invoke(dataFile).toString
541-
val uri = new java.net.URI(filePath)
542-
val scheme = uri.getScheme
543-
544-
if (scheme != null && !supportedSchemes.contains(scheme)) {
545-
unsupportedSchemes += scheme
546-
}
547-
} catch {
548-
case _: java.net.URISyntaxException =>
549-
// Ignore URI parsing errors - file paths may contain special characters
550-
// If the path is invalid, we'll fail later during actual file access
551-
}
552-
553-
// Check delete files if they exist
554-
try {
555-
val deletesMethod = task.getClass.getMethod("deletes")
556-
val deleteFiles = deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
557-
558-
deleteFiles.asScala.foreach { deleteFile =>
559-
extractFileLocation(contentFileClass, deleteFile).foreach { deletePath =>
560-
try {
561-
val deleteUri = new java.net.URI(deletePath)
562-
val deleteScheme = deleteUri.getScheme
563-
564-
if (deleteScheme != null && !supportedSchemes.contains(deleteScheme)) {
565-
unsupportedSchemes += deleteScheme
566-
}
567-
} catch {
568-
case _: java.net.URISyntaxException =>
569-
// Ignore URI parsing errors for delete files too
570-
}
571-
}
572-
}
573-
} catch {
574-
case _: Exception =>
575-
// Ignore errors accessing delete files - they may not be supported
576-
}
577-
}
578-
}
579-
580-
(allParquet, unsupportedSchemes.toSet)
581-
}
582-
583472
/**
584473
* Validates partition column types for compatibility with iceberg-rust.
585474
*
@@ -643,68 +532,6 @@ object IcebergReflection extends Logging {
643532

644533
unsupportedTypes.toList
645534
}
646-
647-
/**
648-
* Checks if tasks have non-identity transforms in their residual expressions.
649-
*
650-
* Residual expressions are filters that must be evaluated after reading data from Parquet.
651-
* iceberg-rust can only handle simple column references in residuals, not transformed columns.
652-
* Transform functions like truncate, bucket, year, month, day, hour require evaluation by
653-
* Spark.
654-
*
655-
* @param tasks
656-
* List of Iceberg FileScanTask objects
657-
* @return
658-
* Some(transformType) if an unsupported transform is found (e.g., "truncate[4]"), None if all
659-
* transforms are identity or no transforms are present
660-
* @throws Exception
661-
* if reflection fails - caller must handle appropriately (fallback in planning, fatal in
662-
* serialization)
663-
*/
664-
def findNonIdentityTransformInResiduals(tasks: java.util.List[_]): Option[String] = {
665-
import scala.jdk.CollectionConverters._
666-
667-
// scalastyle:off classforname
668-
val fileScanTaskClass = Class.forName(ClassNames.FILE_SCAN_TASK)
669-
val contentScanTaskClass = Class.forName(ClassNames.CONTENT_SCAN_TASK)
670-
val unboundPredicateClass = Class.forName(ClassNames.UNBOUND_PREDICATE)
671-
// scalastyle:on classforname
672-
673-
tasks.asScala.foreach { task =>
674-
if (fileScanTaskClass.isInstance(task)) {
675-
try {
676-
val residualMethod = contentScanTaskClass.getMethod("residual")
677-
val residual = residualMethod.invoke(task)
678-
679-
// Check if residual is an UnboundPredicate with a transform
680-
if (unboundPredicateClass.isInstance(residual)) {
681-
val termMethod = unboundPredicateClass.getMethod("term")
682-
val term = termMethod.invoke(residual)
683-
684-
// Check if term has a transform
685-
try {
686-
val transformMethod = term.getClass.getMethod("transform")
687-
transformMethod.setAccessible(true)
688-
val transform = transformMethod.invoke(term)
689-
val transformStr = transform.toString
690-
691-
// Only identity transform is supported in residuals
692-
if (transformStr != Transforms.IDENTITY) {
693-
return Some(transformStr)
694-
}
695-
} catch {
696-
case _: NoSuchMethodException =>
697-
// No transform method means it's a simple reference - OK
698-
}
699-
}
700-
} catch {
701-
case _: Exception =>
702-
// Skip tasks where we can't get residual - they may not have one
703-
}
704-
}
705-
}
706-
None
707-
}
708535
}
709536

710537
/**
@@ -783,10 +610,8 @@ object CometIcebergNativeScanMetadata extends Logging {
783610
val globalFieldIdMapping = buildFieldIdMapping(scanSchema)
784611

785612
// File format is always PARQUET,
786-
// validated in CometScanRule.validateFileFormatsAndSchemes()
613+
// validated in CometScanRule.validateIcebergFileScanTasks()
787614
// Hardcoded here for extensibility (future ORC/Avro support would add logic here)
788-
val fileFormat = FileFormats.PARQUET
789-
790615
CometIcebergNativeScanMetadata(
791616
table = table,
792617
metadataLocation = metadataLocation,
@@ -796,7 +621,7 @@ object CometIcebergNativeScanMetadata extends Logging {
796621
tableSchema = tableSchema,
797622
globalFieldIdMapping = globalFieldIdMapping,
798623
catalogProperties = catalogProperties,
799-
fileFormat = fileFormat)
624+
fileFormat = FileFormats.PARQUET)
800625
}
801626
}
802627
}

0 commit comments

Comments
 (0)