Skip to content

Commit 51b02f1

Browse files
lyang24claudealamb
authored
[Parquet] perf: preallocate capacity for ArrayReaderBuilder (#9093)
# Which issue does this PR close? - Closes #9059. # Rationale for this change reduce allocation cost mentioned in #9059 from experiment: Pre-allocation overhead may offset the savings from avoiding incremental growth # What changes are included in this PR? - add with_capacity method to ValuesBuffer trait, and remove defaults to enforce the capacity hint is required for ArrayReaderBuilder. - The capacity hint will be passed down to GenericRecordReader to preallocate the buffer. # Are there any user-facing changes? yes ArrayReaders needs an extra capacity variable to indicate the preferred batch size and we will provision buffer with this capacity. --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 89b1497 commit 51b02f1

16 files changed

Lines changed: 381 additions & 167 deletions

File tree

parquet/benches/arrow_reader.rs

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use parquet::arrow::array_reader::{
2727
ListArrayReader, make_byte_array_reader, make_byte_view_array_reader,
2828
make_fixed_len_byte_array_reader,
2929
};
30+
use parquet::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
3031
use parquet::basic::Type;
3132
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
3233
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
@@ -709,15 +710,23 @@ fn create_primitive_array_reader(
709710
use parquet::arrow::array_reader::PrimitiveArrayReader;
710711
match column_desc.physical_type() {
711712
Type::INT32 => {
712-
let reader =
713-
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
714-
.unwrap();
713+
let reader = PrimitiveArrayReader::<Int32Type>::new(
714+
Box::new(page_iterator),
715+
column_desc,
716+
None,
717+
DEFAULT_BATCH_SIZE,
718+
)
719+
.unwrap();
715720
Box::new(reader)
716721
}
717722
Type::INT64 => {
718-
let reader =
719-
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
720-
.unwrap();
723+
let reader = PrimitiveArrayReader::<Int64Type>::new(
724+
Box::new(page_iterator),
725+
column_desc,
726+
None,
727+
DEFAULT_BATCH_SIZE,
728+
)
729+
.unwrap();
721730
Box::new(reader)
722731
}
723732
_ => unreachable!(),
@@ -730,9 +739,13 @@ fn create_f16_by_bytes_reader(
730739
) -> Box<dyn ArrayReader> {
731740
let physical_type = column_desc.physical_type();
732741
match physical_type {
733-
Type::FIXED_LEN_BYTE_ARRAY => {
734-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
735-
}
742+
Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
743+
Box::new(page_iterator),
744+
column_desc,
745+
None,
746+
DEFAULT_BATCH_SIZE,
747+
)
748+
.unwrap(),
736749
_ => unimplemented!(),
737750
}
738751
}
@@ -743,12 +756,20 @@ fn create_decimal_by_bytes_reader(
743756
) -> Box<dyn ArrayReader> {
744757
let physical_type = column_desc.physical_type();
745758
match physical_type {
746-
Type::BYTE_ARRAY => {
747-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
748-
}
749-
Type::FIXED_LEN_BYTE_ARRAY => {
750-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
751-
}
759+
Type::BYTE_ARRAY => make_byte_array_reader(
760+
Box::new(page_iterator),
761+
column_desc,
762+
None,
763+
DEFAULT_BATCH_SIZE,
764+
)
765+
.unwrap(),
766+
Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
767+
Box::new(page_iterator),
768+
column_desc,
769+
None,
770+
DEFAULT_BATCH_SIZE,
771+
)
772+
.unwrap(),
752773
_ => unimplemented!(),
753774
}
754775
}
@@ -757,28 +778,52 @@ fn create_fixed_len_byte_array_reader(
757778
page_iterator: impl PageIterator + 'static,
758779
column_desc: ColumnDescPtr,
759780
) -> Box<dyn ArrayReader> {
760-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
781+
make_fixed_len_byte_array_reader(
782+
Box::new(page_iterator),
783+
column_desc,
784+
None,
785+
DEFAULT_BATCH_SIZE,
786+
)
787+
.unwrap()
761788
}
762789

763790
fn create_byte_array_reader(
764791
page_iterator: impl PageIterator + 'static,
765792
column_desc: ColumnDescPtr,
766793
) -> Box<dyn ArrayReader> {
767-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
794+
make_byte_array_reader(
795+
Box::new(page_iterator),
796+
column_desc,
797+
None,
798+
DEFAULT_BATCH_SIZE,
799+
)
800+
.unwrap()
768801
}
769802

770803
fn create_byte_view_array_reader(
771804
page_iterator: impl PageIterator + 'static,
772805
column_desc: ColumnDescPtr,
773806
) -> Box<dyn ArrayReader> {
774-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
807+
make_byte_view_array_reader(
808+
Box::new(page_iterator),
809+
column_desc,
810+
None,
811+
DEFAULT_BATCH_SIZE,
812+
)
813+
.unwrap()
775814
}
776815

777816
fn create_string_view_byte_array_reader(
778817
page_iterator: impl PageIterator + 'static,
779818
column_desc: ColumnDescPtr,
780819
) -> Box<dyn ArrayReader> {
781-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
820+
make_byte_view_array_reader(
821+
Box::new(page_iterator),
822+
column_desc,
823+
None,
824+
DEFAULT_BATCH_SIZE,
825+
)
826+
.unwrap()
782827
}
783828

784829
fn create_string_byte_array_dictionary_reader(
@@ -788,8 +833,13 @@ fn create_string_byte_array_dictionary_reader(
788833
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
789834
let arrow_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
790835

791-
make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type))
792-
.unwrap()
836+
make_byte_array_dictionary_reader(
837+
Box::new(page_iterator),
838+
column_desc,
839+
Some(arrow_type),
840+
DEFAULT_BATCH_SIZE,
841+
)
842+
.unwrap()
793843
}
794844

