Skip to content

Commit 1441269

Browse files
authored
chore: refactor scalarvalue/encoding using available upstream arrow-rs methods (#19797)
These PRs are available to us now as part of the upgrade to arrow-rs 57.2.0 (#19355): apache/arrow-rs#8993 and apache/arrow-rs#9040. Make use of them in some refactorings here.
1 parent 14d919d commit 1441269

2 files changed

Lines changed: 8 additions & 101 deletions

File tree

datafusion/common/src/scalar/mod.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2988,13 +2988,8 @@ impl ScalarValue {
29882988
},
29892989
ScalarValue::Utf8View(e) => match e {
29902990
Some(value) => {
2991-
let mut builder =
2992-
StringViewBuilder::with_capacity(size).with_deduplicate_strings();
2993-
// Replace with upstream arrow-rs code when available:
2994-
// https://github.com/apache/arrow-rs/issues/9034
2995-
for _ in 0..size {
2996-
builder.append_value(value);
2997-
}
2991+
let mut builder = StringViewBuilder::with_capacity(size);
2992+
builder.try_append_value_n(value, size)?;
29982993
let array = builder.finish();
29992994
Arc::new(array)
30002995
}
@@ -3012,11 +3007,8 @@ impl ScalarValue {
30123007
},
30133008
ScalarValue::BinaryView(e) => match e {
30143009
Some(value) => {
3015-
let mut builder =
3016-
BinaryViewBuilder::with_capacity(size).with_deduplicate_strings();
3017-
for _ in 0..size {
3018-
builder.append_value(value);
3019-
}
3010+
let mut builder = BinaryViewBuilder::with_capacity(size);
3011+
builder.try_append_value_n(value, size)?;
30203012
let array = builder.finish();
30213013
Arc::new(array)
30223014
}

datafusion/functions/src/encoding/inner.rs

Lines changed: 4 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
2020
use arrow::{
2121
array::{
22-
Array, ArrayRef, AsArray, BinaryArrayType, FixedSizeBinaryArray,
23-
GenericBinaryArray, GenericStringArray, OffsetSizeTrait,
22+
Array, ArrayRef, AsArray, BinaryArrayType, GenericBinaryArray,
23+
GenericStringArray, OffsetSizeTrait,
2424
},
2525
datatypes::DataType,
2626
};
@@ -239,7 +239,7 @@ fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
239239
encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
240240
}
241241
DataType::FixedSizeBinary(_) => {
242-
encoding.encode_fsb_array(array.as_fixed_size_binary())
242+
encoding.encode_array::<_, i32>(&array.as_fixed_size_binary())
243243
}
244244
dt => {
245245
internal_err!("Unexpected data type for encode: {dt}")
@@ -307,7 +307,7 @@ fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
307307
let array = array.as_fixed_size_binary();
308308
// TODO: could we be more conservative by accounting for nulls?
309309
let estimate = array.len().saturating_mul(*size as usize);
310-
encoding.decode_fsb_array(array, estimate)
310+
encoding.decode_array::<_, i32>(&array, estimate)
311311
}
312312
dt => {
313313
internal_err!("Unexpected data type for decode: {dt}")
@@ -404,24 +404,6 @@ impl Encoding {
404404
}
405405
}
406406

407-
// TODO: refactor this away once https://github.com/apache/arrow-rs/pull/8993 lands
408-
fn encode_fsb_array(self, array: &FixedSizeBinaryArray) -> Result<ArrayRef> {
409-
match self {
410-
Self::Base64 => {
411-
let array: GenericStringArray<i32> = array
412-
.iter()
413-
.map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
414-
.collect();
415-
Ok(Arc::new(array))
416-
}
417-
Self::Hex => {
418-
let array: GenericStringArray<i32> =
419-
array.iter().map(|x| x.map(hex::encode)).collect();
420-
Ok(Arc::new(array))
421-
}
422-
}
423-
}
424-
425407
// OutputOffset important to ensure Large types output Large arrays
426408
fn decode_array<'a, InputBinaryArray, OutputOffset>(
427409
self,
@@ -461,73 +443,6 @@ impl Encoding {
461443
}
462444
}
463445
}
464-
465-
// TODO: refactor this away once https://github.com/apache/arrow-rs/pull/8993 lands
466-
fn decode_fsb_array(
467-
self,
468-
value: &FixedSizeBinaryArray,
469-
approx_data_size: usize,
470-
) -> Result<ArrayRef> {
471-
fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
472-
// only write input / 2 bytes to buf
473-
let out_len = input.len() / 2;
474-
let buf = &mut buf[..out_len];
475-
hex::decode_to_slice(input, buf)
476-
.map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
477-
Ok(out_len)
478-
}
479-
480-
fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
481-
BASE64_ENGINE
482-
.decode_slice(input, buf)
483-
.map_err(|e| exec_datafusion_err!("Failed to decode from base64: {e}"))
484-
}
485-
486-
fn delegated_decode<DecodeFunction>(
487-
decode: DecodeFunction,
488-
input: &FixedSizeBinaryArray,
489-
conservative_upper_bound_size: usize,
490-
) -> Result<ArrayRef>
491-
where
492-
DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
493-
{
494-
let mut values = vec![0; conservative_upper_bound_size];
495-
let mut offsets = OffsetBufferBuilder::new(input.len());
496-
let mut total_bytes_decoded = 0;
497-
for v in input.iter() {
498-
if let Some(v) = v {
499-
let cursor = &mut values[total_bytes_decoded..];
500-
let decoded = decode(v, cursor)?;
501-
total_bytes_decoded += decoded;
502-
offsets.push_length(decoded);
503-
} else {
504-
offsets.push_length(0);
505-
}
506-
}
507-
// We reserved an upper bound size for the values buffer, but we only use the actual size
508-
values.truncate(total_bytes_decoded);
509-
let binary_array = GenericBinaryArray::<i32>::try_new(
510-
offsets.finish(),
511-
Buffer::from_vec(values),
512-
input.nulls().cloned(),
513-
)?;
514-
Ok(Arc::new(binary_array))
515-
}
516-
517-
match self {
518-
Self::Base64 => {
519-
let upper_bound = base64::decoded_len_estimate(approx_data_size);
520-
delegated_decode(base64_decode, value, upper_bound)
521-
}
522-
Self::Hex => {
523-
// Calculate the upper bound for decoded byte size
524-
// For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
525-
// So the upper bound is half the length of the input values.
526-
let upper_bound = approx_data_size / 2;
527-
delegated_decode(hex_decode, value, upper_bound)
528-
}
529-
}
530-
}
531446
}
532447

533448
fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>(

0 commit comments

Comments
 (0)