@@ -57,7 +57,7 @@ import org.apache.comet.vector.CometVector
  * in [[org.apache.comet.CometSparkSessionExtensions]]
  * - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
  */
-class CometParquetFileFormat(session: SparkSession, scanImpl: String)
+class CometParquetFileFormat(session: SparkSession)
     extends ParquetFileFormat
     with MetricsSupport
     with ShimSQLConf {
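This hunk drops the `scanImpl` constructor parameter: the reader choice it used to drive is removed below, and `NativeBatchReader` becomes the only code path. A minimal sketch of what that means at a call site (`makeFormat` is a hypothetical helper for illustration; the real wiring lives in Comet's session extensions):

    // Previously the scan implementation had to be threaded through the constructor:
    //   new CometParquetFileFormat(session, CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
    // After this change the parameter is gone:
    def makeFormat(session: SparkSession): CometParquetFileFormat =
      new CometParquetFileFormat(session)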
@@ -110,8 +110,6 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String)
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)

-    val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
     (file: PartitionedFile) => {
       val sharedConf = broadcastedHadoopConf.value.value
       val footer = FooterReader.readFooter(sharedConf, file)
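Note the shape here, which follows Spark's `FileFormat` contract: values bound before the closure (`capacity`, the broadcast Hadoop conf) are resolved once when the reader is built, while the returned `(file: PartitionedFile) => ...` function runs per file split on the executors. A self-contained sketch of that pattern, with a stand-in `PartitionedFile` since this is illustration rather than Comet code:

    // Hypothetical stand-in for Spark's PartitionedFile, just for this sketch.
    case class PartitionedFile(filePath: String)

    def buildReader(capacity: Int): PartitionedFile => Iterator[Int] = {
      // Computed once, driver-side, and captured by the returned closure.
      val batchSize = capacity
      (file: PartitionedFile) => {
        // Runs once per file split, executor-side.
        Iterator.range(0, batchSize)
      }
    }

    // buildReader(4)(PartitionedFile("part-0.parquet")).toList == List(0, 1, 2, 3)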
@@ -135,85 +133,47 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String)
         isCaseSensitive,
         datetimeRebaseSpec)

-      val recordBatchReader =
-        if (nativeIcebergCompat) {
-          // We still need the predicate in the conf to allow us to generate row indexes based on
-          // the actual row groups read
-          val pushed = if (parquetFilterPushDown) {
-            filters
-              // Collects all converted Parquet filter predicates. Notice that not all predicates
-              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
-              // a `flatMap` is used here.
-              .flatMap(parquetFilters.createFilter)
-              .reduceOption(FilterApi.and)
-          } else {
-            None
-          }
-          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
-          val pushedNative = if (parquetFilterPushDown) {
-            parquetFilters.createNativeFilters(filters)
-          } else {
-            None
-          }
-          val batchReader = new NativeBatchReader(
-            sharedConf,
-            file,
-            footer,
-            pushedNative.orNull,
-            capacity,
-            requiredSchema,
-            dataSchema,
-            isCaseSensitive,
-            useFieldId,
-            ignoreMissingIds,
-            datetimeRebaseSpec.mode == CORRECTED,
-            partitionSchema,
-            file.partitionValues,
-            metrics.asJava,
-            CometMetricNode(metrics))
-          try {
-            batchReader.init()
-          } catch {
-            case e: Throwable =>
-              batchReader.close()
-              throw e
-          }
-          batchReader
-        } else {
-          val pushed = if (parquetFilterPushDown) {
-            filters
-              // Collects all converted Parquet filter predicates. Notice that not all predicates
-              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
-              // a `flatMap` is used here.
-              .flatMap(parquetFilters.createFilter)
-              .reduceOption(FilterApi.and)
-          } else {
-            None
-          }
-          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
-
-          val batchReader = new BatchReader(
-            sharedConf,
-            file,
-            footer,
-            capacity,
-            requiredSchema,
-            isCaseSensitive,
-            useFieldId,
-            ignoreMissingIds,
-            datetimeRebaseSpec.mode == CORRECTED,
-            partitionSchema,
-            file.partitionValues,
-            metrics.asJava)
-          try {
-            batchReader.init()
-          } catch {
-            case e: Throwable =>
-              batchReader.close()
-              throw e
-          }
-          batchReader
-        }
+      // We still need the predicate in the conf to allow us to generate row indexes based on
+      // the actual row groups read
+      val pushed = if (parquetFilterPushDown) {
+        filters
+          // Collects all converted Parquet filter predicates. Notice that not all predicates
+          // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
+          // a `flatMap` is used here.
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+      pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
+      val pushedNative = if (parquetFilterPushDown) {
+        parquetFilters.createNativeFilters(filters)
+      } else {
+        None
+      }
+      val recordBatchReader = new NativeBatchReader(
+        sharedConf,
+        file,
+        footer,
+        pushedNative.orNull,
+        capacity,
+        requiredSchema,
+        dataSchema,
+        isCaseSensitive,
+        useFieldId,
+        ignoreMissingIds,
+        datetimeRebaseSpec.mode == CORRECTED,
+        partitionSchema,
+        file.partitionValues,
+        metrics.asJava,
+        CometMetricNode(metrics))
+      try {
+        recordBatchReader.init()
+      } catch {
+        case e: Throwable =>
+          recordBatchReader.close()
+          throw e
+      }
       val iter = new RecordReaderIterator(recordBatchReader)
       try {
         iter.asInstanceOf[Iterator[InternalRow]]
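The surviving path keeps both pushdown mechanisms: the converted Parquet predicate still goes into the Hadoop conf (per the comment above, so row indexes can be generated from the row groups actually read), while `createNativeFilters` produces the form handed to `NativeBatchReader`. The `flatMap`/`reduceOption` reduction is worth spelling out, since not every Spark filter converts. A self-contained analogue, with strings standing in for the real predicate types:

    // Stand-in for ParquetFilters.createFilter: conversion can fail, hence Option.
    def createFilter(f: String): Option[String] =
      if (f.startsWith("col")) Some(f) else None // e.g. UDF filters don't convert

    val filters = Seq("col_a > 1", "udf(col_b)", "col_c = 2")

    val pushed: Option[String] =
      filters
        .flatMap(createFilter)                  // drop the filters that can't convert
        .reduceOption((l, r) => s"and($l, $r)") // AND the rest; None if none survive

    // pushed == Some("and(col_a > 1, col_c = 2)")

The `try`/`catch` around `init()` that closes the reader and rethrows is the usual close-on-failure idiom: the reader wraps resources outside the JVM's control, so a failed initialization must release them rather than leak.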