2424import java .net .URI ;
2525import java .net .URISyntaxException ;
2626import java .util .*;
27- import java .util .concurrent .Callable ;
28- import java .util .concurrent .ExecutorService ;
29- import java .util .concurrent .Future ;
30- import java .util .concurrent .LinkedBlockingQueue ;
3127
3228import scala .Option ;
3329
3632
3733import org .apache .arrow .memory .BufferAllocator ;
3834import org .apache .arrow .memory .RootAllocator ;
39- import org .apache .commons .lang3 .tuple .Pair ;
4035import org .apache .hadoop .conf .Configuration ;
41- import org .apache .hadoop .fs .FileSystem ;
4236import org .apache .hadoop .fs .Path ;
4337import org .apache .hadoop .mapreduce .InputSplit ;
4438import org .apache .hadoop .mapreduce .RecordReader ;
8781 * reader.close();
8882 * }
8983 * </pre>
84+ *
85+ * @deprecated since 0.14.0. This class is kept for Iceberg compatibility only.
9086 */
87+ @ Deprecated
9188@ IcebergApi
9289public class BatchReader extends RecordReader <Void , ColumnarBatch > implements Closeable {
9390 private static final Logger LOG = LoggerFactory .getLogger (FileReader .class );
@@ -110,8 +107,6 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
110107 protected AbstractColumnReader [] columnReaders ;
111108 private CometSchemaImporter importer ;
112109 protected ColumnarBatch currentBatch ;
113- private Future <Option <Throwable >> prefetchTask ;
114- private LinkedBlockingQueue <Pair <PageReadStore , Long >> prefetchQueue ;
115110 private FileReader fileReader ;
116111 private boolean [] missingColumns ;
117112 protected boolean isInitialized ;
@@ -363,26 +358,7 @@ public void init() throws URISyntaxException, IOException {
363358 }
364359 }
365360
366- // Pre-fetching
367- boolean preFetchEnabled =
368- conf .getBoolean (
369- CometConf .COMET_SCAN_PREFETCH_ENABLED ().key (),
370- (boolean ) CometConf .COMET_SCAN_PREFETCH_ENABLED ().defaultValue ().get ());
371-
372- if (preFetchEnabled ) {
373- LOG .info ("Prefetch enabled for BatchReader." );
374- this .prefetchQueue = new LinkedBlockingQueue <>();
375- }
376-
377361 isInitialized = true ;
378- synchronized (this ) {
379- // if prefetch is enabled, `init()` is called in separate thread. When
380- // `BatchReader.nextBatch()` is called asynchronously, it is possibly that
381- // `init()` is not called or finished. We need to hold on `nextBatch` until
382- // initialization of `BatchReader` is done. Once we are close to finish
383- // initialization, we notify the waiting thread of `nextBatch` to continue.
384- notifyAll ();
385- }
386362 }
387363
388364 /**
@@ -436,51 +412,13 @@ public ColumnarBatch currentBatch() {
436412 return currentBatch ;
437413 }
438414
439- // Only for testing
440- public Future <Option <Throwable >> getPrefetchTask () {
441- return this .prefetchTask ;
442- }
443-
444- // Only for testing
445- public LinkedBlockingQueue <Pair <PageReadStore , Long >> getPrefetchQueue () {
446- return this .prefetchQueue ;
447- }
448-
449415 /**
450416 * Loads the next batch of rows.
451417 *
452418 * @return true if there are no more rows to read, false otherwise.
453419 */
454420 public boolean nextBatch () throws IOException {
455- if (this .prefetchTask == null ) {
456- Preconditions .checkState (isInitialized , "init() should be called first!" );
457- } else {
458- // If prefetch is enabled, this reader will be initialized asynchronously from a
459- // different thread. Wait until it is initialized
460- while (!isInitialized ) {
461- synchronized (this ) {
462- try {
463- // Wait until initialization of current `BatchReader` is finished (i.e., `init()`),
464- // is done. It is possibly that `init()` is done after entering this while loop,
465- // so a short timeout is given.
466- wait (100 );
467-
468- // Checks if prefetch task is finished. If so, tries to get exception if any.
469- if (prefetchTask .isDone ()) {
470- Option <Throwable > exception = prefetchTask .get ();
471- if (exception .isDefined ()) {
472- throw exception .get ();
473- }
474- }
475- } catch (RuntimeException e ) {
476- // Spark will check certain exception e.g. `SchemaColumnConvertNotSupportedException`.
477- throw e ;
478- } catch (Throwable e ) {
479- throw new IOException (e );
480- }
481- }
482- }
483- }
421+ Preconditions .checkState (isInitialized , "init() should be called first!" );
484422
485423 if (rowsRead >= totalRowCount ) return false ;
486424 boolean hasMore ;
@@ -547,7 +485,6 @@ public void close() throws IOException {
547485 }
548486 }
549487
550- @ SuppressWarnings ("deprecation" )
551488 private boolean loadNextRowGroupIfNecessary () throws Throwable {
552489 // More rows can be read from loaded row group. No need to load next one.
553490 if (rowsRead != totalRowsLoaded ) return true ;
@@ -556,21 +493,7 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
556493 SQLMetric numRowGroupsMetric = metrics .get ("ParquetRowGroups" );
557494 long startNs = System .nanoTime ();
558495
559- PageReadStore rowGroupReader = null ;
560- if (prefetchTask != null && prefetchQueue != null ) {
561- // Wait for pre-fetch task to finish.
562- Pair <PageReadStore , Long > rowGroupReaderPair = prefetchQueue .take ();
563- rowGroupReader = rowGroupReaderPair .getLeft ();
564-
565- // Update incremental byte read metric. Because this metric in Spark is maintained
566- // by thread local variable, we need to manually update it.
567- // TODO: We may expose metrics from `FileReader` and get from it directly.
568- long incBytesRead = rowGroupReaderPair .getRight ();
569- FileSystem .getAllStatistics ().stream ()
570- .forEach (statistic -> statistic .incrementBytesRead (incBytesRead ));
571- } else {
572- rowGroupReader = fileReader .readNextRowGroup ();
573- }
496+ PageReadStore rowGroupReader = fileReader .readNextRowGroup ();
574497
575498 if (rowGroupTimeMetric != null ) {
576499 rowGroupTimeMetric .add (System .nanoTime () - startNs );
@@ -608,48 +531,4 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
608531 totalRowsLoaded += rowGroupReader .getRowCount ();
609532 return true ;
610533 }
611-
612- // Submits a prefetch task for this reader.
613- public void submitPrefetchTask (ExecutorService threadPool ) {
614- this .prefetchTask = threadPool .submit (new PrefetchTask ());
615- }
616-
617- // A task for prefetching parquet row groups.
618- private class PrefetchTask implements Callable <Option <Throwable >> {
619- private long getBytesRead () {
620- return FileSystem .getAllStatistics ().stream ()
621- .mapToLong (s -> s .getThreadStatistics ().getBytesRead ())
622- .sum ();
623- }
624-
625- @ Override
626- public Option <Throwable > call () throws Exception {
627- // Gets the bytes read so far.
628- long baseline = getBytesRead ();
629-
630- try {
631- init ();
632-
633- while (true ) {
634- PageReadStore rowGroupReader = fileReader .readNextRowGroup ();
635-
636- if (rowGroupReader == null ) {
637- // Reaches the end of row groups.
638- return Option .empty ();
639- } else {
640- long incBytesRead = getBytesRead () - baseline ;
641-
642- prefetchQueue .add (Pair .of (rowGroupReader , incBytesRead ));
643- }
644- }
645- } catch (Throwable e ) {
646- // Returns exception thrown from the reader. The reader will re-throw it.
647- return Option .apply (e );
648- } finally {
649- if (fileReader != null ) {
650- fileReader .closeStream ();
651- }
652- }
653- }
654- }
655534}
0 commit comments