diff --git a/src/query/expression/src/kernels/stream_partition.rs b/src/query/expression/src/kernels/stream_partition.rs index 7b9af52186240..8eef73f92b749 100644 --- a/src/query/expression/src/kernels/stream_partition.rs +++ b/src/query/expression/src/kernels/stream_partition.rs @@ -14,38 +14,15 @@ use std::collections::HashSet; -use databend_common_column::bitmap::Bitmap; -use databend_common_column::bitmap::MutableBitmap; -use databend_common_column::buffer::Buffer; -use databend_common_column::types::Index; - -use crate::BlockEntry; -use crate::Column; -use crate::ColumnBuilder; +use crate::ChunkIndex; use crate::DataBlock; -use crate::types::AnyType; -use crate::types::ArrayColumn; -use crate::types::BinaryColumn; -use crate::types::DecimalColumn; -use crate::types::DecimalColumnBuilder; -use crate::types::NullableColumn; -use crate::types::NumberColumn; -use crate::types::NumberColumnBuilder; -use crate::types::OpaqueColumn; -use crate::types::OpaqueColumnBuilder; -use crate::types::StringColumn; -use crate::types::VectorColumn; -use crate::types::VectorColumnBuilder; -use crate::types::array::ArrayColumnBuilder; -use crate::types::binary::BinaryColumnBuilder; -use crate::types::nullable::NullableColumnBuilder; -use crate::types::string::StringColumnBuilder; -use crate::with_decimal_type; -use crate::with_number_mapped_type; +use crate::DataBlockVec; struct PartitionBlockBuilder { num_rows: usize, - columns_builder: Vec, + estimated_bytes: usize, + data_blocks: DataBlockVec, + indices: ChunkIndex, } pub struct BlockPartitionStream { @@ -91,49 +68,44 @@ impl BlockPartitionStream { if !self.initialize { self.initialize = true; - self.partitions.reserve(self.scatter_size); for _ in 0..self.scatter_size { - let mut columns_builder = Vec::with_capacity(block.num_columns()); - - for column in block.columns() { - let data_type = column.data_type(); - columns_builder.push(ColumnBuilder::with_capacity(&data_type, 0)); - } - - let block_builder = PartitionBlockBuilder { + self.partitions.push(PartitionBlockBuilder { num_rows: 0, - columns_builder, - }; - self.partitions.push(block_builder); + estimated_bytes: 0, + data_blocks: DataBlockVec::with_capacity(1), + indices: ChunkIndex::default(), + }); } } - let columns = block - .take_columns() - .into_iter() - .map(|x| x.to_column()) - .collect::>(); - + let block_num_rows = block.num_rows(); + let block_memory_size = block.memory_size(); let scatter_indices = DataBlock::divide_indices_by_scatter_size(&indices, self.scatter_size); - for (partition_id, indices) in scatter_indices.iter().enumerate() { - self.partitions[partition_id].num_rows += indices.len(); - } - - for (column_idx, column) in columns.into_iter().enumerate() { - for (partition_id, indices) in scatter_indices.iter().enumerate() { - if indices.is_empty() { - continue; - } - - let partition = &mut self.partitions[partition_id]; - let column_builder = &mut partition.columns_builder[column_idx]; - copy_column(indices, &column, column_builder); + for (partition_id, rows) in scatter_indices.iter().enumerate() { + if rows.is_empty() { + continue; } - drop(column); + let partition = &mut self.partitions[partition_id]; + let block_id = partition.data_blocks.block_rows().len() as u32; + + partition + .data_blocks + .push(block.clone()) + .expect("BlockPartitionStream received blocks with mismatched schema"); + push_scatter_rows(&mut partition.indices, block_id, rows); + partition.num_rows += rows.len(); + partition.estimated_bytes = + partition + .estimated_bytes + .saturating_add(estimate_partition_bytes( + block_memory_size, + block_num_rows, + rows.len(), + )); } if !out_ready { @@ -142,29 +114,12 @@ impl BlockPartitionStream { let mut ready_blocks = Vec::with_capacity(self.partitions.len()); for (id, partition) in self.partitions.iter_mut().enumerate() { - let memory_size = partition - .columns_builder - .iter() - .map(|x| x.memory_size()) - .sum::(); - - let rows = partition.num_rows; - - if memory_size >= self.bytes_threshold || rows >= self.rows_threshold { - let mut columns = Vec::with_capacity(partition.columns_builder.len()); - let columns_builder = std::mem::take(&mut partition.columns_builder); - partition.columns_builder.reserve(columns_builder.len()); - - for column_builder in columns_builder { - let historical_size = column_builder.len(); - let data_type = column_builder.data_type(); - let new_builder = ColumnBuilder::with_capacity(&data_type, historical_size); - partition.columns_builder.push(new_builder); - columns.push(BlockEntry::from(column_builder.build())); + if partition.estimated_bytes >= self.bytes_threshold + || partition.num_rows >= self.rows_threshold + { + if let Some(block) = take_partition_data(partition) { + ready_blocks.push((id, block)); } - - partition.num_rows = 0; - ready_blocks.push((id, DataBlock::new(columns, rows))); } } @@ -192,7 +147,6 @@ impl BlockPartitionStream { } let capacity = self.partitions.len() - excluded.len(); - let mut take_blocks = Vec::with_capacity(capacity); for (id, partition) in self.partitions.iter_mut().enumerate() { @@ -200,21 +154,9 @@ impl BlockPartitionStream { continue; } - let mut columns = Vec::with_capacity(partition.columns_builder.len()); - let columns_builder = std::mem::take(&mut partition.columns_builder); - partition.columns_builder.reserve(columns_builder.len()); - - for column_builder in columns_builder { - let historical_size = column_builder.len(); - let data_type = column_builder.data_type(); - let new_builder = ColumnBuilder::with_capacity(&data_type, historical_size); - partition.columns_builder.push(new_builder); - columns.push(BlockEntry::from(column_builder.build())); + if let Some(block) = take_partition_data(partition) { + take_blocks.push((id, block)); } - - let num_rows = partition.num_rows; - partition.num_rows = 0; - take_blocks.push((id, DataBlock::new(columns, num_rows))); } take_blocks @@ -225,315 +167,56 @@ impl BlockPartitionStream { return None; } - let partition = &mut self.partitions[partition_id]; - - let num_rows = partition.num_rows; - - if num_rows == 0 { - return None; - } - - let mut columns = Vec::with_capacity(partition.columns_builder.len()); - let columns_builder = std::mem::take(&mut partition.columns_builder); - partition.columns_builder.reserve(columns_builder.len()); - - for column_builder in columns_builder { - let data_type = column_builder.data_type(); - let new_builder = ColumnBuilder::with_capacity(&data_type, 0); - partition.columns_builder.push(new_builder); - columns.push(BlockEntry::from(column_builder.build())); - } - - partition.num_rows = 0; - Some(DataBlock::new(columns, num_rows)) + take_partition_data(&mut self.partitions[partition_id]) } } -pub fn copy_column(indices: &[I], from: &Column, to: &mut ColumnBuilder) { - match to { - ColumnBuilder::EmptyArray { len } => match from { - Column::EmptyArray { .. } => *len += indices.len(), - Column::Array(column) => { - let capacity = *len + column.len(); - match ColumnBuilder::with_capacity(&from.data_type(), capacity) { - ColumnBuilder::Array(mut builder) => { - builder.offsets.extend(vec![0; *len]); - copy_array(&mut builder, column, indices); - *to = ColumnBuilder::Array(builder); - } - _ => unreachable!( - "ColumnBuilder::with_capacity for Array type should return ColumnBuilder::Array, \ - but got different variant. data_type: {}, capacity: {}", - from.data_type(), - capacity - ), - } - } - _ => unreachable!( - "EmptyArray builder can only copy from EmptyArray or Array, but got from type: {}", - from.data_type() - ), - }, - ColumnBuilder::Array(builder) => match from { - Column::EmptyArray { .. } => { - for _ in 0..indices.len() { - builder.commit_row(); - } - } - Column::Array(column) => { - copy_array(builder, column, indices); - } - _ => unreachable!( - "Array builder can only copy from EmptyArray or Array, but got from type: {}", - from.data_type() - ), - }, - ColumnBuilder::Null { len } => match from { - Column::Null { .. } => *len += indices.len(), - Column::Nullable(column) => { - let capacity = *len + column.len(); - - match ColumnBuilder::with_capacity(&from.data_type(), capacity) { - ColumnBuilder::Nullable(mut builder) => { - builder.push_repeat_null(*len); - copy_nullable(&mut builder, column, indices); - *to = ColumnBuilder::Nullable(builder); - } - _ => unreachable!( - "ColumnBuilder::with_capacity for Nullable type should return ColumnBuilder::Nullable, \ - but got different variant. data_type: {}, capacity: {}", - from.data_type(), - capacity - ), - } - } - _ => unreachable!( - "Null builder can only copy from Null or Nullable, but got from type: {}", - from.data_type() - ), - }, - ColumnBuilder::Nullable(builder) => match from { - Column::Null { .. } => { - builder.push_repeat_null(indices.len()); - } - Column::Nullable(column) => { - copy_nullable(builder, column, indices); - } - _ => unreachable!( - "Nullable builder can only copy from Null or Nullable, but got from type: {}", - from.data_type() - ), - }, - ColumnBuilder::EmptyMap { len } => match from { - Column::EmptyMap { .. } => *len += indices.len(), - Column::Map(column) => { - let capacity = *len + indices.len(); - match ColumnBuilder::with_capacity(&from.data_type(), capacity) { - ColumnBuilder::Map(mut builder) => { - builder.offsets.extend(vec![0; *len]); - copy_array(&mut builder, column, indices); - *to = ColumnBuilder::Map(builder); - } - _ => unreachable!( - "ColumnBuilder::with_capacity for Map type should return ColumnBuilder::Map, \ - but got different variant. data_type: {}, capacity: {}", - from.data_type(), - capacity - ), - } - } - _ => unreachable!( - "EmptyMap builder can only copy from EmptyMap or Map, but got from type: {}", - from.data_type() - ), - }, - ColumnBuilder::Map(builder) => match from { - Column::Map(column) => { - copy_array(builder, column, indices); - } - Column::EmptyMap { .. } => { - for _ in 0..indices.len() { - builder.commit_row(); - } - } - _ => unreachable!( - "Map builder can only copy from EmptyMap or Map, but got from type: {}", - from.data_type() - ), - }, - _ => match (to, from) { - (ColumnBuilder::Number(builder), Column::Number(number_column)) => { - with_number_mapped_type!(|NUM_TYPE| match (builder, number_column) { - (NumberColumnBuilder::NUM_TYPE(b), NumberColumn::NUM_TYPE(c)) => { - copy_primitive_type(b, c, indices); - } - _ => unreachable!(), - }) - } - (ColumnBuilder::Decimal(builder), Column::Decimal(column)) => { - with_decimal_type!(|DECIMAL_TYPE| match (builder, column) { - ( - DecimalColumnBuilder::DECIMAL_TYPE(builder, _), - DecimalColumn::DECIMAL_TYPE(column, _), - ) => { - copy_primitive_type(builder, column, indices); - } - _ => unreachable!(), - }); - } - (ColumnBuilder::Boolean(builder), Column::Boolean(column)) => { - copy_boolean(builder, column, indices) - } - (ColumnBuilder::Date(builder), Column::Date(column)) => { - copy_primitive_type(builder, column, indices); - } - (ColumnBuilder::Interval(builder), Column::Interval(column)) => { - copy_primitive_type(builder, column, indices); - } - (ColumnBuilder::Timestamp(builder), Column::Timestamp(column)) => { - copy_primitive_type(builder, column, indices); - } - (ColumnBuilder::Bitmap(builder), Column::Bitmap(column)) => { - copy_binary(builder, column, indices); - } - (ColumnBuilder::Binary(builder), Column::Binary(column)) => { - copy_binary(builder, column, indices); - } - (ColumnBuilder::Variant(builder), Column::Variant(column)) => { - copy_binary(builder, column, indices); - } - (ColumnBuilder::Geometry(builder), Column::Geometry(column)) => { - copy_binary(builder, column, indices); - } - (ColumnBuilder::Geography(builder), Column::Geography(column)) => { - copy_binary(builder, &column.0, indices); - } - (ColumnBuilder::String(builder), Column::String(column)) => { - copy_string(builder, column, indices); - } - (ColumnBuilder::Vector(builder), Column::Vector(column)) => { - copy_vector(indices, builder, column); - } - (ColumnBuilder::Opaque(builder), Column::Opaque(column)) => { - copy_opaque(indices, builder, column); - } - (ColumnBuilder::Tuple(builders), Column::Tuple(columns)) => { - for (builder, column) in builders.iter_mut().zip(columns.iter()) { - copy_column(indices, column, builder) - } - } - (to, from) => unreachable!( - "Unsupported column builder type for copy_column. to type: {:?}, from type: {}", - to.data_type(), - from.data_type() - ), - }, +fn push_scatter_rows(indices: &mut ChunkIndex, block_id: u32, rows: &[u32]) { + let Some((&first, tail)) = rows.split_first() else { + return; }; -} -fn copy_boolean(to: &mut MutableBitmap, from: &Bitmap, indices: &[I]) { - let num_rows = indices.len(); + let mut start = first; + let mut prev = first; - if num_rows == 0 { - return; - } + for &row in tail { + if row == prev + 1 { + prev = row; + continue; + } - // Fast path: avoid iterating column to generate a new bitmap. - // If this [`Bitmap`] is all true or all false and `num_rows <= bitmap.len()``, - // we can just slice it. - if num_rows <= from.len() && (from.null_count() == 0 || from.null_count() == from.len()) { - to.extend_constant(num_rows, from.get_bit(0)); - return; + indices.push_merge_range(block_id, start, prev - start + 1); + start = row; + prev = row; } - to.extend_from_trusted_len_iter(indices.iter().map(|index| from.get_bit(index.to_usize()))); + indices.push_merge_range(block_id, start, prev - start + 1); } -fn copy_primitive_type(to: &mut Vec, from: &Buffer, indices: &[I]) { - to.extend( - indices - .iter() - .map(|index| unsafe { *from.get_unchecked(index.to_usize()) }), - ); -} +fn estimate_partition_bytes(block_bytes: usize, total_rows: usize, selected_rows: usize) -> usize { + debug_assert!(selected_rows <= total_rows); -fn copy_binary(to: &mut BinaryColumnBuilder, from: &BinaryColumn, indices: &[I]) { - let num_rows = indices.len(); - - let row_bytes = from.total_bytes_len() / from.len(); - let data_capacity = row_bytes * (indices.len() * 4).div_ceil(3); - to.reserve(num_rows, data_capacity); - - for index in indices.iter() { - unsafe { - to.put_slice(from.index_unchecked(index.to_usize())); - to.commit_row(); - } + if selected_rows == 0 || block_bytes == 0 { + return 0; } -} - -fn copy_string(to: &mut StringColumnBuilder, from: &StringColumn, indices: &[I]) { - to.data.reserve(indices.len()); - for index in indices.iter() { - unsafe { - to.put_and_commit(from.index_unchecked(index.to_usize())); - } + if selected_rows == total_rows { + return block_bytes; } -} -fn copy_nullable( - to: &mut NullableColumnBuilder, - from: &NullableColumn, - indices: &[I], -) { - copy_boolean(&mut to.validity, &from.validity, indices); - copy_column(indices, &from.column, &mut to.builder) -} - -fn copy_opaque(indices: &[I], builder: &mut OpaqueColumnBuilder, column: &OpaqueColumn) { - match (builder, column) { - (OpaqueColumnBuilder::Opaque1(builder), OpaqueColumn::Opaque1(column)) => { - copy_primitive_type(builder, column, indices); - } - (OpaqueColumnBuilder::Opaque2(builder), OpaqueColumn::Opaque2(column)) => { - copy_primitive_type(builder, column, indices); - } - (OpaqueColumnBuilder::Opaque3(builder), OpaqueColumn::Opaque3(column)) => { - copy_primitive_type(builder, column, indices); - } - (OpaqueColumnBuilder::Opaque4(builder), OpaqueColumn::Opaque4(column)) => { - copy_primitive_type(builder, column, indices); - } - (OpaqueColumnBuilder::Opaque5(builder), OpaqueColumn::Opaque5(column)) => { - copy_primitive_type(builder, column, indices); - } - (OpaqueColumnBuilder::Opaque6(builder), OpaqueColumn::Opaque6(column)) => { - copy_primitive_type(builder, column, indices); - } - _ => unreachable!(), - } + let estimated = (block_bytes as u128 * selected_rows as u128).div_ceil(total_rows as u128); + estimated.min(usize::MAX as u128) as usize } -fn copy_vector(indices: &[I], builder: &mut VectorColumnBuilder, column: &VectorColumn) { - match (builder, column) { - (VectorColumnBuilder::Int8((builder, _)), VectorColumn::Int8((column, _))) => { - copy_primitive_type(builder, column, indices); - } - (VectorColumnBuilder::Float32((builder, _)), VectorColumn::Float32((column, _))) => { - copy_primitive_type(builder, column, indices); - } - _ => unreachable!(), +fn take_partition_data(partition: &mut PartitionBlockBuilder) -> Option { + if partition.num_rows == 0 { + return None; } -} -fn copy_array( - to: &mut ArrayColumnBuilder, - from: &ArrayColumn, - indices: &[I], -) { - // TODO: - for index in indices { - unsafe { to.push(from.index_unchecked(index.to_usize())) } - } + let block = partition.data_blocks.take(&partition.indices); + partition.data_blocks.clear(); + partition.indices.clear(); + partition.num_rows = 0; + partition.estimated_bytes = 0; + Some(block) } diff --git a/src/query/expression/tests/it/kernel.rs b/src/query/expression/tests/it/kernel.rs index e71e5dea072dc..ef3a0d60bcda7 100644 --- a/src/query/expression/tests/it/kernel.rs +++ b/src/query/expression/tests/it/kernel.rs @@ -13,10 +13,12 @@ // limitations under the License. use core::ops::Range; +use std::collections::HashSet; use databend_common_base::base::OrderedFloat; use databend_common_column::bitmap::Bitmap; use databend_common_expression::BlockEntry; +use databend_common_expression::BlockPartitionStream; use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; use databend_common_expression::DataBlock; @@ -565,6 +567,106 @@ pub fn test_scatter() -> databend_common_exception::Result<()> { Ok(()) } +#[test] +pub fn test_block_partition_stream_lazy_rows_threshold() -> databend_common_exception::Result<()> { + let block1 = rand_block_for_all_types(6, DataTypeFilter::All); + let block2 = rand_block_for_all_types(5, DataTypeFilter::All); + + let mut stream = BlockPartitionStream::create(4, 0, 3); + + assert!( + stream + .partition(vec![0, 1, 0, 2, 1, 0], block1.clone(), true) + .is_empty() + ); + + let ready_blocks = stream.partition(vec![2, 0, 1, 0, 2], block2.clone(), true); + assert_eq!(ready_blocks.len(), 1); + assert_eq!(ready_blocks[0].0, 0); + + let p0_block1 = [0_u32, 2, 5]; + let p0_block2 = [1_u32, 3]; + let expected_p0 = + DataBlock::concat(&[block1.take(&p0_block1[..])?, block2.take(&p0_block2[..])?])?; + assert_block_value_eq(&ready_blocks[0].1, &expected_p0); + + let pending_ids = stream.partition_ids(); + assert_eq!(pending_ids, vec![1, 2]); + + let p1_block1 = [1_u32, 4]; + let p1_block2 = [2_u32]; + let expected_p1 = + DataBlock::concat(&[block1.take(&p1_block1[..])?, block2.take(&p1_block2[..])?])?; + let p2_block1 = [3_u32]; + let p2_block2 = [0_u32, 4]; + let expected_p2 = + DataBlock::concat(&[block1.take(&p2_block1[..])?, block2.take(&p2_block2[..])?])?; + + let actual_p1 = stream.finalize_partition(1).unwrap(); + let actual_p2 = stream.finalize_partition(2).unwrap(); + assert_block_value_eq(&actual_p1, &expected_p1); + assert_block_value_eq(&actual_p2, &expected_p2); + assert!(stream.finalize_partition(0).is_none()); + + Ok(()) +} + +#[test] +pub fn test_block_partition_stream_take_partitions_preserves_excluded() +-> databend_common_exception::Result<()> { + let block = rand_block_for_all_types(6, DataTypeFilter::All); + let mut stream = BlockPartitionStream::create(0, 0, 3); + + assert!( + stream + .partition(vec![0, 1, 2, 0, 1, 2], block.clone(), false) + .is_empty() + ); + + let taken = stream.take_partitions(&HashSet::from([1_usize])); + assert_eq!(taken.len(), 2); + assert_eq!(taken[0].0, 0); + assert_eq!(taken[1].0, 2); + + let p0_rows = [0_u32, 3]; + let p2_rows = [2_u32, 5]; + let expected_p0 = block.take(&p0_rows[..])?; + let expected_p2 = block.take(&p2_rows[..])?; + assert_block_value_eq(&taken[0].1, &expected_p0); + assert_block_value_eq(&taken[1].1, &expected_p2); + + let p1_rows = [1_u32, 4]; + let expected_p1 = block.take(&p1_rows[..])?; + let actual_p1 = stream.finalize_partition(1).unwrap(); + assert_block_value_eq(&actual_p1, &expected_p1); + assert!(stream.finalize_partition(0).is_none()); + assert!(stream.finalize_partition(2).is_none()); + + Ok(()) +} + +#[test] +pub fn test_block_partition_stream_estimated_bytes_threshold() +-> databend_common_exception::Result<()> { + let block = rand_block_for_all_types(4, DataTypeFilter::All); + let mut stream = BlockPartitionStream::create(0, 1, 2); + + let ready_blocks = stream.partition(vec![0, 1, 0, 1], block.clone(), true); + assert_eq!(ready_blocks.len(), 2); + + let p0_rows = [0_u32, 2]; + let p1_rows = [1_u32, 3]; + let expected_p0 = block.take(&p0_rows[..])?; + let expected_p1 = block.take(&p1_rows[..])?; + + assert_eq!(ready_blocks[0].0, 0); + assert_block_value_eq(&ready_blocks[0].1, &expected_p0); + assert_eq!(ready_blocks[1].0, 1); + assert_block_value_eq(&ready_blocks[1].1, &expected_p1); + + Ok(()) +} + #[test] fn test_builder() { let ty = DataType::String;