
Commit 7c465c4

chore: Remove all remaining uses of legacy BatchReader from Comet [iceberg] (#3468)

1 parent: 84df1ce

13 files changed: 56 additions & 841 deletions


common/src/main/java/org/apache/comet/parquet/BatchReader.java

Lines changed: 5 additions & 126 deletions
@@ -24,10 +24,6 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import scala.Option;
 
@@ -36,9 +32,7 @@
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -87,7 +81,10 @@
  * reader.close();
  * }
  * </pre>
+ *
+ * @deprecated since 0.14.0. This class is kept for Iceberg compatibility only.
  */
+@Deprecated
 @IcebergApi
 public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
@@ -110,8 +107,6 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
   protected AbstractColumnReader[] columnReaders;
   private CometSchemaImporter importer;
   protected ColumnarBatch currentBatch;
-  private Future<Option<Throwable>> prefetchTask;
-  private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
   private FileReader fileReader;
   private boolean[] missingColumns;
   protected boolean isInitialized;
@@ -363,26 +358,7 @@ public void init() throws URISyntaxException, IOException {
       }
     }
 
-    // Pre-fetching
-    boolean preFetchEnabled =
-        conf.getBoolean(
-            CometConf.COMET_SCAN_PREFETCH_ENABLED().key(),
-            (boolean) CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get());
-
-    if (preFetchEnabled) {
-      LOG.info("Prefetch enabled for BatchReader.");
-      this.prefetchQueue = new LinkedBlockingQueue<>();
-    }
-
     isInitialized = true;
-    synchronized (this) {
-      // if prefetch is enabled, `init()` is called in separate thread. When
-      // `BatchReader.nextBatch()` is called asynchronously, it is possibly that
-      // `init()` is not called or finished. We need to hold on `nextBatch` until
-      // initialization of `BatchReader` is done. Once we are close to finish
-      // initialization, we notify the waiting thread of `nextBatch` to continue.
-      notifyAll();
-    }
   }
 
   /**
@@ -436,51 +412,13 @@ public ColumnarBatch currentBatch() {
     return currentBatch;
   }
 
-  // Only for testing
-  public Future<Option<Throwable>> getPrefetchTask() {
-    return this.prefetchTask;
-  }
-
-  // Only for testing
-  public LinkedBlockingQueue<Pair<PageReadStore, Long>> getPrefetchQueue() {
-    return this.prefetchQueue;
-  }
-
   /**
    * Loads the next batch of rows.
    *
    * @return true if there are no more rows to read, false otherwise.
    */
   public boolean nextBatch() throws IOException {
-    if (this.prefetchTask == null) {
-      Preconditions.checkState(isInitialized, "init() should be called first!");
-    } else {
-      // If prefetch is enabled, this reader will be initialized asynchronously from a
-      // different thread. Wait until it is initialized
-      while (!isInitialized) {
-        synchronized (this) {
-          try {
-            // Wait until initialization of current `BatchReader` is finished (i.e., `init()`),
-            // is done. It is possibly that `init()` is done after entering this while loop,
-            // so a short timeout is given.
-            wait(100);
-
-            // Checks if prefetch task is finished. If so, tries to get exception if any.
-            if (prefetchTask.isDone()) {
-              Option<Throwable> exception = prefetchTask.get();
-              if (exception.isDefined()) {
-                throw exception.get();
-              }
-            }
-          } catch (RuntimeException e) {
-            // Spark will check certain exception e.g. `SchemaColumnConvertNotSupportedException`.
-            throw e;
-          } catch (Throwable e) {
-            throw new IOException(e);
-          }
-        }
-      }
-    }
+    Preconditions.checkState(isInitialized, "init() should be called first!");
 
     if (rowsRead >= totalRowCount) return false;
     boolean hasMore;
@@ -547,7 +485,6 @@ public void close() throws IOException {
     }
   }
 
-  @SuppressWarnings("deprecation")
   private boolean loadNextRowGroupIfNecessary() throws Throwable {
     // More rows can be read from loaded row group. No need to load next one.
     if (rowsRead != totalRowsLoaded) return true;
@@ -556,21 +493,7 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
     SQLMetric numRowGroupsMetric = metrics.get("ParquetRowGroups");
     long startNs = System.nanoTime();
 
-    PageReadStore rowGroupReader = null;
-    if (prefetchTask != null && prefetchQueue != null) {
-      // Wait for pre-fetch task to finish.
-      Pair<PageReadStore, Long> rowGroupReaderPair = prefetchQueue.take();
-      rowGroupReader = rowGroupReaderPair.getLeft();
-
-      // Update incremental byte read metric. Because this metric in Spark is maintained
-      // by thread local variable, we need to manually update it.
-      // TODO: We may expose metrics from `FileReader` and get from it directly.
-      long incBytesRead = rowGroupReaderPair.getRight();
-      FileSystem.getAllStatistics().stream()
-          .forEach(statistic -> statistic.incrementBytesRead(incBytesRead));
-    } else {
-      rowGroupReader = fileReader.readNextRowGroup();
-    }
+    PageReadStore rowGroupReader = fileReader.readNextRowGroup();
 
     if (rowGroupTimeMetric != null) {
       rowGroupTimeMetric.add(System.nanoTime() - startNs);
@@ -608,48 +531,4 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
     totalRowsLoaded += rowGroupReader.getRowCount();
     return true;
   }
-
-  // Submits a prefetch task for this reader.
-  public void submitPrefetchTask(ExecutorService threadPool) {
-    this.prefetchTask = threadPool.submit(new PrefetchTask());
-  }
-
-  // A task for prefetching parquet row groups.
-  private class PrefetchTask implements Callable<Option<Throwable>> {
-    private long getBytesRead() {
-      return FileSystem.getAllStatistics().stream()
-          .mapToLong(s -> s.getThreadStatistics().getBytesRead())
-          .sum();
-    }
-
-    @Override
-    public Option<Throwable> call() throws Exception {
-      // Gets the bytes read so far.
-      long baseline = getBytesRead();
-
-      try {
-        init();
-
-        while (true) {
-          PageReadStore rowGroupReader = fileReader.readNextRowGroup();
-
-          if (rowGroupReader == null) {
-            // Reaches the end of row groups.
-            return Option.empty();
-          } else {
-            long incBytesRead = getBytesRead() - baseline;
-
-            prefetchQueue.add(Pair.of(rowGroupReader, incBytesRead));
-          }
-        }
-      } catch (Throwable e) {
-        // Returns exception thrown from the reader. The reader will re-throw it.
-        return Option.apply(e);
-      } finally {
-        if (fileReader != null) {
-          fileReader.closeStream();
-        }
-      }
-    }
-  }
 }
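
With the prefetch machinery gone, BatchReader has a strictly synchronous contract: init() must complete before the first nextBatch() call, and nextBatch() simply returns false once all rows have been read. A minimal sketch of the resulting read loop, assuming a reader constructed elsewhere (constructor arguments are elided):

import org.apache.comet.parquet.BatchReader

// Sketch only: the synchronous read loop that remains after this commit.
def consume(reader: BatchReader): Unit = {
  reader.init() // must complete before the first nextBatch()
  while (reader.nextBatch()) { // returns false once all rows are read
    val batch = reader.currentBatch()
    val rows = batch.numRows() // ... consume the batch ...
  }
  reader.close()
}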

common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java

Lines changed: 2 additions & 0 deletions
@@ -24,9 +24,11 @@
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
+import org.apache.comet.IcebergApi;
 import org.apache.comet.vector.CometVector;
 
 /** This class is a public interface used by Apache Iceberg to read batches using Comet */
+@IcebergApi
 public class IcebergCometBatchReader extends BatchReader {
   public IcebergCometBatchReader(int numColumns, StructType schema) {
     this.columnReaders = new AbstractColumnReader[numColumns];
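
The only changes here are the @IcebergApi marker and its import; behavior is unchanged. For orientation, a hedged sketch of constructing the reader via the constructor shown above (the schema is a made-up placeholder; Iceberg derives the real one from the table):

import org.apache.comet.parquet.IcebergCometBatchReader
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Sketch only: numColumns mirrors the width of the supplied Spark schema.
val schema = StructType(Seq(StructField("id", LongType, nullable = false)))
val reader = new IcebergCometBatchReader(schema.fields.length, schema)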

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 17 deletions
@@ -675,23 +675,6 @@ object CometConf extends ShimCometConf {
     .doubleConf
     .createWithDefault(1.0)
 
-  val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
-    conf("spark.comet.scan.preFetch.enabled")
-      .category(CATEGORY_SCAN)
-      .doc("Whether to enable pre-fetching feature of CometScan.")
-      .booleanConf
-      .createWithDefault(false)
-
-  val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] =
-    conf("spark.comet.scan.preFetch.threadNum")
-      .category(CATEGORY_SCAN)
-      .doc(
-        "The number of threads running pre-fetching for CometScan. Effective if " +
-          s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
-          "pre-fetching threads means more memory requirement to store pre-fetched row groups.")
-      .intConf
-      .createWithDefault(2)
-
   val COMET_NATIVE_LOAD_REQUIRED: ConfigEntry[Boolean] = conf("spark.comet.nativeLoadRequired")
     .category(CATEGORY_EXEC)
     .doc(
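
Both spark.comet.scan.preFetch.enabled and spark.comet.scan.preFetch.threadNum are deleted outright, so setting them no longer has any effect. For reference, this is the builder DSL that CometConf entries use; the entry below is a hypothetical illustration, not a real Comet config:

// Hypothetical entry, illustrative only -- it just mirrors the builder
// DSL (conf/category/doc/typeConf/createWithDefault) used in CometConf.
val COMET_EXAMPLE_FLAG: ConfigEntry[Boolean] =
  conf("spark.comet.example.enabled")
    .category(CATEGORY_SCAN)
    .doc("Whether to enable the (hypothetical) example feature.")
    .booleanConf
    .createWithDefault(false)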

common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala

Lines changed: 0 additions & 5 deletions
@@ -54,11 +54,6 @@ abstract class CometReaderThreadPool {
 
 }
 
-// A thread pool used for pre-fetching files.
-object CometPrefetchThreadPool extends CometReaderThreadPool {
-  override def threadNamePrefix: String = "prefetch_thread"
-}
-
 // Thread pool used by the Parquet parallel reader
 object CometFileReaderThreadPool extends CometReaderThreadPool {
   override def threadNamePrefix: String = "file_reader_thread"

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 37 additions & 82 deletions
@@ -57,7 +57,7 @@ import org.apache.comet.vector.CometVector
  *     in [[org.apache.comet.CometSparkSessionExtensions]]
  *   - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
  */
-class CometParquetFileFormat(session: SparkSession, scanImpl: String)
+class CometParquetFileFormat(session: SparkSession)
     extends ParquetFileFormat
     with MetricsSupport
     with ShimSQLConf {
@@ -110,8 +110,6 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String)
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
 
-    val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
     (file: PartitionedFile) => {
       val sharedConf = broadcastedHadoopConf.value.value
       val footer = FooterReader.readFooter(sharedConf, file)
@@ -135,85 +133,42 @@
         isCaseSensitive,
         datetimeRebaseSpec)
 
-      val recordBatchReader =
-        if (nativeIcebergCompat) {
-          // We still need the predicate in the conf to allow us to generate row indexes based on
-          // the actual row groups read
-          val pushed = if (parquetFilterPushDown) {
-            filters
-              // Collects all converted Parquet filter predicates. Notice that not all predicates
-              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
-              // a `flatMap` is used here.
-              .flatMap(parquetFilters.createFilter)
-              .reduceOption(FilterApi.and)
-          } else {
-            None
-          }
-          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
-          val pushedNative = if (parquetFilterPushDown) {
-            parquetFilters.createNativeFilters(filters)
-          } else {
-            None
-          }
-          val batchReader = new NativeBatchReader(
-            sharedConf,
-            file,
-            footer,
-            pushedNative.orNull,
-            capacity,
-            requiredSchema,
-            dataSchema,
-            isCaseSensitive,
-            useFieldId,
-            ignoreMissingIds,
-            datetimeRebaseSpec.mode == CORRECTED,
-            partitionSchema,
-            file.partitionValues,
-            metrics.asJava,
-            CometMetricNode(metrics))
-          try {
-            batchReader.init()
-          } catch {
-            case e: Throwable =>
-              batchReader.close()
-              throw e
-          }
-          batchReader
-        } else {
-          val pushed = if (parquetFilterPushDown) {
-            filters
-              // Collects all converted Parquet filter predicates. Notice that not all predicates
-              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
-              // a `flatMap` is used here.
-              .flatMap(parquetFilters.createFilter)
-              .reduceOption(FilterApi.and)
-          } else {
-            None
-          }
-          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
-
-          val batchReader = new BatchReader(
-            sharedConf,
-            file,
-            footer,
-            capacity,
-            requiredSchema,
-            isCaseSensitive,
-            useFieldId,
-            ignoreMissingIds,
-            datetimeRebaseSpec.mode == CORRECTED,
-            partitionSchema,
-            file.partitionValues,
-            metrics.asJava)
-          try {
-            batchReader.init()
-          } catch {
-            case e: Throwable =>
-              batchReader.close()
-              throw e
-          }
-          batchReader
-        }
+      val pushed = if (parquetFilterPushDown) {
+        filters
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+      pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
+      val pushedNative = if (parquetFilterPushDown) {
+        parquetFilters.createNativeFilters(filters)
+      } else {
+        None
+      }
+      val recordBatchReader = new NativeBatchReader(
+        sharedConf,
+        file,
+        footer,
+        pushedNative.orNull,
+        capacity,
+        requiredSchema,
+        dataSchema,
+        isCaseSensitive,
+        useFieldId,
+        ignoreMissingIds,
+        datetimeRebaseSpec.mode == CORRECTED,
+        partitionSchema,
+        file.partitionValues,
+        metrics.asJava,
+        CometMetricNode(metrics))
+      try {
+        recordBatchReader.init()
+      } catch {
+        case e: Throwable =>
+          recordBatchReader.close()
+          throw e
+      }
       val iter = new RecordReaderIterator(recordBatchReader)
       try {
        iter.asInstanceOf[Iterator[InternalRow]]
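
The refactor drops the `if (nativeIcebergCompat)` branch and always builds a NativeBatchReader, but it also drops the comment explaining the pushdown idiom: ParquetFilters.createFilter returns an Option because not every Spark filter converts to a Parquet predicate, so flatMap keeps only the convertible ones and reduceOption(FilterApi.and) ANDs them together, yielding None when nothing converts. A self-contained sketch of that pattern, with strings standing in for Parquet FilterPredicates:

// Self-contained sketch of the flatMap/reduceOption pushdown idiom.
object PushdownSketch extends App {
  // Stand-in for ParquetFilters.createFilter: not every filter converts.
  def createFilter(f: String): Option[String] =
    if (f.contains("udf")) None else Some(f)

  val filters = Seq("a > 1", "udf(b)", "c = 2")
  val pushed: Option[String] = filters
    .flatMap(createFilter)                  // keep only convertible filters
    .reduceOption((l, r) => s"and($l, $r)") // AND them; None if none convert

  println(pushed) // Some(and(a > 1, c = 2))
}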