795845
fn create_string_list_reader(

parquet/src/arrow/array_reader/builder.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::arrow::array_reader::{
3333
NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader,
3434
make_byte_array_dictionary_reader, make_byte_array_reader,
3535
};
36+
use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
3637
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
3738
use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
3839
use crate::basic::Type as PhysicalType;
@@ -96,18 +97,30 @@ pub struct ArrayReaderBuilder<'a> {
9697
parquet_metadata: Option<&'a ParquetMetaData>,
9798
/// metrics
9899
metrics: &'a ArrowReaderMetrics,
100+
/// Batch size for pre-allocating internal buffers
101+
batch_size: usize,
99102
}
100103

101104
impl<'a> ArrayReaderBuilder<'a> {
105+
/// Create a new `ArrayReaderBuilder`
102106
pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
103107
Self {
104108
row_groups,
105109
cache_options: None,
106110
parquet_metadata: None,
107111
metrics,
112+
batch_size: DEFAULT_BATCH_SIZE,
108113
}
109114
}
110115

116+
/// Set the batch size used to pre-allocate internal buffers.
117+
///
118+
/// This avoids reallocations when reading the first batch of data.
119+
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
120+
self.batch_size = batch_size;
121+
self
122+
}
123+
111124
/// Add cache options to the builder
112125
pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self {
113126
self.cache_options = cache_options;
@@ -414,55 +427,78 @@ impl<'a> ArrayReaderBuilder<'a> {
414427
page_iterator,
415428
column_desc,
416429
arrow_type,
430+
self.batch_size,
417431
)?) as _,
418432
PhysicalType::INT32 => {
419433
if let Some(DataType::Null) = arrow_type {
420434
Box::new(NullArrayReader::<Int32Type>::new(
421435
page_iterator,
422436
column_desc,
437+
self.batch_size,
423438
)?) as _
424439
} else {
425440
Box::new(PrimitiveArrayReader::<Int32Type>::new(
426441
page_iterator,
427442
column_desc,
428443
arrow_type,
444+
self.batch_size,
429445
)?) as _
430446
}
431447
}
432448
PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
433449
page_iterator,
434450
column_desc,
435451
arrow_type,
452+
self.batch_size,
436453
)?) as _,
437454
PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
438455
page_iterator,
439456
column_desc,
440457
arrow_type,
458+
self.batch_size,
441459
)?) as _,
442460
PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
443461
page_iterator,
444462
column_desc,
445463
arrow_type,
464+
self.batch_size,
446465
)?) as _,
447466
PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
448467
page_iterator,
449468
column_desc,
450469
arrow_type,
470+
self.batch_size,
451471
)?) as _,
452472
PhysicalType::BYTE_ARRAY => match arrow_type {
453-
Some(DataType::Dictionary(_, _)) => {
454-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
473+
Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
474+
page_iterator,
475+
column_desc,
476+
arrow_type,
477+
self.batch_size,
478+
)?,
479+
Some(DataType::Utf8View | DataType::BinaryView) => make_byte_view_array_reader(
480+
page_iterator,
481+
column_desc,
482+
arrow_type,
483+
self.batch_size,
484+
)?,
485+
_ => {
486+
make_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
455487
}
456-
Some(DataType::Utf8View | DataType::BinaryView) => {
457-
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
458-
}
459-
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
460488
},
461489
PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
462-
Some(DataType::Dictionary(_, _)) => {
463-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
464-
}
465-
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
490+
Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
491+
page_iterator,
492+
column_desc,
493+
arrow_type,
494+
self.batch_size,
495+
)?,
496+
_ => make_fixed_len_byte_array_reader(
497+
page_iterator,
498+
column_desc,
499+
arrow_type,
500+
self.batch_size,
501+
)?,
466502
},
467503
};
468504
Ok(Some(reader))
@@ -533,6 +569,7 @@ mod tests {
533569

534570
let metrics = ArrowReaderMetrics::disabled();
535571
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
572+
.with_batch_size(DEFAULT_BATCH_SIZE)
536573
.build_array_reader(fields.as_ref(), &mask)
537574
.unwrap();
538575

@@ -566,6 +603,7 @@ mod tests {
566603

567604
let metrics = ArrowReaderMetrics::disabled();
568605
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
606+
.with_batch_size(DEFAULT_BATCH_SIZE)
569607
.with_parquet_metadata(file_reader.metadata())
570608
.build_array_reader(fields.as_ref(), &mask)
571609
.unwrap();

0 commit comments

Comments
 (0)