diff --git a/src/query/catalog/src/plan/datasource/datasource_plan.rs b/src/query/catalog/src/plan/datasource/datasource_plan.rs index 3da5a4104947b..7c42f1a2bba7d 100644 --- a/src/query/catalog/src/plan/datasource/datasource_plan.rs +++ b/src/query/catalog/src/plan/datasource/datasource_plan.rs @@ -25,6 +25,33 @@ use crate::plan::PushDownInfo; use crate::plan::datasource::datasource_info::DataSourceInfo; use crate::table_args::TableArgs; +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] +pub struct BlockMetaOptions { + // for merge_into target build. + pub reserve_block_index: bool, + // Whether to update stream columns. + pub update_stream_columns: bool, + // used to query internal columns. + pub query_internal_columns: bool, +} + +impl BlockMetaOptions { + pub fn set_reserve_block_index(mut self, reserve_block_index: bool) -> Self { + self.reserve_block_index = reserve_block_index; + self + } + + pub fn set_update_stream_columns(mut self, update_stream_columns: bool) -> Self { + self.update_stream_columns = update_stream_columns; + self + } + + pub fn set_query_internal_columns(mut self, query_internal_columns: bool) -> Self { + self.query_internal_columns = query_internal_columns; + self + } +} + #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] pub struct DataSourcePlan { pub source_info: DataSourceInfo, @@ -39,8 +66,7 @@ pub struct DataSourcePlan { pub push_downs: Option, pub internal_columns: Option>, pub base_block_ids: Option, - // used for recluster to update stream columns - pub update_stream_columns: bool, + pub block_meta_options: BlockMetaOptions, pub table_index: usize, pub scan_id: usize, diff --git a/src/query/catalog/src/plan/datasource/mod.rs b/src/query/catalog/src/plan/datasource/mod.rs index f2bfaa06a2de8..ceb6758c078a7 100644 --- a/src/query/catalog/src/plan/datasource/mod.rs +++ b/src/query/catalog/src/plan/datasource/mod.rs @@ -16,4 +16,5 @@ mod datasource_info; mod datasource_plan; pub use datasource_info::*; +pub use datasource_plan::BlockMetaOptions; pub use datasource_plan::DataSourcePlan; diff --git a/src/query/catalog/src/plan/partition.rs b/src/query/catalog/src/plan/partition.rs index fe879b268da5c..a800bf678aa25 100644 --- a/src/query/catalog/src/plan/partition.rs +++ b/src/query/catalog/src/plan/partition.rs @@ -371,53 +371,21 @@ pub struct ReclusterTask { pub type BlockMetaWithHLL = (Arc, Option); -#[derive(Clone)] -pub enum ReclusterParts { - Recluster { - tasks: Vec, - remained_blocks: Vec, - removed_segment_indexes: Vec, - removed_segment_summary: Statistics, - }, - Compact(Partitions), +#[derive(Clone, Default)] +pub struct ReclusterParts { + pub tasks: Vec, + pub remained_blocks: Vec, + pub removed_segment_indexes: Vec, + pub removed_segment_summary: Statistics, } impl ReclusterParts { pub fn is_empty(&self) -> bool { - match self { - ReclusterParts::Recluster { - tasks, - remained_blocks, - .. - } => tasks.is_empty() && remained_blocks.is_empty(), - ReclusterParts::Compact(parts) => parts.is_empty(), - } - } - - pub fn new_recluster_parts() -> Self { - Self::Recluster { - tasks: vec![], - remained_blocks: vec![], - removed_segment_indexes: vec![], - removed_segment_summary: Statistics::default(), - } - } - - pub fn new_compact_parts() -> Self { - Self::Compact(Partitions::default()) + self.tasks.is_empty() && self.remained_blocks.is_empty() } - pub fn is_distributed(&self, ctx: Arc) -> bool { - match self { - ReclusterParts::Recluster { tasks, .. } => tasks.len() > 1, - ReclusterParts::Compact(_) => { - (!ctx.get_cluster().is_empty()) - && ctx - .get_settings() - .get_enable_distributed_compact() - .unwrap_or(false) - } - } + pub fn is_distributed(&self, _ctx: Arc) -> bool { + self.tasks.len() > 1 } } diff --git a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs index f98609e56a9e4..f79d39f08b86a 100644 --- a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs +++ b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs @@ -129,8 +129,7 @@ pub async fn prepare_refresh_virtual_column( let virtual_column_builder = VirtualColumnBuilder::try_create(ctx.clone(), source_schema)?; let projection = Projection::Columns(field_indices); - let block_reader = - fuse_table.create_block_reader(ctx.clone(), projection, false, false, false)?; + let block_reader = fuse_table.create_block_reader(ctx.clone(), projection, false)?; let segment_reader = MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema); diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index 4e27fd5e8c7e5..7ae9304fbbfc0 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -689,24 +689,6 @@ impl DataBlock { res } - pub fn split_by_rows_if_needed_no_tail(&self, rows_per_block: usize) -> Vec { - // Since rows_per_block represents the expected number of rows per block, - // and the minimum number of rows per block is 0.8 * rows_per_block, - // the maximum is taken as 1.8 * rows_per_block. - let max_rows_per_block = (rows_per_block * 9).div_ceil(5); - let mut res = vec![]; - let mut offset = 0; - let mut remain_rows = self.num_rows; - while remain_rows >= max_rows_per_block { - let cut = self.slice(offset..(offset + rows_per_block)); - res.push(cut); - offset += rows_per_block; - remain_rows -= rows_per_block; - } - res.push(self.slice(offset..(offset + remain_rows))); - res - } - #[inline] pub fn merge_block(&mut self, block: DataBlock) { self.entries.reserve(block.num_columns()); diff --git a/src/query/expression/src/utils/block_thresholds.rs b/src/query/expression/src/utils/block_thresholds.rs index b5bcfad557cf4..05a29d5bb6b49 100644 --- a/src/query/expression/src/utils/block_thresholds.rs +++ b/src/query/expression/src/utils/block_thresholds.rs @@ -17,6 +17,8 @@ use databend_common_io::constants::DEFAULT_BLOCK_COMPRESSED_SIZE; use databend_common_io::constants::DEFAULT_BLOCK_PER_SEGMENT; use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT; +const MAX_BYTES_PER_BLOCK_FACTOR: usize = 2; + #[derive(Clone, Copy, Debug, serde::Serialize, serde::Deserialize)] pub struct BlockThresholds { pub max_rows_per_block: usize, @@ -35,11 +37,11 @@ impl Default for BlockThresholds { fn default() -> BlockThresholds { BlockThresholds { max_rows_per_block: DEFAULT_BLOCK_ROW_COUNT, - min_rows_per_block: (DEFAULT_BLOCK_ROW_COUNT * 4).div_ceil(5), - max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE * 2, - min_bytes_per_block: (DEFAULT_BLOCK_BUFFER_SIZE * 4).div_ceil(5), + min_rows_per_block: Self::min_block_threshold(DEFAULT_BLOCK_ROW_COUNT), + max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE * MAX_BYTES_PER_BLOCK_FACTOR, + min_bytes_per_block: Self::min_block_threshold(DEFAULT_BLOCK_BUFFER_SIZE), max_compressed_per_block: DEFAULT_BLOCK_COMPRESSED_SIZE, - min_compressed_per_block: (DEFAULT_BLOCK_COMPRESSED_SIZE * 4).div_ceil(5), + min_compressed_per_block: Self::min_block_threshold(DEFAULT_BLOCK_COMPRESSED_SIZE), block_per_segment: DEFAULT_BLOCK_PER_SEGMENT, } } @@ -54,15 +56,34 @@ impl BlockThresholds { ) -> Self { BlockThresholds { max_rows_per_block, - min_rows_per_block: (max_rows_per_block * 4).div_ceil(5), - max_bytes_per_block: bytes_per_block * 2, - min_bytes_per_block: (bytes_per_block * 4).div_ceil(5), + min_rows_per_block: Self::min_block_threshold(max_rows_per_block), + max_bytes_per_block: bytes_per_block * MAX_BYTES_PER_BLOCK_FACTOR, + min_bytes_per_block: Self::min_block_threshold(bytes_per_block), max_compressed_per_block, - min_compressed_per_block: (max_compressed_per_block * 4).div_ceil(5), + min_compressed_per_block: Self::min_block_threshold(max_compressed_per_block), block_per_segment, } } + #[inline] + pub fn set_rows_per_block(mut self, rows_per_block: usize) -> Self { + self.max_rows_per_block = rows_per_block; + self.min_rows_per_block = Self::min_block_threshold(rows_per_block); + self + } + + #[inline] + pub fn set_bytes_per_block(mut self, bytes_per_block: usize) -> Self { + self.max_bytes_per_block = bytes_per_block * MAX_BYTES_PER_BLOCK_FACTOR; + self.min_bytes_per_block = Self::min_block_threshold(bytes_per_block); + self + } + + #[inline] + pub fn min_block_threshold(value: usize) -> usize { + (value * 4).div_ceil(5) + } + #[inline] pub fn check_perfect_block( &self, @@ -112,20 +133,24 @@ impl BlockThresholds { } #[inline] - pub fn calc_rows_for_compact(&self, total_bytes: usize, total_rows: usize) -> usize { - if self.check_for_compact(total_rows, total_bytes) { - return total_rows; - } + pub fn calc_compact_block_num(&self, total_rows: usize, total_bytes: usize) -> usize { + let block_num_by_rows = if total_rows >= 2 * self.min_rows_per_block { + (total_rows / self.max_rows_per_block).max(2) + } else { + 1 + }; - let block_num_by_rows = std::cmp::max(total_rows / self.min_rows_per_block, 1); - let block_num_by_size = total_bytes / self.min_bytes_per_block; - if block_num_by_rows >= block_num_by_size { - return self.max_rows_per_block; - } - total_rows.div_ceil(block_num_by_size) + let bytes_per_block = self.max_bytes_per_block / MAX_BYTES_PER_BLOCK_FACTOR; + let block_num_by_bytes = if total_bytes >= 2 * self.min_bytes_per_block { + (total_bytes / bytes_per_block).max(2) + } else { + 1 + }; + + block_num_by_rows.max(block_num_by_bytes).min(total_rows) } - /// Calculates the optimal number of rows per block based on total data size and row count. + /// Calculates the optimal rows and bytes per block based on total data size and row count. /// /// # Parameters /// - `total_bytes`: The total size of the data in bytes. @@ -133,42 +158,55 @@ impl BlockThresholds { /// - `total_compressed`: The total compressed size of the data in bytes. /// /// # Returns - /// - The calculated number of rows per block that satisfies the thresholds. + /// - `(rows_per_block, bytes_per_block)`: rows are used as the sort block size, + /// and bytes are used by ordered compact to keep post-sort blocks near the + /// recluster target. #[inline] pub fn calc_rows_for_recluster( &self, total_rows: usize, total_bytes: usize, total_compressed: usize, - ) -> usize { + ) -> (usize, usize) { + debug_assert!(total_rows > 0); + + let default_bytes_per_block = self + .max_bytes_per_block + .div_ceil(MAX_BYTES_PER_BLOCK_FACTOR); // Check if the data is compact enough to skip further calculations. if self.check_for_compact(total_rows, total_bytes) && total_compressed < 2 * self.min_compressed_per_block { - return total_rows; + return (total_rows, default_bytes_per_block); } let block_num_by_rows = std::cmp::max(total_rows / self.min_rows_per_block, 1); let block_num_by_compressed = total_compressed.div_ceil(self.max_compressed_per_block); // If row-based block count exceeds compressed-based block count, use max rows per block. if block_num_by_rows >= block_num_by_compressed { - return self.max_rows_per_block; + return (self.max_rows_per_block, default_bytes_per_block); } let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed); // Adjust the number of blocks based on block size thresholds. - let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024); - let min_bytes_per_block = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024); - let block_nums = if bytes_per_block > max_bytes_per_block { + let max_bytes_per_block = + default_bytes_per_block + default_bytes_per_block.min(DEFAULT_BLOCK_BUFFER_SIZE); + if bytes_per_block > max_bytes_per_block { // Case 1: If the block size is too bigger. - total_bytes.div_ceil(max_bytes_per_block) - } else if bytes_per_block < min_bytes_per_block { + let bytes_per_block = max_bytes_per_block; + let block_nums = total_bytes.div_ceil(bytes_per_block); + (total_rows.div_ceil(block_nums).max(1), bytes_per_block) + } else if bytes_per_block < self.min_bytes_per_block { // Case 2: If the block size is too smaller. - total_bytes / min_bytes_per_block + let bytes_per_block = self.min_bytes_per_block; + let block_nums = std::cmp::max(total_bytes / bytes_per_block, 1); + (total_rows.div_ceil(block_nums).max(1), bytes_per_block) } else { // Case 3: Otherwise, use the compressed-based block count. - block_num_by_compressed - }; - total_rows.div_ceil(block_nums.max(1)).max(1) + ( + total_rows.div_ceil(block_num_by_compressed).max(1), + bytes_per_block, + ) + } } } diff --git a/src/query/expression/tests/it/block.rs b/src/query/expression/tests/it/block.rs index 3ef3c9362bef3..1fe69e5aad291 100644 --- a/src/query/expression/tests/it/block.rs +++ b/src/query/expression/tests/it/block.rs @@ -31,21 +31,6 @@ use databend_common_expression::types::StringType; use databend_common_expression::types::number::NumberScalar; use databend_common_expression::types::string::StringColumnBuilder; -#[test] -fn test_split_block() { - let value = "abc"; - let n = 10; - let block = DataBlock::new_from_columns(vec![Column::String( - StringColumnBuilder::repeat(value, n).build(), - )]); - let sizes = block - .split_by_rows_if_needed_no_tail(3) - .iter() - .map(|b| b.num_rows()) - .collect::>(); - assert_eq!(sizes, vec![3, 3, 4]); -} - #[test] fn test_box_render_block() { let value = "abc"; diff --git a/src/query/expression/tests/it/block_thresholds.rs b/src/query/expression/tests/it/block_thresholds.rs index 8a995d361d980..60b84935c2230 100644 --- a/src/query/expression/tests/it/block_thresholds.rs +++ b/src/query/expression/tests/it/block_thresholds.rs @@ -72,19 +72,14 @@ fn test_check_too_small() { } #[test] -fn test_calc_rows_for_compact() { +fn test_calc_compact_block_num() { let t = default_thresholds(); - assert_eq!(t.calc_rows_for_compact(500_000, 1000), 1000); - - // Block number by rows wins → max_rows_per_block - assert_eq!( - t.calc_rows_for_compact(2_000_000, 10_000), - t.max_rows_per_block - ); - - // Size-based block number wins - assert_eq!(t.calc_rows_for_compact(4_000_000, 2000), 400); + assert_eq!(t.calc_compact_block_num(1000, 500_000), 1); + assert_eq!(t.calc_compact_block_num(1800, 500_000), 2); + assert_eq!(t.calc_compact_block_num(10_000, 500_000), 10); + assert_eq!(t.calc_compact_block_num(1000, 1_800_000), 2); + assert_eq!(t.calc_compact_block_num(1000, 4_000_000), 4); } #[test] @@ -92,23 +87,26 @@ fn test_calc_rows_for_recluster() { let t = default_thresholds(); // compact enough to skip further calculations - assert_eq!(t.calc_rows_for_recluster(1000, 500_000, 100_000), 1000); + assert_eq!( + t.calc_rows_for_recluster(1000, 500_000, 100_000), + (1000, 1_000_000) + ); // row-based block count exceeds compressed-based block count, use max rows per block. assert_eq!( t.calc_rows_for_recluster(10_000, 2_000_000, 100_000), - t.max_rows_per_block + (t.max_rows_per_block, 1_000_000) ); // Case 1: If the block size is too bigger. let result = t.calc_rows_for_recluster(4_000, 30_000_000, 600_000); - assert_eq!(result, 267); + assert_eq!(result, (267, 2_000_000)); // Case 2: If the block size is too smaller. - let result = t.calc_rows_for_recluster(4_000, 2_000_000, 600_000); - assert_eq!(result, 800); + let result = t.calc_rows_for_recluster(4_000, 1_600_000, 600_000); + assert_eq!(result, (2000, 800_000)); // Case 3: use the compressed-based block count. let result = t.calc_rows_for_recluster(4_000, 10_000_000, 600_000); - assert_eq!(result, 667); + assert_eq!(result, (667, 1_666_667)); } diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_block.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_block.rs index 07114c7cec0b0..f67677895be73 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_block.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_block.rs @@ -32,7 +32,7 @@ pub enum BlockCompactMeta { Concat(Vec), Split { blocks: Vec, - rows_per_block: usize, + block_num: usize, }, NoChange(Vec), } @@ -65,10 +65,7 @@ impl BlockMetaTransform for TransformCompactBlock { match meta { BlockCompactMeta::Concat(blocks) => Ok(vec![DataBlock::concat(&blocks)?]), - BlockCompactMeta::Split { - blocks, - rows_per_block, - } => Self::split_blocks(blocks, rows_per_block), + BlockCompactMeta::Split { blocks, block_num } => Self::split_blocks(blocks, block_num), BlockCompactMeta::NoChange(blocks) => Ok(blocks), } } @@ -79,25 +76,20 @@ impl BlockMetaTransform for TransformCompactBlock { } impl TransformCompactBlock { - fn split_blocks(blocks: Vec, rows_per_block: usize) -> Result> { - debug_assert!(!blocks.is_empty()); - if blocks.len() == 1 { - return Ok(blocks[0].split_by_rows_if_needed_no_tail(rows_per_block)); - } + fn split_blocks(blocks: Vec, block_num: usize) -> Result> { + let total_rows: usize = blocks.iter().map(DataBlock::num_rows).sum(); + let block_num = block_num.min(total_rows); + debug_assert!(block_num > 0); - let max_rows_per_block = (rows_per_block * 9).div_ceil(5); - let mut total_rows: usize = blocks.iter().map(DataBlock::num_rows).sum(); + let base_rows = total_rows / block_num; + let extra_rows = total_rows % block_num; let mut blocks = blocks.into_iter(); let mut current = blocks.next(); let mut offset = 0; - let mut output = Vec::new(); - - // Mirror split_by_rows_if_needed_no_tail, but consume a sequence of blocks - // while preserving their original order. Like the original helper, this - // treats rows_per_block as a target and allows a slightly larger block to - // avoid emitting a tiny tail block. - while total_rows >= max_rows_per_block { - let mut remain_rows = rows_per_block; + let mut output = Vec::with_capacity(block_num); + + for index in 0..block_num { + let mut remain_rows = base_rows + usize::from(index < extra_rows); let mut pieces = vec![]; while remain_rows > 0 { @@ -124,16 +116,10 @@ impl TransformCompactBlock { } } - output.push(DataBlock::concat(&pieces)?); - total_rows -= rows_per_block; - } - - if let Some(block) = current { - // Emit the final tail block, which may be smaller than rows_per_block by design. - let mut tail = Vec::new(); - tail.push(block.slice(offset..block.num_rows())); - tail.extend(blocks); - output.push(DataBlock::concat(&tail)?); + output.push(match pieces.len() { + 1 => pieces.pop().unwrap(), + _ => DataBlock::concat(&pieces)?, + }); } Ok(output) @@ -163,49 +149,69 @@ mod tests { .collect() } - fn assert_split_matches_reference(blocks: Vec, rows_per_block: usize) -> Result<()> { - let actual = TransformCompactBlock::split_blocks(blocks.clone(), rows_per_block)?; - let expected = DataBlock::concat(&blocks)?.split_by_rows_if_needed_no_tail(rows_per_block); + fn assert_split_result( + blocks: Vec, + block_num: usize, + expected_sizes: &[usize], + expected_values: &[Vec], + ) -> Result<()> { + let actual = TransformCompactBlock::split_blocks(blocks.clone(), block_num)?; assert_eq!( actual.iter().map(DataBlock::num_rows).collect::>(), - expected.iter().map(DataBlock::num_rows).collect::>() + expected_sizes ); assert_eq!( actual.iter().map(block_values).collect::>(), - expected.iter().map(block_values).collect::>() + expected_values ); Ok(()) } #[test] - fn test_split_blocks_matches_reference_across_block_boundaries() -> Result<()> { - assert_split_matches_reference( + fn test_split_blocks() -> Result<()> { + assert_split_result(vec![block_with_range(0, 10)], 3, &[4, 3, 3], &[ + vec![0, 1, 2, 3], + vec![4, 5, 6], + vec![7, 8, 9], + ])?; + assert_split_result( vec![ block_with_range(0, 2), block_with_range(2, 6), block_with_range(6, 10), ], 3, + &[4, 3, 3], + &[vec![0, 1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]], )?; - assert_split_matches_reference( + assert_split_result( vec![ block_with_range(0, 1), block_with_range(1, 2), block_with_range(2, 3), block_with_range(3, 10), ], - 4, + 2, + &[5, 5], + &[vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], )?; - assert_split_matches_reference( + assert_split_result( vec![ block_with_range(0, 2), block_with_range(2, 4), block_with_range(4, 6), block_with_range(6, 8), ], - 5, + 1, + &[8], + &[vec![0, 1, 2, 3, 4, 5, 6, 7]], )?; + assert_split_result(vec![block_with_range(0, 11)], 3, &[4, 4, 3], &[ + vec![0, 1, 2, 3], + vec![4, 5, 6, 7], + vec![8, 9, 10], + ])?; Ok(()) } } diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs index ba1c668066b1a..24e569b13b1dc 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs @@ -69,11 +69,11 @@ impl AccumulatingTransform for BlockCompactBuilder { // holding slices of blocks to merge later may lead to oom, so // 1. we expect blocks from file formats are not slice. // 2. if block is split here, cut evenly and emit them at once. - let rows_per_block = self.thresholds.calc_rows_for_compact(num_bytes, num_rows); + let block_num = self.thresholds.calc_compact_block_num(num_rows, num_bytes); Ok(vec![DataBlock::empty_with_meta(Box::new( BlockCompactMeta::Split { blocks: vec![data], - rows_per_block, + block_num, }, ))]) } else if self.thresholds.check_large_enough(num_rows, num_bytes) { diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_ordered_compact_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_ordered_compact_builder.rs index 0d552aaedfdd1..1eea5c9625ccf 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_ordered_compact_builder.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_ordered_compact_builder.rs @@ -60,20 +60,11 @@ impl OrderedBlockCompactBuilder { // this path intentionally does not enforce a hard post-sort per-block maximum. // A block may remain above max_rows_per_block / max_bytes_per_block when that // avoids creating very small tail blocks inside the compaction window. - let mut block_nums = 2; - if blocks.total_rows > 2 * thresholds.min_rows_per_block { - block_nums = block_nums.max(blocks.total_rows / thresholds.min_rows_per_block); - } - if blocks.total_bytes > thresholds.max_bytes_per_block { - block_nums = block_nums.max(blocks.total_bytes / thresholds.min_bytes_per_block); - } - // rows_per_block is a split target for the downstream no-tail splitter rather than - // a hard upper bound for the produced blocks. - let rows_per_block = blocks.total_rows.div_ceil(block_nums).max(1); - + let block_num = + thresholds.calc_compact_block_num(blocks.total_rows, blocks.total_bytes); DataBlock::empty_with_meta(Box::new(BlockCompactMeta::Split { blocks: blocks.take_blocks(), - rows_per_block, + block_num, })) } else if blocks.len() > 1 { DataBlock::empty_with_meta(Box::new(BlockCompactMeta::Concat(blocks.take_blocks()))) @@ -181,7 +172,9 @@ mod tests { } fn row_focused_thresholds() -> BlockThresholds { - BlockThresholds::new(1000, 1 << 20, 1 << 20, 1000) + BlockThresholds::default() + .set_rows_per_block(1000) + .set_bytes_per_block(1 << 20) } #[test] @@ -219,4 +212,47 @@ mod tests { assert!(output[0].num_rows() > thresholds.max_rows_per_block); Ok(()) } + + #[test] + fn test_ordered_compact_splits_by_target_block_count() -> Result<()> { + let thresholds = BlockThresholds::default() + .set_rows_per_block(5) + .set_bytes_per_block(1 << 20); + let mut group = BlockGroup::default(); + for rows in [2, 6, 6, 2, 2] { + let block = block_with_rows(rows); + group.push(block.clone(), block.num_rows(), block.estimate_block_size()); + } + + let meta_block = OrderedBlockCompactBuilder::create_output_data(&mut group, thresholds); + let meta = BlockCompactMeta::downcast_from(meta_block.get_owned_meta().unwrap()).unwrap(); + let output = TransformCompactBlock::default().transform(meta)?; + + assert_eq!( + output.iter().map(DataBlock::num_rows).collect::>(), + vec![6, 6, 6] + ); + Ok(()) + } + + #[test] + fn test_ordered_compact_split_does_not_exceed_target_block_count() -> Result<()> { + let block = block_with_rows(1000); + let mut group = BlockGroup::default(); + group.push(block.clone(), block.num_rows(), block.estimate_block_size()); + + let target_block_count = 500; + let thresholds = BlockThresholds::default() + .set_rows_per_block(1000) + .set_bytes_per_block(group.total_bytes / target_block_count); + let block_num = thresholds.calc_compact_block_num(group.total_rows, group.total_bytes); + + let meta_block = OrderedBlockCompactBuilder::create_output_data(&mut group, thresholds); + let meta = BlockCompactMeta::downcast_from(meta_block.get_owned_meta().unwrap()).unwrap(); + let output = TransformCompactBlock::default().transform(meta)?; + + assert!(output.len() <= block_num); + assert!(output.iter().all(|block| block.num_rows() > 1)); + Ok(()) + } } diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index 4b5591eff163b..ceb3258344310 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -19,7 +19,6 @@ use std::time::SystemTime; use databend_common_catalog::lock::LockTableOption; use databend_common_catalog::plan::Filters; -use databend_common_catalog::plan::PartInfoType; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::ReclusterInfoSideCar; use databend_common_catalog::plan::ReclusterParts; @@ -67,7 +66,6 @@ use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files; use crate::interpreters::interpreter_insert_multi_table::scalar_expr_to_remote_expr; use crate::physical_plans::CommitSink; use crate::physical_plans::CommitType; -use crate::physical_plans::CompactSource; use crate::physical_plans::Exchange; use crate::physical_plans::HilbertPartition; use crate::physical_plans::PhysicalPlan; @@ -330,7 +328,7 @@ impl ReclusterTableInterpreter { let total_compressed = recluster_info.removed_statistics.compressed_byte_size as usize; // Determine rows per block based on data size and compression ratio - let rows_per_block = + let (rows_per_block, _) = block_thresholds.calc_rows_for_recluster(total_rows, total_bytes, total_compressed); // Calculate initial partition count based on data volume and block size @@ -516,56 +514,33 @@ impl ReclusterTableInterpreter { let table_info = tbl.get_table_info().clone(); let is_distributed = parts.is_distributed(self.ctx.clone()); - let plan = match parts { - ReclusterParts::Recluster { - tasks, - remained_blocks, - removed_segment_indexes, - removed_segment_summary, - } => { - let root = PhysicalPlan::new(Recluster { - tasks, - table_meta_timestamps, - - table_info: table_info.clone(), - meta: PhysicalPlanMeta::new("Recluster"), - }); + let ReclusterParts { + tasks, + remained_blocks, + removed_segment_indexes, + removed_segment_summary, + } = parts; + let root = PhysicalPlan::new(Recluster { + tasks, + table_meta_timestamps, - Self::add_commit_sink( - root, - is_distributed, - table_info, - snapshot, - false, - Some(ReclusterInfoSideCar { - merged_blocks: remained_blocks, - removed_segment_indexes, - removed_statistics: removed_segment_summary, - }), - table_meta_timestamps, - ) - } - ReclusterParts::Compact(parts) => { - let merge_meta = parts.partitions_type() == PartInfoType::LazyLevel; - let root = PhysicalPlan::new(CompactSource { - parts, - table_info: table_info.clone(), - column_ids: snapshot.schema.to_leaf_column_id_set(), - table_meta_timestamps, - meta: PhysicalPlanMeta::new("CompactSource"), - }); + table_info: table_info.clone(), + meta: PhysicalPlanMeta::new("Recluster"), + }); - Self::add_commit_sink( - root, - is_distributed, - table_info, - snapshot, - merge_meta, - None, - table_meta_timestamps, - ) - } - }; + let plan = Self::add_commit_sink( + root, + is_distributed, + table_info, + snapshot, + false, + Some(ReclusterInfoSideCar { + merged_blocks: remained_blocks, + removed_segment_indexes, + removed_statistics: removed_segment_summary, + }), + table_meta_timestamps, + ); Ok(Some(plan)) } diff --git a/src/query/service/src/physical_plans/physical_compact_source.rs b/src/query/service/src/physical_plans/physical_compact_source.rs index e84b3324d3751..db28e9857f532 100644 --- a/src/query/service/src/physical_plans/physical_compact_source.rs +++ b/src/query/service/src/physical_plans/physical_compact_source.rs @@ -116,7 +116,6 @@ impl IPhysicalPlan for CompactSource { }) .collect::>(); - let column_ids = self.column_ids.clone(); builder.main_pipeline.set_on_init(move || { let ctx = query_ctx.clone(); let partitions = @@ -125,7 +124,6 @@ impl IPhysicalPlan for CompactSource { let partitions = BlockCompactMutator::build_compact_tasks( ctx.clone(), dal.clone(), - column_ids.clone(), cluster_key_id, thresholds, lazy_parts, @@ -148,8 +146,6 @@ impl IPhysicalPlan for CompactSource { builder.ctx.clone(), Projection::Columns(table.all_column_indices()), false, - table.change_tracking_enabled(), - false, )?; let stream_ctx = if table.change_tracking_enabled() { Some(StreamContext::try_create( diff --git a/src/query/service/src/physical_plans/physical_recluster.rs b/src/query/service/src/physical_plans/physical_recluster.rs index 4ae2ba687ab55..61dfd1ed6e7cb 100644 --- a/src/query/service/src/physical_plans/physical_recluster.rs +++ b/src/query/service/src/physical_plans/physical_recluster.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::sync::atomic; use std::sync::atomic::AtomicUsize; +use databend_common_catalog::plan::BlockMetaOptions; use databend_common_catalog::plan::DataSourceInfo; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::ReclusterTask; @@ -114,6 +115,8 @@ impl IPhysicalPlan for Recluster { // └──────────────┘ fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> { match self.tasks.len() { + // A zero-task recluster still rebuilds segments from sidecar `remained_blocks` + // during commit. This is the segment-only reorder path, not a no-op success path. 0 => builder.main_pipeline.add_source(EmptySource::create, 1), 1 => { let table = builder @@ -137,7 +140,8 @@ impl IPhysicalPlan for Recluster { push_downs: None, internal_columns: None, base_block_ids: None, - update_stream_columns: table.change_tracking_enabled(), + block_meta_options: BlockMetaOptions::default() + .set_update_stream_columns(table.change_tracking_enabled()), table_index: usize::MAX, scan_id: usize::MAX, }; @@ -209,7 +213,7 @@ impl IPhysicalPlan for Recluster { .collect(); // merge sort - let sort_block_size = block_thresholds.calc_rows_for_recluster( + let (rows_per_block, bytes_per_block) = block_thresholds.calc_rows_for_recluster( task.total_rows, task.total_bytes, task.total_compressed, @@ -223,18 +227,20 @@ impl IPhysicalPlan for Recluster { None, settings.get_enable_fixed_rows_sort()?, )? - .with_block_size_hit(sort_block_size); - // Todo(zhyass): Recluster will no longer perform sort in the near future. + .with_block_size_hit(rows_per_block); sort_pipeline_builder .build_full_sort_pipeline(&mut builder.main_pipeline, false)?; // Compact after merge sort. This ordered compactor keeps block growth bounded // without requiring a hard post-sort size cap, since final serialized sizes are // not known yet and over-splitting here would create small fragmented blocks. + let compact_thresholds = block_thresholds + .set_rows_per_block(rows_per_block) + .set_bytes_per_block(bytes_per_block); let max_threads = settings.get_max_threads()? as usize; build_ordered_compact_pipeline( &mut builder.main_pipeline, - block_thresholds, + compact_thresholds, max_threads, )?; diff --git a/src/query/service/src/physical_plans/physical_table_scan.rs b/src/query/service/src/physical_plans/physical_table_scan.rs index 03106a5cae058..51722c7033051 100644 --- a/src/query/service/src/physical_plans/physical_table_scan.rs +++ b/src/query/service/src/physical_plans/physical_table_scan.rs @@ -67,6 +67,7 @@ use databend_common_sql::executor::cast_expr_to_non_null_boolean; use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan; use databend_common_sql::plans::FunctionCall; use databend_common_storages_fuse::FuseTable; +use databend_common_storages_fuse::operations::need_reserve_block_info; use rand::distributions::Bernoulli; use rand::distributions::Distribution; use rand::thread_rng; @@ -508,6 +509,8 @@ impl PhysicalPlanBuilder { } source.table_index = scan.table_index; source.scan_id = scan.scan_id; + source.block_meta_options.reserve_block_index = + need_reserve_block_info(self.ctx.clone(), scan.table_index).0; if let Some(agg_index) = &scan.agg_index { let source_schema = source.schema(); let push_down = source.push_downs.as_mut().unwrap(); diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index 1edf6e447a9a8..d1e47c42e5b41 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -28,7 +28,6 @@ mod transform_branched_async_function; mod transform_cache_scan; mod transform_dictionary; mod transform_expression_scan; - mod transform_recursive_cte_scan; mod transform_recursive_cte_source; mod transform_resort_addon; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs index e55cba91bad0e..cdeb474f52580 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs @@ -17,6 +17,5 @@ mod deletion; mod recluster_mutator; mod segments_compact_mutator; -pub use block_compact_mutator::verify_compact_tasks; pub use segments_compact_mutator::CompactSegmentTestFixture; pub use segments_compact_mutator::compact_segment; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index e5593cf9cf539..03bfde9d1aed9 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -20,7 +20,9 @@ use chrono::Utc; use databend_common_catalog::plan::ReclusterParts; use databend_common_exception::ErrorCode; use databend_common_expression::BlockThresholds; +use databend_common_expression::ColumnRef; use databend_common_expression::DataBlock; +use databend_common_expression::Expr; use databend_common_expression::Scalar; use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; @@ -30,7 +32,6 @@ use databend_common_storages_fuse::FuseBlockPartInfo; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::io::MetaWriter; use databend_common_storages_fuse::io::TableMetaLocationGenerator; -use databend_common_storages_fuse::operations::ReclusterMode; use databend_common_storages_fuse::operations::ReclusterMutator; use databend_common_storages_fuse::pruning::create_segment_location_vector; use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut; @@ -44,15 +45,21 @@ use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ClusterStatistics; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::Statistics; -use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::meta::Versioned; use rand::Rng; use rand::thread_rng; use uuid::Uuid; use crate::storages::fuse::operations::mutation::CompactSegmentTestFixture; -use crate::storages::fuse::operations::mutation::verify_compact_tasks; -use crate::storages::fuse::utils::new_empty_snapshot; + +fn test_cluster_key_expr() -> Expr { + Expr::ColumnRef(ColumnRef { + span: None, + data_type: DataType::Number(NumberDataType::Int32), + id: 0, + display_name: "c0".to_string(), + }) +} #[tokio::test(flavor = "multi_thread")] async fn test_recluster_mutator_block_select() -> anyhow::Result<()> { @@ -138,8 +145,6 @@ async fn test_recluster_mutator_block_select() -> anyhow::Result<()> { .await?; test_segment_locations.push(segment_location); test_block_locations.push(block_location); - // unused snapshot. - let snapshot = new_empty_snapshot(schema.as_ref().clone()); let ctx: Arc = ctx.clone(); let segment_locations = create_segment_location_vector(test_segment_locations, None); @@ -152,26 +157,22 @@ async fn test_recluster_mutator_block_select() -> anyhow::Result<()> { ) .await?; - let column_ids = snapshot.schema.to_leaf_column_id_set(); let mutator = ReclusterMutator::new( ctx, data_accessor, schema, - vec![DataType::Number(NumberDataType::Int64)], + vec![test_cluster_key_expr()], 1.0, BlockThresholds::default(), cluster_key_id, 1, - column_ids, ); - let (_, parts) = mutator - .target_select(compact_segments, ReclusterMode::Recluster) - .await?; + + let compact_segments = mutator.select_segments(&compact_segments, 8)?; + let (_, parts) = mutator.target_select(compact_segments).await?; let need_recluster = !parts.is_empty(); assert!(need_recluster); - let ReclusterParts::Recluster { tasks, .. } = parts else { - anyhow::bail!("Logical error, it's a bug"); - }; + let tasks = parts.tasks; assert_eq!(tasks.len(), 1); let total_block_nums = tasks.iter().map(|t| t.parts.len()).sum::(); assert_eq!(total_block_nums, 3); @@ -179,6 +180,97 @@ async fn test_recluster_mutator_block_select() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_recluster_mutator_zero_task_segment_rebuild() -> anyhow::Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + let location_generator = TableMetaLocationGenerator::new("_prefix".to_owned()); + + let data_accessor = ctx.get_application_level_data_operator()?.operator(); + + let cluster_key_id = 0; + let thresholds = BlockThresholds::new(1000, 1_000_000, 100_000, 10); + let gen_test_seg = |cluster_stats: Option| async { + let block_id = Uuid::new_v4().simple().to_string(); + let location = (block_id, DataBlock::VERSION); + let test_block_meta = Arc::new(BlockMeta::new( + 1000, + 1_000_000, + 100_000, + HashMap::default(), + HashMap::default(), + cluster_stats, + location.clone(), + None, + 0, + None, + None, + None, + None, + None, + None, + None, + None, + meta::Compression::Lz4Raw, + Some(Utc::now()), + )); + + let statistics = reduce_block_metas(&[test_block_meta.as_ref()], thresholds, Some(0)); + + let segment = SegmentInfo::new(vec![test_block_meta], statistics); + let segment_location = location_generator + .gen_segment_info_location(TestFixture::default_table_meta_timestamps(), false); + segment + .write_meta(&data_accessor, &segment_location) + .await?; + Ok::<_, ErrorCode>((segment_location, location)) + }; + + let mut test_segment_locations = vec![]; + for (min, max) in [(1i32, 2i32), (3, 4), (5, 6)] { + let (segment_location, _) = gen_test_seg(Some(ClusterStatistics::new( + cluster_key_id, + vec![Scalar::from(min)], + vec![Scalar::from(max)], + 0, + None, + ))) + .await?; + test_segment_locations.push((segment_location, SegmentInfo::VERSION)); + } + + let schema = TableSchemaRef::new(TableSchema::empty()); + let ctx: Arc = ctx.clone(); + let segment_locations = create_segment_location_vector(test_segment_locations, None); + let compact_segments = FuseTable::segment_pruning( + &ctx, + schema.clone(), + data_accessor.clone(), + &None, + segment_locations, + ) + .await?; + + let mutator = ReclusterMutator::new( + ctx, + data_accessor, + schema, + vec![test_cluster_key_expr()], + 1.0, + thresholds, + cluster_key_id, + 1, + ); + let compact_segments = mutator.select_segments(&compact_segments, 8)?; + let (_, parts) = mutator.target_select(compact_segments).await?; + + assert!(parts.tasks.is_empty()); + assert_eq!(parts.remained_blocks.len(), 3); + assert_eq!(parts.removed_segment_indexes.len(), 3); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_safety_for_recluster() -> anyhow::Result<()> { let fixture = TestFixture::setup().await?; @@ -221,16 +313,6 @@ async fn test_safety_for_recluster() -> anyhow::Result<()> { ); let unclustered: bool = rand.r#gen(); - let mut unclustered_segment_indices = HashSet::new(); - if unclustered { - unclustered_segment_indices = block_number_of_segments - .iter() - .rev() - .enumerate() - .filter(|(_, num)| *num % 4 == 0) - .map(|(index, _)| index) - .collect(); - } let (locations, _, segment_infos) = CompactSegmentTestFixture::gen_segments( ctx.clone(), block_number_of_segments, @@ -248,18 +330,6 @@ async fn test_safety_for_recluster() -> anyhow::Result<()> { merge_statistics_mut(&mut summary, &seg.summary, Some(cluster_key_id)); } - let snapshot = Arc::new(TableSnapshot::try_new( - None, - None, - schema.as_ref().clone(), - summary, - locations.clone(), - None, - None, - None, - TestFixture::default_table_meta_timestamps(), - )?); - let mut block_ids = HashSet::new(); for seg in &segment_infos { for b in &seg.blocks { @@ -278,20 +348,18 @@ async fn test_safety_for_recluster() -> anyhow::Result<()> { ) .await?; - let column_ids = snapshot.schema.to_leaf_column_id_set(); - let mut parts = ReclusterParts::new_recluster_parts(); + let mut parts = ReclusterParts::default(); let mutator = Arc::new(ReclusterMutator::new( ctx.clone(), data_accessor.clone(), schema.clone(), - vec![DataType::Number(NumberDataType::Int32)], + vec![test_cluster_key_expr()], 1.0, threshold, cluster_key_id, max_tasks, - column_ids, )); - let (mode, selected_segs) = mutator.select_segments(&compact_segments, 8)?; + let selected_segs = mutator.select_segments(&compact_segments, 8)?; // select the blocks with the highest depth. if selected_segs.is_empty() { let result = FuseTable::generate_recluster_parts(mutator, compact_segments).await?; @@ -299,68 +367,49 @@ async fn test_safety_for_recluster() -> anyhow::Result<()> { parts = recluster_parts; } } else { - let selected_segments = selected_segs - .into_iter() - .map(|i| compact_segments[i].clone()) - .collect(); - (_, parts) = mutator.target_select(selected_segments, mode).await?; + (_, parts) = mutator.target_select(selected_segs).await?; } if !parts.is_empty() { eprintln!("need_recluster"); - match parts { - ReclusterParts::Recluster { - tasks, - remained_blocks, - removed_segment_indexes, - .. - } => { - assert!(unclustered_segment_indices.is_empty()); - assert!(tasks.len() <= max_tasks); - assert!(!tasks.is_empty() || !remained_blocks.is_empty()); - eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks); - let mut blocks = Vec::new(); - for task in tasks.into_iter() { - let parts = task.parts.partitions; - assert!(task.total_bytes <= recluster_block_size); - for part in parts.into_iter() { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - blocks.push(fuse_part.location.clone()); - } - } - - eprintln!( - "selected segments number {}, selected blocks number {}, remained blocks number {}", - removed_segment_indexes.len(), - blocks.len(), - remained_blocks.len() - ); - for remain in remained_blocks { - blocks.push(remain.0.location.0.clone()); - } - - let block_ids_after_target = HashSet::from_iter(blocks.into_iter()); - - let mut origin_blocks_ids = HashSet::new(); - for idx in &removed_segment_indexes { - for b in &segment_infos[*idx].blocks { - origin_blocks_ids.insert(b.location.0.clone()); - } - } - assert_eq!(block_ids_after_target, origin_blocks_ids); + let ReclusterParts { + tasks, + remained_blocks, + removed_segment_indexes, + .. + } = parts; + assert!(tasks.len() <= max_tasks); + assert!(!tasks.is_empty() || !remained_blocks.is_empty()); + eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks); + let mut blocks = Vec::new(); + for task in tasks.into_iter() { + let parts = task.parts.partitions; + assert!(task.total_bytes <= recluster_block_size); + for part in parts.into_iter() { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + blocks.push(fuse_part.location.clone()); } - ReclusterParts::Compact(parts) => { - assert!(unclustered); - assert!(!unclustered_segment_indices.is_empty()); - verify_compact_tasks( - ctx.get_application_level_data_operator()?.operator(), - parts, - locations, - unclustered_segment_indices, - ) - .await?; + } + + eprintln!( + "selected segments number {}, selected blocks number {}, remained blocks number {}", + removed_segment_indexes.len(), + blocks.len(), + remained_blocks.len() + ); + for remain in remained_blocks { + blocks.push(remain.0.location.0.clone()); + } + + let block_ids_after_target = HashSet::from_iter(blocks.into_iter()); + + let mut origin_blocks_ids = HashSet::new(); + for idx in &removed_segment_indexes { + for b in &segment_infos[*idx].blocks { + origin_blocks_ids.insert(b.location.0.clone()); } - }; + } + assert_eq!(block_ids_after_target, origin_blocks_ids); } } diff --git a/src/query/service/tests/it/storages/fuse/operations/prewhere.rs b/src/query/service/tests/it/storages/fuse/operations/prewhere.rs index bd5b91ec4207f..e4701208fff3e 100644 --- a/src/query/service/tests/it/storages/fuse/operations/prewhere.rs +++ b/src/query/service/tests/it/storages/fuse/operations/prewhere.rs @@ -321,8 +321,6 @@ async fn prepare_prewhere_data() -> Result { schema.clone(), prewhere_info.output_columns.clone(), false, - false, - false, )?; // Extract column chunks from parquet bytes diff --git a/src/query/sql/src/executor/table_read_plan.rs b/src/query/sql/src/executor/table_read_plan.rs index 360f7860437af..36d145da0a629 100644 --- a/src/query/sql/src/executor/table_read_plan.rs +++ b/src/query/sql/src/executor/table_read_plan.rs @@ -16,6 +16,7 @@ use std::collections::BTreeMap; use std::sync::Arc; use databend_common_base::base::ProgressValues; +use databend_common_catalog::plan::BlockMetaOptions; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::Filters; use databend_common_catalog::plan::InternalColumn; @@ -154,6 +155,8 @@ impl ToReadDataSourcePlan for dyn Table { start.elapsed() )); + let query_internal_columns = internal_columns.is_some(); + Ok(DataSourcePlan { source_info, output_schema, @@ -164,7 +167,9 @@ impl ToReadDataSourcePlan for dyn Table { push_downs, internal_columns, base_block_ids, - update_stream_columns, + block_meta_options: BlockMetaOptions::default() + .set_update_stream_columns(update_stream_columns) + .set_query_internal_columns(query_internal_columns), // Set a dummy id, will be set real id later table_index: usize::MAX, scan_id: usize::MAX, diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs index 936743bdb3f4f..10908e4d01f10 100644 --- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs +++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs @@ -64,8 +64,6 @@ impl AggIndexReader { dal, agg.schema.clone(), agg.projection.clone(), - false, - false, put_cache, )?; diff --git a/src/query/storages/fuse/src/io/read/block/block_reader.rs b/src/query/storages/fuse/src/io/read/block/block_reader.rs index 483608ef3a1c8..e5c48860ad396 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader.rs @@ -46,9 +46,6 @@ pub struct BlockReader { pub(crate) project_column_nodes: Arc>, pub(crate) default_vals: Vec, pub(crate) all_field_default_vals: Vec, - pub query_internal_columns: bool, - // used for mutation to update stream columns. - pub update_stream_columns: bool, pub put_cache: bool, pub original_schema: TableSchemaRef, @@ -131,8 +128,6 @@ impl BlockReader { original_schema: TableSchemaRef, all_field_default_vals: Vec, projection: Projection, - query_internal_columns: bool, - update_stream_columns: bool, put_cache: bool, ) -> Result> { let arrow_schema = Arc::new(original_schema.as_ref().into()); @@ -160,8 +155,6 @@ impl BlockReader { project_column_nodes, default_vals, all_field_default_vals, - query_internal_columns, - update_stream_columns, put_cache, original_schema, native_columns_reader: NativeColumnsReader::new()?, @@ -173,8 +166,6 @@ impl BlockReader { operator: Operator, schema: TableSchemaRef, projection: Projection, - query_internal_columns: bool, - update_stream_columns: bool, put_cache: bool, ) -> Result> { let mut all_field_default_vals = Vec::with_capacity(schema.fields().len()); @@ -189,8 +180,6 @@ impl BlockReader { schema, all_field_default_vals, projection, - query_internal_columns, - update_stream_columns, put_cache, ) } @@ -202,8 +191,6 @@ impl BlockReader { self.original_schema.clone(), self.all_field_default_vals.clone(), projection, - self.query_internal_columns, - self.update_stream_columns, self.put_cache, ) } @@ -229,14 +216,6 @@ impl BlockReader { indices } - pub fn query_internal_columns(&self) -> bool { - self.query_internal_columns - } - - pub fn update_stream_columns(&self) -> bool { - self.update_stream_columns - } - pub fn schema(&self) -> TableSchemaRef { self.projected_schema.clone() } diff --git a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs index 5036051626c69..71c3e949c5eb5 100644 --- a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs @@ -156,8 +156,6 @@ impl BloomIndexRebuilder { self.table_schema.clone(), projection, false, - false, - false, )?; let settings = ReadSettings::from_ctx(&self.table_ctx)?; diff --git a/src/query/storages/fuse/src/operations/analyze/collect_ndv_source.rs b/src/query/storages/fuse/src/operations/analyze/collect_ndv_source.rs index 03a88563aa3dc..fadc791b66c22 100644 --- a/src/query/storages/fuse/src/operations/analyze/collect_ndv_source.rs +++ b/src/query/storages/fuse/src/operations/analyze/collect_ndv_source.rs @@ -138,8 +138,7 @@ impl AnalyzeCollectNDVSource { .distinct_column_fields(table_schema.clone(), RangeIndex::supported_table_type)?; let field_indices = ndv_columns_map.keys().cloned().collect(); let projection = Projection::Columns(field_indices); - let block_reader = - table.create_block_reader(ctx.clone(), projection, false, false, false)?; + let block_reader = table.create_block_reader(ctx.clone(), projection, false)?; // Rebuild `ndv_columns_map` so that keys correspond to the column order in the projection. // diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs index 9eb9d2bb58bbf..179bf9b490738 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs @@ -26,10 +26,12 @@ use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoPtr; use databend_common_expression::BlockThresholds; use databend_common_expression::DataBlock; +use databend_common_expression::Expr; use databend_common_expression::TableSchemaRef; use databend_common_expression::VirtualDataSchema; use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransform; use databend_common_sql::executor::physical_plans::MutationKind; +use databend_common_sql::parse_cluster_keys; use databend_storages_common_cache::SegmentStatistics; use databend_storages_common_table_meta::meta::AdditionalStatsMeta; use databend_storages_common_table_meta::meta::BlockHLL; @@ -67,19 +69,15 @@ use crate::operations::common::SnapshotMerged; use crate::operations::mutation::BlockIndex; use crate::operations::mutation::SegmentIndex; use crate::statistics::VirtualColumnAccumulator; +use crate::statistics::get_min_max_stats; use crate::statistics::reducers::merge_statistics_mut; use crate::statistics::reducers::reduce_block_metas; use crate::statistics::sort_by_cluster_stats; pub struct TableMutationAggregator { ctx: Arc, - schema: TableSchemaRef, table_id: u64, - dal: Operator, - location_gen: TableMetaLocationGenerator, - thresholds: BlockThresholds, - default_cluster_key_id: Option, base_segments: Vec, merged_blocks: Vec>, set_hilbert_level: bool, @@ -93,11 +91,10 @@ pub struct TableMutationAggregator { removed_segment_indexes: Vec, removed_statistics: Statistics, hll: BlockHLL, + write_segment_ctx: WriteSegmentCtx, - kind: MutationKind, start_time: Instant, finished_tasks: usize, - table_meta_timestamps: TableMetaTimestamps, } // takes in table mutation logs and aggregates them (former mutation_transform) @@ -122,13 +119,13 @@ impl AsyncAccumulatingTransform for TableMutationAggregator { let mut new_segment_locs = Vec::new(); new_segment_locs.extend(self.appended_segments.clone()); - let conflict_resolve_context = match self.kind { + let conflict_resolve_context = match self.write_segment_ctx.kind { MutationKind::Insert => ConflictResolveContext::AppendOnly(( SnapshotMerged { merged_segments: std::mem::take(&mut self.appended_segments), merged_statistics: std::mem::take(&mut self.appended_statistics), }, - self.schema.clone(), + self.write_segment_ctx.schema.clone(), )), MutationKind::Recluster => { let mut new_segments = std::mem::take(&mut self.appended_segments); @@ -140,9 +137,7 @@ impl AsyncAccumulatingTransform for TableMutationAggregator { if new_segments_len > removed_segments_len { // The remain new segments will be appended. let appended = new_segments.split_off(removed_segments_len); - for location in appended.into_iter().rev() { - appended_segments.push(location); - } + appended_segments.extend(appended.into_iter().rev()); } for (i, location) in new_segments.into_iter().enumerate() { @@ -188,9 +183,8 @@ impl TableMutationAggregator { kind: MutationKind, table_meta_timestamps: TableMetaTimestamps, ) -> Self { - let set_hilbert_level = table - .cluster_type() - .is_some_and(|v| matches!(v, ClusterType::Hilbert)) + let cluster_type = table.cluster_type(); + let set_hilbert_level = cluster_type.is_some_and(|v| matches!(v, ClusterType::Hilbert)) && matches!( kind, MutationKind::Delete @@ -198,15 +192,35 @@ impl TableMutationAggregator { | MutationKind::Replace | MutationKind::Recluster ); + let fill_missing_cluster_stats = + cluster_type.is_some_and(|v| matches!(v, ClusterType::Linear)); let virtual_schema = table.table_info.meta.virtual_schema.clone(); - TableMutationAggregator { - ctx, - schema: table.schema(), + let cluster_key_exprs = if fill_missing_cluster_stats { + table + .resolve_cluster_keys() + .map(|cluster_keys| { + parse_cluster_keys(ctx.clone(), Arc::new(table.clone()), cluster_keys) + }) + .transpose() + .expect("table cluster keys should be valid") + .unwrap_or_default() + } else { + vec![] + }; + let write_segment_ctx = WriteSegmentCtx { dal: table.get_operator(), location_gen: table.meta_location_generator().clone(), thresholds: table.get_block_thresholds(), - default_cluster_key_id: table.cluster_key_id(), + default_cluster_key: table.cluster_key_id(), + cluster_key_exprs: Arc::from(cluster_key_exprs.clone()), + schema: table.schema(), + kind, + table_meta_timestamps, + fill_missing_cluster_stats, + }; + TableMutationAggregator { + ctx, set_hilbert_level, mutations: HashMap::new(), extended_mutations: HashMap::new(), @@ -219,11 +233,10 @@ impl TableMutationAggregator { removed_segment_indexes, removed_statistics, hll: HashMap::new(), - kind, + write_segment_ctx, finished_tasks: 0, start_time: Instant::now(), table_id: table.get_id(), - table_meta_timestamps, } } @@ -234,7 +247,7 @@ impl TableMutationAggregator { { let status = format!( "{}: run tasks:{}, cost:{:?}", - self.kind, + self.write_segment_ctx.kind, self.finished_tasks, self.start_time.elapsed() ); @@ -273,7 +286,7 @@ impl TableMutationAggregator { merge_statistics_mut( &mut self.removed_statistics, &deleted_segment.summary, - self.default_cluster_key_id, + self.write_segment_ctx.default_cluster_key, ); } MutationLogEntry::AppendSegment { @@ -285,7 +298,7 @@ impl TableMutationAggregator { merge_statistics_mut( &mut self.appended_statistics, &summary, - self.default_cluster_key_id, + self.write_segment_ctx.default_cluster_key, ); merge_column_hll_mut(&mut self.hll, &hll); @@ -323,7 +336,7 @@ impl TableMutationAggregator { merge_statistics_mut( &mut self.removed_statistics, &extras.removed_segment_summary, - self.default_cluster_key_id, + self.write_segment_ctx.default_cluster_key, ); } MutationLogEntry::DoNothing => (), @@ -337,19 +350,17 @@ impl TableMutationAggregator { let mut merged_blocks = self.accumulate_merged_blocks()?; - if let Some(id) = self.default_cluster_key_id { + if let Some(id) = self.write_segment_ctx.default_cluster_key { // sort ascending. merged_blocks .sort_by(|a, b| sort_by_cluster_stats(&a.0.cluster_stats, &b.0.cluster_stats, id)); } let mut tasks = Vec::new(); - let segments_num = (merged_blocks.len() / self.thresholds.block_per_segment).max(1); + let segments_num = + (merged_blocks.len() / self.write_segment_ctx.thresholds.block_per_segment).max(1); let chunk_size = merged_blocks.len().div_ceil(segments_num); - let default_cluster_key = self.default_cluster_key_id; - let thresholds = self.thresholds; let set_hilbert_level = self.set_hilbert_level; - let kind = self.kind; for chunk in &merged_blocks.into_iter().chunks(chunk_size) { let (new_blocks, new_hlls): (Vec>, Vec>) = chunk.unzip(); @@ -364,23 +375,10 @@ impl TableMutationAggregator { }; let all_perfect = new_blocks.len() > 1; - let location_gen = self.location_gen.clone(); - let op = self.dal.clone(); - let table_meta_timestamps = self.table_meta_timestamps; + let ctx = self.write_segment_ctx.clone(); tasks.push(async move { - write_segment( - op, - location_gen, - new_blocks, - new_hlls, - thresholds, - default_cluster_key, - all_perfect, - kind, - set_hilbert_level, - table_meta_timestamps, - ) - .await + ctx.write_segment(new_blocks, new_hlls, all_perfect, set_hilbert_level) + .await }); } @@ -399,7 +397,7 @@ impl TableMutationAggregator { merge_statistics_mut( &mut self.appended_statistics, &stats, - self.default_cluster_key_id, + self.write_segment_ctx.default_cluster_key, ); self.appended_segments .push((location, SegmentInfo::VERSION)); @@ -434,7 +432,7 @@ impl TableMutationAggregator { merge_statistics_mut( &mut merged_statistics, &summary, - self.default_cluster_key_id, + self.write_segment_ctx.default_cluster_key, ); replaced_segments.insert(result.index, new_segment_loc); } else { @@ -445,7 +443,7 @@ impl TableMutationAggregator { merge_statistics_mut( &mut self.removed_statistics, &origin_summary, - self.default_cluster_key_id, + self.write_segment_ctx.default_cluster_key, ); } } @@ -455,7 +453,7 @@ impl TableMutationAggregator { count += chunk.len(); let status = format!( "{}: generate new segment files:{}/{}, cost:{:?}", - self.kind, + self.write_segment_ctx.kind, count, segment_indices.len(), start.elapsed() @@ -473,7 +471,7 @@ impl TableMutationAggregator { merge_statistics_mut( &mut merged_statistics, &appended_statistics, - self.default_cluster_key_id, + self.write_segment_ctx.default_cluster_key, ); Ok(ConflictResolveContext::ModifiedSegmentExistsInLatest( @@ -491,30 +489,31 @@ impl TableMutationAggregator { &mut self, segment_indices: Vec, ) -> Result> { - let thresholds = self.thresholds; - let default_cluster_key_id = self.default_cluster_key_id; - let kind = self.kind; let set_hilbert_level = self.set_hilbert_level; let mut tasks = Vec::with_capacity(segment_indices.len()); for index in segment_indices { let segment_mutation = self.mutations.remove(&index).unwrap(); let location = self.base_segments.get(index).cloned(); - let schema = self.schema.clone(); - let op = self.dal.clone(); - let location_gen = self.location_gen.clone(); - let table_meta_timestamps = self.table_meta_timestamps; + let write_segment_ctx = self.write_segment_ctx.clone(); tasks.push(async move { let mut all_perfect = false; let mut set_level = false; let (new_blocks, new_hlls, origin_summary) = if let Some(loc) = location { // read the old segment - let compact_segment_info = - SegmentsIO::read_compact_segment(op.clone(), loc, schema, false).await?; + let compact_segment_info = SegmentsIO::read_compact_segment( + write_segment_ctx.dal.clone(), + loc, + write_segment_ctx.schema.clone(), + false, + ) + .await?; let mut segment_info = SegmentInfo::try_from(compact_segment_info)?; let stats = match segment_info.summary.additional_stats_loc() { - Some(loc) => Some(read_segment_stats(op.clone(), loc).await?), + Some(loc) => { + Some(read_segment_stats(write_segment_ctx.dal.clone(), loc).await?) + } _ => None, }; @@ -553,7 +552,9 @@ impl TableMutationAggregator { .summary .cluster_stats .as_ref() - .is_some_and(|v| v.cluster_key_id == default_cluster_key_id.unwrap()); + .is_some_and(|v| { + v.cluster_key_id == write_segment_ctx.default_cluster_key.unwrap() + }); let stats = generate_segment_stats(new_hlls)?; (new_blocks, stats, Some(segment_info.summary)) } else { @@ -572,19 +573,9 @@ impl TableMutationAggregator { (new_blocks, stats, None) }; - let new_segment_info = write_segment( - op, - location_gen, - new_blocks, - new_hlls, - thresholds, - default_cluster_key_id, - all_perfect, - kind, - set_level, - table_meta_timestamps, - ) - .await?; + let new_segment_info = write_segment_ctx + .write_segment(new_blocks, new_hlls, all_perfect, set_level) + .await?; Ok(SegmentLite { index, @@ -615,7 +606,7 @@ impl TableMutationAggregator { let mut virtual_column_accumulator = VirtualColumnAccumulator::try_create( self.ctx.clone(), - &self.schema, + &self.write_segment_ctx.schema, &self.virtual_schema, ); @@ -685,7 +676,7 @@ impl TableMutationAggregator { fn accumulate_merged_blocks(&mut self) -> Result> { let mut virtual_column_accumulator = VirtualColumnAccumulator::try_create( self.ctx.clone(), - &self.schema, + &self.write_segment_ctx.schema, &self.virtual_schema, ); let extended_merged_blocks = std::mem::take(&mut self.merged_blocks); @@ -822,71 +813,123 @@ struct SegmentLite { origin_summary: Option, } -async fn write_segment( +#[derive(Clone)] +struct WriteSegmentCtx { dal: Operator, location_gen: TableMetaLocationGenerator, - blocks: Vec>, - stats: Option>, thresholds: BlockThresholds, default_cluster_key: Option, - all_perfect: bool, + cluster_key_exprs: Arc<[Expr]>, + schema: TableSchemaRef, kind: MutationKind, - set_hilbert_level: bool, table_meta_timestamps: TableMetaTimestamps, -) -> Result<(String, Statistics)> { - let location = location_gen.gen_segment_info_location(table_meta_timestamps, false); - let mut new_summary = reduce_block_metas(&blocks, thresholds, default_cluster_key); - if all_perfect { - // To fix issue #13217. - if new_summary.block_count > new_summary.perfect_block_count { - warn!( - "{}: generate new segment: {}, perfect_block_count: {}, block_count: {}", - kind, location, new_summary.perfect_block_count, new_summary.block_count, + fill_missing_cluster_stats: bool, +} + +impl WriteSegmentCtx { + async fn write_segment( + &self, + blocks: Vec>, + stats: Option>, + all_perfect: bool, + set_hilbert_level: bool, + ) -> Result<(String, Statistics)> { + let location = self + .location_gen + .gen_segment_info_location(self.table_meta_timestamps, false); + let mut new_summary = + reduce_block_metas(&blocks, self.thresholds, self.default_cluster_key); + if all_perfect { + // To fix issue #13217. + if new_summary.block_count > new_summary.perfect_block_count { + warn!( + "{}: generate new segment: {}, perfect_block_count: {}, block_count: {}", + self.kind, location, new_summary.perfect_block_count, new_summary.block_count, + ); + new_summary.perfect_block_count = new_summary.block_count; + } + } + if set_hilbert_level { + debug_assert!(new_summary.cluster_stats.is_none()); + let level = if self.thresholds.check_perfect_segment( + new_summary.block_count as usize, + new_summary.row_count as usize, + new_summary.uncompressed_byte_size as usize, + new_summary.compressed_byte_size as usize, + ) { + -1 + } else { + 0 + }; + new_summary.cluster_stats = Some(ClusterStatistics { + cluster_key_id: self.default_cluster_key.unwrap(), + min: vec![], + max: vec![], + level, + pages: None, + }); + } else if self.fill_missing_cluster_stats { + // Mutation paths may produce a new segment whose blocks do not all carry + // block-level cluster_stats for the current cluster key yet. In that case + // reduce_block_metas() leaves summary.cluster_stats empty. Reconstruct a + // segment-level min/max here from the merged summary col_stats so the new + // segment can participate in later recluster selection and clustering + // introspection without waiting for another rewrite. + fill_missing_segment_cluster_stats( + &mut new_summary, + self.default_cluster_key, + &self.cluster_key_exprs, + self.schema.as_ref(), ); - new_summary.perfect_block_count = new_summary.block_count; } + + if let Some(stats) = stats { + let segment_stats_location = + TableMetaLocationGenerator::gen_segment_stats_location_from_segment_location( + location.as_str(), + ); + let additional_stats_meta = AdditionalStatsMeta { + size: stats.len() as u64, + location: (segment_stats_location.clone(), SegmentStatistics::VERSION), + ..Default::default() + }; + self.dal.write(&segment_stats_location, stats).await?; + new_summary.additional_stats_meta = Some(additional_stats_meta); + } + + // create new segment info + let new_segment = SegmentInfo::new(blocks, new_summary.clone()); + new_segment + .write_meta_through_cache(&self.dal, &location) + .await?; + Ok((location, new_summary)) } - if set_hilbert_level { - debug_assert!(new_summary.cluster_stats.is_none()); - let level = if thresholds.check_perfect_segment( - new_summary.block_count as usize, - new_summary.row_count as usize, - new_summary.uncompressed_byte_size as usize, - new_summary.compressed_byte_size as usize, - ) { - -1 - } else { - 0 - }; - new_summary.cluster_stats = Some(ClusterStatistics { - cluster_key_id: default_cluster_key.unwrap(), - min: vec![], - max: vec![], - level, - pages: None, - }); - } +} - if let Some(stats) = stats { - let segment_stats_location = - TableMetaLocationGenerator::gen_segment_stats_location_from_segment_location( - location.as_str(), - ); - let additional_stats_meta = AdditionalStatsMeta { - size: stats.len() as u64, - location: (segment_stats_location.clone(), SegmentStatistics::VERSION), - ..Default::default() - }; - dal.write(&segment_stats_location, stats).await?; - new_summary.additional_stats_meta = Some(additional_stats_meta); +fn fill_missing_segment_cluster_stats( + summary: &mut Statistics, + default_cluster_key: Option, + cluster_key_exprs: &[Expr], + schema: &databend_common_expression::TableSchema, +) { + if summary.cluster_stats.is_some() { + return; + } + let Some(cluster_key_id) = default_cluster_key else { + return; + }; + if cluster_key_exprs.is_empty() { + return; } - // create new segment info - let new_segment = SegmentInfo::new(blocks, new_summary.clone()); - new_segment - .write_meta_through_cache(&dal, &location) - .await?; - Ok((location, new_summary)) + let (min, max) = get_min_max_stats( + cluster_key_exprs, + &summary.col_stats, + None, + Some(cluster_key_id), + schema, + ); + summary.cluster_stats = Some(ClusterStatistics::new(cluster_key_id, min, max, 0, None)); } fn generate_segment_stats(hlls: Vec>) -> Result>> { diff --git a/src/query/storages/fuse/src/operations/inverted_index.rs b/src/query/storages/fuse/src/operations/inverted_index.rs index c93ee0cbe5760..4b7a085132e02 100644 --- a/src/query/storages/fuse/src/operations/inverted_index.rs +++ b/src/query/storages/fuse/src/operations/inverted_index.rs @@ -97,8 +97,7 @@ impl FuseTable { // Read data here to keep the order of blocks in segment. let projection = Projection::Columns(field_indices); - let block_reader = - self.create_block_reader(ctx.clone(), projection, false, false, false)?; + let block_reader = self.create_block_reader(ctx.clone(), projection, false)?; let segment_reader = MetaReaders::segment_info_reader(self.get_operator(), table_schema); diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs index b88fde5cec9e4..217a149d73a49 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs @@ -113,8 +113,6 @@ impl MatchedAggregator { target_table_schema.clone(), projection, false, - update_stream_columns, - false, ) }?; diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index b725b56dad715..d110c5276630c 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -33,12 +33,10 @@ mod recluster; mod replace; mod replace_into; mod revert; +mod snapshot_hint; mod table_index; mod truncate; mod util; - -mod snapshot_hint; - mod vacuum; pub use agg_index_sink::AggIndexSink; diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs index 98eb556542861..6777848a31cd8 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::collections::VecDeque; use std::sync::Arc; use std::time::Instant; @@ -26,8 +25,6 @@ use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockThresholds; -use databend_common_expression::ColumnId; -use databend_common_expression::is_stream_column_id; use databend_common_metrics::storage::*; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::CompactSegmentInfo; @@ -113,12 +110,12 @@ impl BlockCompactMutator { self.operator.clone(), Arc::new(self.compact_params.base_snapshot.schema.clone()), ); - let mut checker = SegmentCompactChecker::new(self.thresholds, self.cluster_key_id); + let mut checker = SegmentCompactChecker::new(self.thresholds); let mut segment_idx = 0; let mut is_end = false; let mut stop_after_next = false; - let mut parts = Vec::new(); + let mut lazy_parts = Vec::new(); let chunk_size = max_threads * 4; for chunk in segment_locations.chunks(chunk_size) { // Read the segments information in parallel. @@ -151,7 +148,7 @@ impl BlockCompactMutator { for (segment_idx, compact_segment) in segment_infos.into_iter() { let segments_vec = checker.add(segment_idx, compact_segment); for segments in segments_vec { - checker.generate_part(segments, &mut parts); + checker.generate_part(segments, &mut lazy_parts); } if stop_after_next { @@ -192,13 +189,13 @@ impl BlockCompactMutator { } // finalize the compaction. - checker.finalize(&mut parts); + checker.finalize(&mut lazy_parts); // Status. let elapsed_time = start.elapsed(); self.ctx.set_status_info(&format!( "[BLOCK-COMPACT] Built lazy compact parts: {}, segments to compact: {}, elapsed: {:?}", - parts.len(), + lazy_parts.len(), checker.compacted_segment_cnt, elapsed_time )); @@ -208,29 +205,13 @@ impl BlockCompactMutator { let enable_distributed_compact = settings.get_enable_distributed_compact()?; let partitions = if !enable_distributed_compact || cluster.is_empty() - || parts.len() < cluster.nodes.len() * max_threads + || lazy_parts.len() < cluster.nodes.len() * max_threads { - // NOTE: The snapshot schema does not contain the stream column. - let column_ids = self - .compact_params - .base_snapshot - .schema - .to_leaf_column_id_set(); - let lazy_parts = parts - .into_iter() - .map(|v| { - v.as_any() - .downcast_ref::() - .unwrap() - .clone() - }) - .collect::>(); Partitions::create( PartitionsShuffleKind::Mod, BlockCompactMutator::build_compact_tasks( self.ctx.clone(), self.operator.clone(), - column_ids, self.cluster_key_id, self.thresholds, lazy_parts, @@ -238,7 +219,15 @@ impl BlockCompactMutator { .await?, ) } else { - Partitions::create(PartitionsShuffleKind::Mod, parts) + Partitions::create( + PartitionsShuffleKind::Mod, + lazy_parts + .into_iter() + .map(|part| { + CompactLazyPartInfo::create(part.segment_indices, part.compact_segments) + }) + .collect(), + ) }; Ok(partitions) } @@ -247,10 +236,9 @@ impl BlockCompactMutator { pub async fn build_compact_tasks( ctx: Arc, dal: Operator, - column_ids: HashSet, cluster_key_id: Option, thresholds: BlockThresholds, - mut lazy_parts: Vec, + lazy_parts: Vec, ) -> Result> { let start = Instant::now(); @@ -258,28 +246,30 @@ impl BlockCompactMutator { let max_concurrency = std::cmp::max(max_threads * 2, 10); let semaphore = Arc::new(Semaphore::new(max_concurrency)); - let mut remain = lazy_parts.len() % max_threads; - let batch_size = lazy_parts.len() / max_threads; + let total_parts = lazy_parts.len(); + let mut remain = total_parts % max_threads; + let batch_size = total_parts / max_threads; let mut works = Vec::with_capacity(max_threads); - while !lazy_parts.is_empty() { + let mut lazy_parts = lazy_parts.into_iter(); + let mut remaining_parts = total_parts; + while remaining_parts > 0 { let gap_size = std::cmp::min(1, remain); - let batch_size = batch_size + gap_size; + let current_batch_size = batch_size + gap_size; remain -= gap_size; + remaining_parts -= current_batch_size; - let column_ids = column_ids.clone(); let semaphore = semaphore.clone(); let dal = dal.clone(); - let batch = lazy_parts.drain(0..batch_size).collect::>(); + let batch = lazy_parts + .by_ref() + .take(current_batch_size) + .collect::>(); works.push(async move { let mut res = vec![]; for lazy_part in batch { - let mut builder = CompactTaskBuilder::new( - dal.clone(), - column_ids.clone(), - cluster_key_id, - thresholds, - ); + let mut builder = + CompactTaskBuilder::new(dal.clone(), cluster_key_id, thresholds); let parts = builder .build_tasks( lazy_part.segment_indices, @@ -332,35 +322,23 @@ pub struct SegmentCompactChecker { thresholds: BlockThresholds, segments: Vec<(SegmentIndex, Arc)>, total_block_count: u64, - cluster_key_id: Option, compacted_segment_cnt: usize, compacted_imperfect_block_cnt: u64, } impl SegmentCompactChecker { - pub fn new(thresholds: BlockThresholds, cluster_key_id: Option) -> Self { + pub fn new(thresholds: BlockThresholds) -> Self { Self { segments: vec![], total_block_count: 0, thresholds, - cluster_key_id, compacted_segment_cnt: 0, compacted_imperfect_block_cnt: 0, } } fn check_not_need_compact(&self, summary: &Statistics) -> bool { - let cluster_match = match (self.cluster_key_id, summary.cluster_stats.as_ref()) { - (Some(id), Some(stats)) => id == stats.cluster_key_id, - (None, _) => true, - _ => false, - }; - - if !cluster_match { - return false; - } - if summary.block_count == 1 { return true; } @@ -424,18 +402,18 @@ impl SegmentCompactChecker { pub fn generate_part( &mut self, segments: Vec<(SegmentIndex, Arc)>, - parts: &mut Vec, + parts: &mut Vec, ) { if !segments.is_empty() && self.check_for_compact(&segments) { let (segment_indices, compact_segments) = segments.into_iter().unzip(); - parts.push(CompactLazyPartInfo::create( + parts.push(CompactLazyPartInfo { segment_indices, compact_segments, - )); + }); } } - pub fn finalize(&mut self, parts: &mut Vec) { + pub fn finalize(&mut self, parts: &mut Vec) { let final_segments = std::mem::take(&mut self.segments); self.generate_part(final_segments, parts); } @@ -470,7 +448,6 @@ impl SegmentCompactChecker { struct CompactTaskBuilder { dal: Operator, - column_ids: HashSet, cluster_key_id: Option, thresholds: BlockThresholds, @@ -481,15 +458,9 @@ struct CompactTaskBuilder { } impl CompactTaskBuilder { - fn new( - dal: Operator, - column_ids: HashSet, - cluster_key_id: Option, - thresholds: BlockThresholds, - ) -> Self { + fn new(dal: Operator, cluster_key_id: Option, thresholds: BlockThresholds) -> Self { Self { dal, - column_ids, cluster_key_id, thresholds, blocks: vec![], @@ -558,7 +529,7 @@ impl CompactTaskBuilder { block_idx: BlockIndex, blocks: Vec, ) -> bool { - if blocks.len() == 1 && !self.check_compact(&blocks[0].0) { + if blocks.len() == 1 { unchanged_blocks.push((block_idx, blocks[0].clone())); true } else { @@ -568,28 +539,6 @@ impl CompactTaskBuilder { } } - fn check_compact(&self, block: &BlockMeta) -> bool { - // The snapshot schema does not contain stream columns, - // so the stream columns need to be filtered out. - let column_ids = block - .col_metas - .keys() - .filter(|id| !is_stream_column_id(**id)) - .cloned() - .collect::>(); - if self.column_ids == column_ids { - // Check if the block needs to be resort. - self.cluster_key_id.is_some_and(|key| { - block - .cluster_stats - .as_ref() - .is_none_or(|v| v.cluster_key_id != key) - }) - } else { - true - } - } - // Select the row_count >= min_rows_per_block or block_size >= max_bytes_per_block // as the perfect_block condition(N for short). Gets a set of segments, iterates // through the blocks, and finds the blocks >= N and blocks < 2N as a task. @@ -667,19 +616,16 @@ impl CompactTaskBuilder { if !self.is_empty() { let tail = self.take_blocks(); - if self.cluster_key_id.is_some() && latest_flag { - // The clustering table cannot compact different level blocks. - self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, tail); + let mut blocks = if latest_flag { + unchanged_blocks.pop().map_or(vec![], |(_, v)| vec![v]) } else { - let mut blocks = if latest_flag { - unchanged_blocks.pop().map_or(vec![], |(_, v)| vec![v]) - } else { - tasks - .pop_back() - .map_or(vec![], |(_, v)| v.into_iter().map(|v| (v, None)).collect()) - }; + tasks + .pop_back() + .map_or(vec![], |(_, v)| v.into_iter().map(|v| (v, None)).collect()) + }; - let (total_rows, total_size, total_compressed) = blocks + let (total_rows, total_size, total_compressed) = + blocks .iter() .chain(tail.iter()) .fold((0, 0, 0), |mut acc, x| { @@ -688,14 +634,13 @@ impl CompactTaskBuilder { acc.2 += x.0.file_size as usize; acc }); - if self.check_for_compact(total_rows, total_size, total_compressed) { - blocks.extend(tail); - self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); - } else { - // blocks >= 2N - self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); - self.build_task(&mut tasks, &mut unchanged_blocks, block_idx + 1, tail); - } + if self.check_for_compact(total_rows, total_size, total_compressed) { + blocks.extend(tail); + self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); + } else { + // blocks >= 2N + self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); + self.build_task(&mut tasks, &mut unchanged_blocks, block_idx + 1, tail); } } diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs b/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs index 99cf4e8e08bd5..3e66cfc34c39c 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs @@ -18,8 +18,8 @@ mod segment_compact_mutator; pub use block_compact_mutator::BlockCompactMutator; pub use block_compact_mutator::SegmentCompactChecker; -pub use recluster_mutator::ReclusterMode; pub use recluster_mutator::ReclusterMutator; +pub(crate) use recluster_mutator::SelectedReclusterSegment; pub use segment_compact_mutator::SegmentCompactMutator; pub use segment_compact_mutator::SegmentCompactionState; pub use segment_compact_mutator::SegmentCompactor; diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index de1bbaea6c112..b61d48725042e 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -21,33 +21,35 @@ use std::sync::Arc; use databend_common_base::runtime::GLOBAL_MEM_STAT; use databend_common_base::runtime::execute_futures_in_parallel; -use databend_common_catalog::plan::Partitions; -use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::ReclusterParts; use databend_common_catalog::plan::ReclusterTask; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockThresholds; +use databend_common_expression::Expr; use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; use databend_common_expression::compare_scalars; use databend_common_expression::types::DataType; +use databend_common_sql::parse_cluster_keys; use databend_common_storage::ColumnNodes; use databend_storages_common_cache::LoadParams; use databend_storages_common_pruner::BlockMetaIndex; use databend_storages_common_table_meta::meta::BlockMeta; +use databend_storages_common_table_meta::meta::ClusterStatistics; use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::RawBlockHLL; use databend_storages_common_table_meta::meta::Statistics; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::meta::TableSnapshot; use fastrace::Span; use fastrace::func_path; use fastrace::future::FutureExt; use indexmap::IndexSet; use log::debug; -use log::info; use log::warn; use opendal::Operator; @@ -56,11 +58,8 @@ use crate::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD; use crate::FuseTable; use crate::SegmentLocation; use crate::io::MetaReaders; -use crate::operations::BlockCompactMutator; -use crate::operations::CompactLazyPartInfo; use crate::operations::common::BlockMetaIndex as BlockIndex; -use crate::operations::mutation::SegmentCompactChecker; -use crate::operations::mutation::mutator::block_compact_mutator::CompactLimitState; +use crate::statistics::get_min_max_stats; use crate::statistics::reducers::merge_statistics_mut; use crate::statistics::sort_by_cluster_stats; @@ -69,14 +68,25 @@ use crate::statistics::sort_by_cluster_stats; /// rarely improves data locality and may cause task churn. const MAX_RECLUSTER_LEVEL_FOR_TWO_BLOCKS: i32 = 2; -/// Maximum number of unclustered blocks to select for compaction in a single -/// recluster round. Keeps the compact phase bounded so recluster can make -/// incremental progress without excessive memory/time cost per invocation. -const MAX_UNCLUSTERED_BLOCKS_PER_RECLUSTER: u64 = 1000; +#[derive(Clone)] +pub struct SelectedReclusterSegment { + pub loc: SegmentLocation, + pub info: Arc, + pub stats: ClusterStatistics, +} -pub enum ReclusterMode { - Recluster, - Compact, +impl SelectedReclusterSegment { + pub(crate) fn create( + mutator: &ReclusterMutator, + loc: SegmentLocation, + info: Arc, + ) -> Self { + let stats = mutator.build_cluster_stats_for_recluster( + info.summary.cluster_stats.as_ref(), + &info.summary.col_stats, + ); + Self { loc, info, stats } + } } #[derive(Clone)] @@ -88,8 +98,8 @@ pub struct ReclusterMutator { pub(crate) cluster_key_id: u32, pub(crate) schema: TableSchemaRef, pub(crate) max_tasks: usize, + pub(crate) cluster_key_exprs: Vec>, pub(crate) cluster_key_types: Vec, - pub(crate) column_ids: HashSet, } impl ReclusterMutator { @@ -115,10 +125,19 @@ impl ReclusterMutator { max_tasks = cluster.nodes.len(); } - let cluster_key_types = table.cluster_key_types(ctx.clone()); - - // NOTE: The snapshot schema does not contain the stream column. - let column_ids = snapshot.schema.to_leaf_column_id_set(); + // safe to unwrap + let cluster_keys = table.resolve_cluster_keys().unwrap(); + let cluster_key_exprs = + parse_cluster_keys(ctx.clone(), Arc::new(table.clone()), cluster_keys)?; + if cluster_key_exprs.is_empty() { + return Err(ErrorCode::Internal( + "recluster requires non-empty cluster key expressions", + )); + } + let cluster_key_types = cluster_key_exprs + .iter() + .map(|v| v.data_type().clone()) + .collect::>(); Ok(Self { ctx, @@ -128,8 +147,8 @@ impl ReclusterMutator { block_thresholds, cluster_key_id, max_tasks, + cluster_key_exprs, cluster_key_types, - column_ids, }) } @@ -139,13 +158,20 @@ impl ReclusterMutator { ctx: Arc, operator: Operator, schema: TableSchemaRef, - cluster_key_types: Vec, + cluster_key_exprs: Vec>, depth_threshold: f64, block_thresholds: BlockThresholds, cluster_key_id: u32, max_tasks: usize, - column_ids: HashSet, ) -> Self { + assert!( + !cluster_key_exprs.is_empty(), + "recluster requires non-empty cluster key expressions" + ); + let cluster_key_types = cluster_key_exprs + .iter() + .map(|expr| expr.data_type().clone()) + .collect(); Self { ctx, operator, @@ -154,34 +180,21 @@ impl ReclusterMutator { block_thresholds, cluster_key_id, max_tasks, + cluster_key_exprs, cluster_key_types, - column_ids, } } #[async_backtrace::framed] pub async fn target_select( &self, - compact_segments: Vec<(SegmentLocation, Arc)>, - mode: ReclusterMode, + compact_segments: Vec, ) -> Result<(u64, ReclusterParts)> { - match mode { - ReclusterMode::Compact => self.generate_compact_tasks(compact_segments).await, - ReclusterMode::Recluster => self.generate_recluster_tasks(compact_segments).await, - } - } - - #[async_backtrace::framed] - pub async fn generate_recluster_tasks( - &self, - compact_segments: Vec<(SegmentLocation, Arc)>, - ) -> Result<(u64, ReclusterParts)> { - // Sort segments by cluster statistics let mut compact_segments = compact_segments; compact_segments.sort_by(|a, b| { sort_by_cluster_stats( - &a.1.summary.cluster_stats, - &b.1.summary.cluster_stats, + &Some(a.stats.clone()), + &Some(b.stats.clone()), self.cluster_key_id, ) }); @@ -192,29 +205,36 @@ impl ReclusterMutator { let mut selected_seg_stats = Vec::with_capacity(compact_segments.len()); let selected_segments = compact_segments .into_iter() - .map(|(loc, info)| { - selected_statistics.push(info.summary.clone()); - selected_segs_idx.push(loc.segment_idx); + .map(|segment| { + selected_statistics.push(segment.info.summary.clone()); + selected_segs_idx.push(segment.loc.segment_idx); selected_seg_stats.push(( - loc.segment_idx, - info.summary + segment.loc.segment_idx, + segment + .info + .summary .additional_stats_meta .as_ref() .map(|v| v.location.clone()), )); - (loc.segment_idx, info) + (segment.loc.segment_idx, segment.info) }) .collect::>(); // Gather blocks and create a block map categorized by clustering levels let blocks = self.gather_blocks(selected_segments).await?; + let block_stats = blocks + .iter() + .map(|(_, block)| { + self.build_cluster_stats_for_recluster( + block.cluster_stats.as_ref(), + &block.col_stats, + ) + }) + .collect::>(); let mut blocks_map: BTreeMap> = BTreeMap::new(); - for (idx, (_, block)) in blocks.iter().enumerate() { - if let Some(stats) = &block.cluster_stats { - if stats.cluster_key_id == self.cluster_key_id { - blocks_map.entry(stats.level).or_default().push(idx); - } - } + for (idx, stats) in block_stats.iter().enumerate() { + blocks_map.entry(stats.level).or_default().push(idx); } // Compute memory threshold and maximum number of blocks allowed for reclustering. @@ -265,10 +285,9 @@ impl ReclusterMutator { // Analyze each block's statistics and track min/max points for &i in indices.iter() { let block = &blocks[i]; - if let Some(stats) = &block.1.cluster_stats { - points_map.entry(stats.min().clone()).or_default().0.push(i); - points_map.entry(stats.max().clone()).or_default().1.push(i); - } + let stats = &block_stats[i]; + points_map.entry(stats.min().clone()).or_default().0.push(i); + points_map.entry(stats.max().clone()).or_default().1.push(i); // Track small blocks for potential compaction if self.block_thresholds.check_too_small( @@ -375,13 +394,17 @@ impl ReclusterMutator { break; } - // Determine if reclustering is needed + // Determine if reclustering is needed. + // Derived stats may participate in ordering/selection even when no block-level rewrite + // task is chosen. In that case, a zero-task result is still meaningful: commit will use + // `remained_blocks` to rebuild segments in the selected order, rather than treating the + // recluster as a no-op. let selected = if selected_blocks_idx.is_empty() { let unordered = || { - blocks.windows(2).any(|w| { + block_stats.windows(2).any(|w| { sort_by_cluster_stats( - &w[0].1.cluster_stats, - &w[1].1.cluster_stats, + &Some(w[0].clone()), + &Some(w[1].clone()), self.cluster_key_id, ) == Ordering::Greater }) @@ -401,10 +424,17 @@ impl ReclusterMutator { merge_statistics_mut(&mut removed_segment_summary, v, default_cluster_key_id) }); - let blocks_idx: IndexSet = IndexSet::from_iter(0..blocks.len()); - let remained_blocks = blocks_idx - .difference(&selected_blocks_idx) - .map(|&v| blocks[v].clone()) + let remained_blocks = blocks + .into_iter() + .enumerate() + .filter_map(|(idx, (block_index, block_meta))| { + if selected_blocks_idx.contains(&idx) { + return None; + } + let mut block_meta = Arc::unwrap_or_clone(block_meta); + block_meta.cluster_stats = Some(block_stats[idx].clone()); + Some((block_index, Arc::new(block_meta))) + }) .collect::>(); let hlls = self.gather_hlls(selected_seg_stats).await?; let remained_blocks = remained_blocks @@ -414,95 +444,19 @@ impl ReclusterMutator { (block_meta, hll) }) .collect(); - ReclusterParts::Recluster { + ReclusterParts { tasks, remained_blocks, removed_segment_indexes: selected_segs_idx, removed_segment_summary, } } else { - ReclusterParts::new_recluster_parts() + ReclusterParts::default() }; Ok((selected_blocks_idx.len() as u64, parts)) } - async fn generate_compact_tasks( - &self, - compact_segments: Vec<(SegmentLocation, Arc)>, - ) -> Result<(u64, ReclusterParts)> { - info!( - "recluster: found {} unclustered segments, compacting them before re-clustering", - compact_segments.len() - ); - let settings = self.ctx.get_settings(); - let num_block_limit = settings.get_compact_max_block_selection()? as usize; - let num_segment_limit = compact_segments.len(); - let mut recluster_blocks_count = 0; - - let mut parts = Vec::new(); - let mut checker = - SegmentCompactChecker::new(self.block_thresholds, Some(self.cluster_key_id)); - let mut stop_after_next = false; - for (loc, compact_segment) in compact_segments.into_iter() { - recluster_blocks_count += compact_segment.summary.block_count; - let segments_vec = checker.add(loc.segment_idx, compact_segment); - for segments in segments_vec { - checker.generate_part(segments, &mut parts); - } - - if stop_after_next { - break; - } - - match checker.is_limit_reached(num_segment_limit, num_block_limit) { - CompactLimitState::Continue => {} - CompactLimitState::ReachedBlockLimit => { - stop_after_next = true; - } - CompactLimitState::ReachedSegmentLimit => { - break; - } - } - } - // finalize the compaction. - checker.finalize(&mut parts); - - let cluster = self.ctx.get_cluster(); - let max_threads = settings.get_max_threads()? as usize; - let enable_distributed_compact = settings.get_enable_distributed_compact()?; - let partitions = if !enable_distributed_compact - || cluster.is_empty() - || parts.len() < cluster.nodes.len() * max_threads - { - let lazy_parts = parts - .into_iter() - .map(|v| { - v.as_any() - .downcast_ref::() - .unwrap() - .clone() - }) - .collect::>(); - Partitions::create( - PartitionsShuffleKind::Mod, - BlockCompactMutator::build_compact_tasks( - self.ctx.clone(), - self.operator.clone(), - self.column_ids.clone(), - Some(self.cluster_key_id), - self.block_thresholds, - lazy_parts, - ) - .await?, - ) - } else { - Partitions::create(PartitionsShuffleKind::Mod, parts) - }; - - Ok((recluster_blocks_count, ReclusterParts::Compact(partitions))) - } - fn generate_task( &self, block_metas: &[(Option, Arc)], @@ -539,79 +493,45 @@ impl ReclusterMutator { &self, compact_segments: &[(SegmentLocation, Arc)], max_len: usize, - ) -> Result<(ReclusterMode, IndexSet)> { + ) -> Result> { let mut blocks_num = 0; let mut indices = IndexSet::new(); + let mut segments = vec![None; compact_segments.len()]; let mut points_map: HashMap, (Vec, Vec)> = HashMap::new(); - let mut unclustered_segments = IndexSet::new(); - let mut unclustered_block_num = 0; let mut small_segments = IndexSet::new(); - let block_per_seg = self.block_thresholds.block_per_segment; - let max_uncluster_blocks = std::cmp::min( - self.ctx.get_settings().get_compact_max_block_selection()?, - MAX_UNCLUSTERED_BLOCKS_PER_RECLUSTER, - ); // Iterate over all segments for (i, (loc, compact_segment)) in compact_segments.iter().enumerate() { - let mut level = -1; - // Check if the segment is clustered - let is_clustered = compact_segment - .summary - .cluster_stats - .as_ref() - .is_some_and(|v| { - level = v.level; - v.cluster_key_id == self.cluster_key_id - }); - - // If not clustered, mark for compaction - if !is_clustered { - debug!( - "recluster: segment '{}' is unclustered, needs to be compacted", - loc.location.0 - ); - unclustered_block_num += compact_segment.summary.block_count; - unclustered_segments.insert(i); - if unclustered_block_num >= max_uncluster_blocks { - break; - } - continue; - } + let segment = + SelectedReclusterSegment::create(self, loc.clone(), compact_segment.clone()); + let level = segment.stats.level; // Skip if segment has more blocks than required and no reclustering is needed if level < 0 && compact_segment.summary.block_count as usize >= block_per_seg { continue; } - // Process clustered segment - if let Some(stats) = &compact_segment.summary.cluster_stats { - blocks_num += compact_segment.summary.block_count as usize; - // Track small segments for special handling later - if blocks_num < block_per_seg { - small_segments.insert(i); - } - // Add to indices for potential reclustering - indices.insert(i); - // Update points_map with min and max points of the segment - points_map - .entry(stats.min().clone()) - .and_modify(|v| v.0.push(i)) - .or_insert((vec![i], vec![])); - points_map - .entry(stats.max().clone()) - .and_modify(|v| v.1.push(i)) - .or_insert((vec![], vec![i])); + blocks_num += compact_segment.summary.block_count as usize; + // Track small segments for special handling later + if blocks_num < block_per_seg { + small_segments.insert(i); } + // Add to indices for potential reclustering + indices.insert(i); + // Update points_map with min and max points of the segment + points_map + .entry(segment.stats.min().clone()) + .and_modify(|v| v.0.push(i)) + .or_insert((vec![i], vec![])); + points_map + .entry(segment.stats.max().clone()) + .and_modify(|v| v.1.push(i)) + .or_insert((vec![], vec![i])); + segments[i] = Some(segment); } - // If there are unclustered segments, return early for compaction - if !unclustered_segments.is_empty() { - return Ok((ReclusterMode::Compact, unclustered_segments)); - } - - let selected_segments = if indices.len() > 1 && blocks_num > block_per_seg { + let selected_indices = if indices.len() > 1 && blocks_num > block_per_seg { let selected = self.fetch_max_depth(points_map, 1.0, max_len)?; if selected.is_empty() && small_segments.len() > 1 { // If no segments were selected but small segments exist, use those. @@ -623,17 +543,32 @@ impl ReclusterMutator { indices }; - Ok((ReclusterMode::Recluster, selected_segments)) + Ok(selected_indices + .into_iter() + .filter_map(|i| segments[i].take()) + .collect()) } - pub fn segment_can_recluster(&self, summary: &Statistics) -> bool { - if let Some(stats) = &summary.cluster_stats { - stats.cluster_key_id == self.cluster_key_id - && (stats.level >= 0 - || (summary.block_count as usize) < self.block_thresholds.block_per_segment) - } else { - false + fn build_cluster_stats_for_recluster( + &self, + cluster_stats: Option<&ClusterStatistics>, + col_stats: &StatisticsOfColumns, + ) -> ClusterStatistics { + if let Some(stats) = cluster_stats { + if stats.cluster_key_id == self.cluster_key_id { + return stats.clone(); + } } + + let (min_stats, max_stats) = get_min_max_stats( + &self.cluster_key_exprs, + col_stats, + cluster_stats, + Some(self.cluster_key_id), + self.schema.as_ref(), + ); + + ClusterStatistics::new(self.cluster_key_id, min_stats, max_stats, 0, None) } #[async_backtrace::framed] @@ -749,8 +684,8 @@ impl ReclusterMutator { ) -> Result> { let mut max_depth = 0; let mut max_point = 0; - let mut interval_depths = HashMap::new(); - let mut point_overlaps: Vec> = Vec::new(); + let mut interval_depths = HashMap::with_capacity(points_map.len()); + let mut point_overlaps: Vec> = Vec::with_capacity(points_map.len()); let mut unfinished_intervals = BTreeMap::new(); let (keys, values): (Vec<_>, Vec<_>) = points_map.into_iter().unzip(); let indices = compare_scalars(keys, &self.cluster_key_types)?; @@ -785,7 +720,7 @@ impl ReclusterMutator { }); } - let mut selected_idx = IndexSet::new(); + let mut selected_idx = IndexSet::with_capacity(max_len); if !unfinished_intervals.is_empty() { warn!( "Recluster: unfinished_intervals is not empty after calculate the blocks overlaps" diff --git a/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs index c2b55ba1d1d06..df7116e2eef3a 100644 --- a/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs +++ b/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs @@ -85,6 +85,7 @@ pub struct MutationSource { operators: Vec, storage_format: FuseStorageFormat, action: MutationAction, + update_stream_columns: bool, index: BlockMetaIndex, stats_type: ClusterStatsGenType, @@ -101,6 +102,7 @@ impl MutationSource { remain_reader: Arc>, operators: Vec, storage_format: FuseStorageFormat, + update_stream_columns: bool, ) -> Result { Ok(ProcessorPtr::create(Box::new(MutationSource { state: State::ReadData(None), @@ -112,6 +114,7 @@ impl MutationSource { operators, storage_format, action, + update_stream_columns, index: BlockMetaIndex::default(), stats_type: ClusterStatsGenType::Generally, }))) @@ -224,7 +227,7 @@ impl Processor for MutationSource { DataBlock::empty_with_meta(meta), ); } else { - if self.block_reader.update_stream_columns { + if self.update_stream_columns { let row_num = build_origin_block_row_num(rows); data_block.add_entry(row_num); } @@ -316,7 +319,7 @@ impl Processor for MutationSource { let inner_meta = Box::new(SerializeDataMeta::SerializeBlock( SerializeBlock::create(self.index.clone(), self.stats_type.clone()), )); - let meta: BlockMetaInfoPtr = if self.block_reader.update_stream_columns() { + let meta: BlockMetaInfoPtr = if self.update_stream_columns { Box::new(gen_mutation_stream_meta(Some(inner_meta), &path)?) } else { inner_meta diff --git a/src/query/storages/fuse/src/operations/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation_source.rs index ae4c51f6c2e56..12c12ea7f3b78 100644 --- a/src/query/storages/fuse/src/operations/mutation_source.rs +++ b/src/query/storages/fuse/src/operations/mutation_source.rs @@ -66,8 +66,7 @@ impl FuseTable { }; let projection = Projection::Columns(col_indices.clone()); let update_stream_columns = self.change_tracking_enabled(); - let block_reader = - self.create_block_reader(ctx.clone(), projection, false, update_stream_columns, false)?; + let block_reader = self.create_block_reader(ctx.clone(), projection, false)?; let schema = block_reader.schema().as_ref().clone(); let filter_expr = Arc::new(filter.map(|v| { @@ -96,8 +95,6 @@ impl FuseTable { ctx.clone(), Projection::Columns(remain_column_indices), false, - update_stream_columns, - false, )?) .clone(), )) @@ -123,6 +120,7 @@ impl FuseTable { remain_reader.clone(), ops.clone(), self.storage_format, + update_stream_columns, ) }, max_threads, diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs index 717c9c74fd2b7..dc1297fbbd7cf 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs @@ -65,8 +65,7 @@ pub fn row_fetch_processor( .iter() .map(|field| DataType::from(field.data_type())) .collect::>(); - let block_reader = - fuse_table.create_block_reader(ctx.clone(), projection.clone(), false, false, true)?; + let block_reader = fuse_table.create_block_reader(ctx.clone(), projection.clone(), true)?; match &fuse_table.storage_format { FuseStorageFormat::Native => unreachable!(), diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index 8a5646232629b..fc47138b9f2f0 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -23,6 +23,7 @@ use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_catalog::plan::BlockMetaOptions; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::PushDownInfo; @@ -65,7 +66,6 @@ use roaring::RoaringTreemap; use super::native_data_source::NativeDataSource; use super::read_data_source::ReadDataSource; use super::util::add_data_block_meta; -use super::util::need_reserve_block_info; use crate::DEFAULT_ROW_PER_PAGE; use crate::fuse_part::FuseBlockPartInfo; use crate::io::AggIndexReader; @@ -232,7 +232,7 @@ pub struct NativeDeserializeDataTransform { skipped_pages: usize, // for merge_into target build. - need_reserve_block_info: bool, + block_meta_options: BlockMetaOptions, } #[derive(Clone)] @@ -255,7 +255,6 @@ impl NativeDeserializeDataTransform { index_reader: Arc>, ) -> Result { let scan_progress = ctx.get_scan_progress(); - let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index); let src_schema: DataSchema = (block_reader.schema().as_ref()).into(); let mut prewhere_columns: Vec = @@ -335,7 +334,7 @@ impl NativeDeserializeDataTransform { base_block_ids: plan.base_block_ids.clone(), bloom_runtime_filter: None, read_state: ReadPartState::new(), - need_reserve_block_info, + block_meta_options: plan.block_meta_options.clone(), }, ))) } @@ -448,9 +447,7 @@ impl NativeDeserializeDataTransform { fuse_part, None, self.base_block_ids.clone(), - self.block_reader.update_stream_columns(), - self.block_reader.query_internal_columns(), - self.need_reserve_block_info, + &self.block_meta_options, )?; data_block.resort(&self.src_schema, &self.output_schema) @@ -827,7 +824,7 @@ impl NativeDeserializeDataTransform { // `TransformAddInternalColumns` will generate internal columns using `InternalColumnMeta` in next pipeline. let mut block = block.resort(&self.src_schema, &self.output_schema)?; let fuse_part = FuseBlockPartInfo::from_part(&self.parts[0])?; - let offsets = if self.block_reader.query_internal_columns() { + let offsets = if self.block_meta_options.query_internal_columns { let offset = self.read_state.offset as u64; let offsets = if let Some(count) = self.read_state.filtered_count { let filter_executor = self.filter_executor.as_mut().unwrap(); @@ -849,9 +846,7 @@ impl NativeDeserializeDataTransform { fuse_part, offsets, self.base_block_ids.clone(), - self.block_reader.update_stream_columns(), - self.block_reader.query_internal_columns(), - self.need_reserve_block_info, + &self.block_meta_options, )?; self.read_state.offset += origin_num_rows; @@ -955,9 +950,7 @@ impl Processor for NativeDeserializeDataTransform { fuse_part, None, self.base_block_ids.clone(), - self.block_reader.update_stream_columns(), - self.block_reader.query_internal_columns(), - self.need_reserve_block_info, + &self.block_meta_options, )?; self.finish_partition(); diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index a8d44088cd22b..1eea90fe01b68 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -20,6 +20,7 @@ use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_catalog::plan::BlockMetaOptions; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::PrewhereInfo; @@ -43,7 +44,6 @@ use super::parquet_data_source::ParquetDataSource; use super::read_data_source::ReadDataSource; use super::read_state::ReadState; use super::util::add_data_block_meta; -use super::util::need_reserve_block_info; use crate::fuse_part::FuseBlockPartInfo; use crate::io::AggIndexReader; use crate::io::BlockReader; @@ -68,7 +68,7 @@ pub struct DeserializeDataTransform { virtual_reader: Arc>, base_block_ids: Option, - need_reserve_block_info: bool, + block_meta_options: BlockMetaOptions, prewhere_info: Option, read_state: Option, @@ -111,7 +111,6 @@ impl DeserializeDataTransform { .and_then(|p| p.prewhere.as_ref()) .cloned(); - let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index); Ok(ProcessorPtr::create(Box::new(DeserializeDataTransform { ctx: ctx.clone(), scan_id: plan.scan_id, @@ -127,7 +126,7 @@ impl DeserializeDataTransform { index_reader, virtual_reader, base_block_ids: plan.base_block_ids.clone(), - need_reserve_block_info, + block_meta_options: plan.block_meta_options.clone(), prewhere_info, read_state: None, }))) @@ -267,7 +266,7 @@ impl Processor for DeserializeDataTransform { // Fill `BlockMetaIndex` as `DataBlock.meta` if query internal columns, // `TransformAddInternalColumns` will generate internal columns using `BlockMetaIndex` in next pipeline. - let offsets = if self.block_reader.query_internal_columns() { + let offsets = if self.block_meta_options.query_internal_columns { bitmap_selection.as_ref().map(|bitmap| { RoaringTreemap::from_sorted_iter( (0..bitmap.len()) @@ -285,9 +284,7 @@ impl Processor for DeserializeDataTransform { part, offsets, self.base_block_ids.clone(), - self.block_reader.update_stream_columns(), - self.block_reader.query_internal_columns(), - self.need_reserve_block_info, + &self.block_meta_options, )?; self.output_data = Some(data_block); diff --git a/src/query/storages/fuse/src/operations/read/util.rs b/src/query/storages/fuse/src/operations/read/util.rs index 9bfd71c8b080e..bed89ee8b7baa 100644 --- a/src/query/storages/fuse/src/operations/read/util.rs +++ b/src/query/storages/fuse/src/operations/read/util.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use databend_common_catalog::merge_into_join::MergeIntoJoinType; +use databend_common_catalog::plan::BlockMetaOptions; use databend_common_catalog::plan::InternalColumnMeta; use databend_common_catalog::plan::gen_mutation_stream_meta; use databend_common_catalog::table_context::TableContext; @@ -43,13 +44,11 @@ pub(crate) fn add_data_block_meta( fuse_part: &FuseBlockPartInfo, offsets: Option, base_block_ids: Option, - update_stream_columns: bool, - query_internal_columns: bool, - need_reserve_block_info: bool, + block_meta_options: &BlockMetaOptions, ) -> Result { // for merge into target build let mut meta: Option = - if need_reserve_block_info && fuse_part.block_meta_index.is_some() { + if block_meta_options.reserve_block_index && fuse_part.block_meta_index.is_some() { let block_meta_index = fuse_part.block_meta_index.as_ref().unwrap(); Some(Box::new(BlockMetaIndex { segment_idx: block_meta_index.segment_idx, @@ -59,13 +58,13 @@ pub(crate) fn add_data_block_meta( None }; - if update_stream_columns { + if block_meta_options.update_stream_columns { // Fill `BlockMetaInfoPtr` if update stream columns let stream_meta = gen_mutation_stream_meta(meta, &fuse_part.location)?; meta = Some(Box::new(stream_meta)); } - if query_internal_columns { + if block_meta_options.query_internal_columns { // Fill `BlockMetaInfoPtr` if query internal columns let block_meta = fuse_part.block_meta_index().unwrap(); diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index f50feff7a912d..8c7c6af601ed5 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -37,8 +37,6 @@ impl FuseTable { &self, ctx: Arc, projection: Projection, - query_internal_columns: bool, - update_stream_columns: bool, put_cache: bool, ) -> Result> { let table_schema = self.schema_with_stream(); @@ -47,8 +45,6 @@ impl FuseTable { self.operator.clone(), table_schema, projection, - query_internal_columns, - update_stream_columns, put_cache, ) } @@ -66,8 +62,6 @@ impl FuseTable { &self.schema_with_stream(), plan.push_downs.as_ref(), ), - plan.internal_columns.is_some(), - plan.update_stream_columns, put_cache, ) } diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 81a8618442288..12f9a750af8eb 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -16,12 +16,10 @@ use std::collections::HashSet; use std::sync::Arc; use std::time::Instant; -use databend_common_base::runtime::GlobalIORuntime; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::ReclusterParts; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::TableSchemaRef; use databend_common_metrics::storage::metrics_inc_recluster_build_task_milliseconds; @@ -30,17 +28,15 @@ use databend_common_sql::BloomIndexColumns; use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::table::ClusterType; +use futures::stream::FuturesUnordered; +use futures::stream::StreamExt; use log::warn; use opendal::Operator; -use tokio::select; -use tokio::sync::Semaphore; -use tokio::sync::mpsc; use crate::FuseTable; use crate::SegmentLocation; use crate::operations::ReclusterMutator; -use crate::operations::acquire_task_permit; -use crate::operations::mutation::ReclusterMode; +use crate::operations::SelectedReclusterSegment; use crate::pruning::PruningContext; use crate::pruning::SegmentPruner; use crate::pruning::create_segment_location_vector; @@ -90,7 +86,7 @@ impl FuseTable { let mut recluster_seg_num = 0; let mut recluster_blocks_count = 0; - let mut parts = ReclusterParts::new_recluster_parts(); + let mut parts = ReclusterParts::default(); let number_segments = segment_locations.len(); let mut segment_idx = 0; @@ -123,8 +119,7 @@ impl FuseTable { } // select the segments with the highest depth. - let (recluster_mode, selected_segs) = - mutator.select_segments(&compact_segments, max_seg_num)?; + let selected_segs = mutator.select_segments(&compact_segments, max_seg_num)?; // select the blocks with the highest depth. if selected_segs.is_empty() { let result = @@ -136,13 +131,7 @@ impl FuseTable { } } else { selected_seg_num = selected_segs.len() as u64; - let selected_segments = selected_segs - .into_iter() - .map(|i| compact_segments[i].clone()) - .collect(); - (recluster_blocks_count, parts) = mutator - .target_select(selected_segments, recluster_mode) - .await?; + (recluster_blocks_count, parts) = mutator.target_select(selected_segs).await?; } if !parts.is_empty() || limit.is_some() { @@ -172,49 +161,68 @@ impl FuseTable { let mut block_count = 0; let max_threads = mutator.ctx.get_settings().get_max_threads()? as usize; - let semaphore = Arc::new(Semaphore::new(max_threads)); - let (tx, mut rx) = mpsc::channel(1); - let runtime = GlobalIORuntime::instance(); - let mut handles = Vec::new(); + let mut segment_batches = Vec::new(); let latest = compact_segments.len() - 1; for (idx, compact_segment) in compact_segments.into_iter().enumerate() { - if !mutator.segment_can_recluster(&compact_segment.1.summary) { + let segment = + SelectedReclusterSegment::create(&mutator, compact_segment.0, compact_segment.1); + if !(segment.stats.level >= 0 + || (segment.info.summary.block_count as usize) + < mutator.block_thresholds.block_per_segment) + { continue; } - block_count += compact_segment.1.summary.block_count as usize; - selected_segs.push(compact_segment); + block_count += segment.info.summary.block_count as usize; + selected_segs.push(segment); if block_count >= mutator.block_thresholds.block_per_segment || idx == latest { - let selected_segs = std::mem::take(&mut selected_segs); - let mutator_clone = mutator.clone(); - let tx_clone = tx.clone(); - let permit = acquire_task_permit(semaphore.clone()).await?; - let handle = runtime.spawn(async move { - let seg_num = selected_segs.len() as u64; - let (block_num, parts) = mutator_clone - .target_select(selected_segs, ReclusterMode::Recluster) - .await?; - drop(permit); - if !parts.is_empty() { - let _ = tx_clone.send((seg_num, block_num, parts)).await; - } - Ok::<_, ErrorCode>(()) - }); - handles.push(handle); + segment_batches.push(std::mem::take(&mut selected_segs)); block_count = 0; } } - drop(tx); - let result = select! { - res = rx.recv() => res, - _ = async { - futures::future::join_all(handles).await; - None::<(usize, u64, ReclusterParts)> - } => None, + if segment_batches.is_empty() { + return Ok(None); + } + + let evaluate_batch = |selected_segs: Vec| { + let mutator = mutator.clone(); + async move { + let seg_num = selected_segs.len() as u64; + let (block_num, parts) = mutator.target_select(selected_segs).await?; + Ok::<_, databend_common_exception::ErrorCode>((seg_num, block_num, parts)) + } }; - Ok(result) + + if segment_batches.len() == 1 { + let selected_segs = segment_batches.pop().unwrap(); + let (seg_num, block_num, parts) = evaluate_batch(selected_segs).await?; + return Ok((!parts.is_empty()).then_some((seg_num, block_num, parts))); + } + + let concurrency = max_threads.min(segment_batches.len()); + let mut batches = segment_batches.into_iter(); + let mut pending = FuturesUnordered::new(); + + for _ in 0..concurrency { + if let Some(selected_segs) = batches.next() { + pending.push(evaluate_batch(selected_segs)); + } + } + + while let Some(result) = pending.next().await { + let (seg_num, block_num, parts) = result?; + if !parts.is_empty() { + return Ok(Some((seg_num, block_num, parts))); + } + + if let Some(selected_segs) = batches.next() { + pending.push(evaluate_batch(selected_segs)); + } + } + + Ok(None) } pub async fn segment_pruning( diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs index c3e07412433e6..f3c8cb69f8e4f 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs @@ -157,8 +157,6 @@ impl ReplaceIntoOperationAggregator { table_schema.clone(), projection, false, - update_stream_columns, - false, ) }?; @@ -173,8 +171,6 @@ impl ReplaceIntoOperationAggregator { table_schema.clone(), projection, false, - update_stream_columns, - false, )?; Some(reader) } diff --git a/src/query/storages/fuse/src/operations/table_index.rs b/src/query/storages/fuse/src/operations/table_index.rs index 937a8dd772013..35346d1982c40 100644 --- a/src/query/storages/fuse/src/operations/table_index.rs +++ b/src/query/storages/fuse/src/operations/table_index.rs @@ -129,8 +129,7 @@ pub async fn do_refresh_table_index( // Read data here to keep the order of blocks in segment. let projection = Projection::Columns(field_indices); - let block_reader = - fuse_table.create_block_reader(ctx.clone(), projection, false, false, false)?; + let block_reader = fuse_table.create_block_reader(ctx.clone(), projection, false)?; let meta_locations = fuse_table.meta_location_generator().clone(); let segment_reader = MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema); diff --git a/src/query/storages/fuse/src/statistics/cluster_statistics.rs b/src/query/storages/fuse/src/statistics/cluster_statistics.rs index 274e651470585..1e3e7c15f0fb3 100644 --- a/src/query/storages/fuse/src/statistics/cluster_statistics.rs +++ b/src/query/storages/fuse/src/statistics/cluster_statistics.rs @@ -16,12 +16,21 @@ use std::cmp::Ordering; use databend_common_exception::Result; use databend_common_expression::BlockThresholds; +use databend_common_expression::ConstantFolder; use databend_common_expression::DataBlock; use databend_common_expression::DataField; +use databend_common_expression::Domain; +use databend_common_expression::Expr; use databend_common_expression::FunctionContext; use databend_common_expression::Scalar; +use databend_common_expression::TableSchema; +use databend_common_expression::types::DataType; +use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_sql::evaluator::BlockOperator; +use databend_storages_common_index::statistics_to_domain; use databend_storages_common_table_meta::meta::ClusterStatistics; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; +use log::warn; #[derive(Clone, Default)] pub struct ClusterStatsGenerator { @@ -128,6 +137,14 @@ impl ClusterStatsGenerator { max.push(right); } + debug_assert!( + min.iter() + .map(Scalar::as_ref) + .cmp(max.iter().map(Scalar::as_ref)) + != Ordering::Greater, + "cluster statistics: min > max, data may not be sorted by cluster key" + ); + let level = if min == max && self .block_thresholds @@ -191,3 +208,128 @@ pub fn sort_by_cluster_stats( _ => Ordering::Equal, } } + +pub fn get_min_max_stats( + exprs: &[Expr], + col_stats: &StatisticsOfColumns, + cluster_stats: Option<&ClusterStatistics>, + default_key_id: Option, + schema: &TableSchema, +) -> (Vec, Vec) { + if let Some(default_key_id) = default_key_id { + if let Some(v) = cluster_stats { + if v.cluster_key_id == default_key_id { + // Cluster stats min/max are guaranteed when generated; reuse them directly. + return (v.min().clone(), v.max().clone()); + } + } + } + + let func_ctx = FunctionContext::default(); + let mut mins = Vec::with_capacity(exprs.len()); + let mut maxs = Vec::with_capacity(exprs.len()); + for expr in exprs { + // Since the hilbert index does not calc domain, set min max directly. + if expr.data_type().remove_nullable() == DataType::Binary { + mins.push(Scalar::Binary(vec![])); + maxs.push(Scalar::Binary(vec![0xFF; 40])); + continue; + } + + let input_domains = expr + .column_refs() + .into_iter() + .map(|(index, ty)| { + let column_ids = schema.field(index).leaf_column_ids(); + let stats = column_ids + .iter() + .filter_map(|column_id| col_stats.get(column_id)) + .collect(); + let domain = statistics_to_domain(stats, &ty); + (index, domain) + }) + .collect(); + + let (_, domain_opt) = + ConstantFolder::fold_with_domain(expr, &input_domains, &func_ctx, &BUILTIN_FUNCTIONS); + let domain = domain_opt.unwrap_or_else(|| Domain::full(expr.data_type())); + let (mut min, mut max) = domain.to_minmax(); + if min.as_ref().cmp(&max.as_ref()) == Ordering::Greater { + warn!("invalid cluster key expression range, fallback to full domain"); + (min, max) = Domain::full(expr.data_type()).to_minmax(); + } + mins.push(min); + maxs.push(max); + } + + (mins, maxs) +} + +#[cfg(test)] +mod tests { + use databend_common_expression::ColumnRef; + use databend_common_expression::TableDataType; + use databend_common_expression::TableField; + use databend_common_expression::types::NumberDataType; + use databend_common_expression::types::number::NumberScalar; + use databend_storages_common_table_meta::meta::ColumnStatistics; + + use super::*; + + fn int32_scalar(value: i32) -> Scalar { + Scalar::Number(NumberScalar::Int32(value)) + } + + fn int32_column_expr(index: usize, name: &str) -> Expr { + Expr::ColumnRef(ColumnRef { + span: None, + id: index, + data_type: DataType::Number(NumberDataType::Int32), + display_name: name.to_string(), + }) + } + + fn int32_schema(names: &[&str]) -> TableSchema { + TableSchema::new( + names + .iter() + .map(|name| TableField::new(name, TableDataType::Number(NumberDataType::Int32))) + .collect(), + ) + } + + fn int32_col_stats(ranges: &[(u32, i32, i32)]) -> StatisticsOfColumns { + let mut col_stats = StatisticsOfColumns::new(); + for (column_id, min, max) in ranges { + col_stats.insert( + *column_id, + ColumnStatistics::new(int32_scalar(*min), int32_scalar(*max), 0, 0, None), + ); + } + col_stats + } + + #[test] + fn test_get_min_max_stats_expands_multi_column_range() { + let schema = int32_schema(&["a", "b"]); + let exprs = vec![int32_column_expr(0, "a"), int32_column_expr(1, "b")]; + let col_stats = int32_col_stats(&[(0, 1, 3), (1, 2, 5)]); + + let (min, max) = get_min_max_stats(&exprs, &col_stats, None, Some(0), &schema); + + assert_eq!(min, vec![int32_scalar(1), int32_scalar(2)]); + assert_eq!(max, vec![int32_scalar(3), int32_scalar(5)]); + } + + #[test] + fn test_get_min_max_stats_falls_back_on_invalid_expression_range() { + let schema = int32_schema(&["a"]); + let exprs = vec![int32_column_expr(0, "a")]; + let col_stats = int32_col_stats(&[(0, 10, 1)]); + + let (min, max) = get_min_max_stats(&exprs, &col_stats, None, Some(0), &schema); + + assert_eq!(min, vec![int32_scalar(i32::MIN)]); + assert_eq!(max, vec![int32_scalar(i32::MAX)]); + } +} diff --git a/src/query/storages/fuse/src/statistics/mod.rs b/src/query/storages/fuse/src/statistics/mod.rs index 34f5250bfcfbd..6604123360a39 100644 --- a/src/query/storages/fuse/src/statistics/mod.rs +++ b/src/query/storages/fuse/src/statistics/mod.rs @@ -24,6 +24,7 @@ pub use accumulator::ColumnHLLAccumulator; pub use accumulator::RowOrientedSegmentBuilder; pub use accumulator::VirtualColumnAccumulator; pub use cluster_statistics::ClusterStatsGenerator; +pub use cluster_statistics::get_min_max_stats; pub use cluster_statistics::sort_by_cluster_stats; pub use column_statistic::END_OF_UNICODE_RANGE; pub use column_statistic::STATS_STRING_PREFIX_LEN; diff --git a/src/query/storages/fuse/src/table_functions/clustering_information.rs b/src/query/storages/fuse/src/table_functions/clustering_information.rs index 9dd19bc9d0e6e..fece74b8c976e 100644 --- a/src/query/storages/fuse/src/table_functions/clustering_information.rs +++ b/src/query/storages/fuse/src/table_functions/clustering_information.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::cmp; -use std::cmp::Ordering; use std::collections::BTreeMap; use std::collections::HashMap; use std::sync::Arc; @@ -25,11 +24,7 @@ use databend_common_catalog::table_args::parse_table_name; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockEntry; -use databend_common_expression::ConstantFolder; use databend_common_expression::DataBlock; -use databend_common_expression::Domain; -use databend_common_expression::Expr; -use databend_common_expression::FunctionContext; use databend_common_expression::Scalar; use databend_common_expression::TableDataType; use databend_common_expression::TableField; @@ -41,21 +36,19 @@ use databend_common_expression::types::DataType; use databend_common_expression::types::StringType; use databend_common_expression::types::TimestampType; use databend_common_expression::types::VariantType; -use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_sql::analyze_cluster_keys; -use databend_storages_common_index::statistics_to_domain; -use databend_storages_common_table_meta::meta::BlockMeta; +use databend_common_sql::parse_cluster_keys; use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::table::ClusterType; use jsonb::Value as JsonbValue; -use log::warn; use serde::Serialize; use crate::FuseTable; use crate::Table; use crate::io::SegmentsIO; use crate::sessions::TableContext; +use crate::statistics::get_min_max_stats; use crate::table_functions::SimpleArgFunc; use crate::table_functions::SimpleArgFuncTemplate; use crate::table_functions::parse_db_tb_opt_args; @@ -171,17 +164,8 @@ impl<'a> ClusteringInformationImpl<'a> { let (cluster_key, exprs) = analyze_cluster_keys(self.ctx.clone(), Arc::new(self.table.clone()), b)?; let exprs = exprs - .iter() - .map(|k| { - k.project_column_ref(|index| { - Ok(self - .table - .schema() - .field(index.as_usize()) - .name() - .to_string()) - }) - }) + .into_iter() + .map(|expr| expr.project_column_ref(|index| Ok(index.as_usize()))) .collect::>>()?; if a.is_some() && a.unwrap() == cluster_key { default_cluster_key_id = self.table.cluster_key_id(); @@ -189,11 +173,12 @@ impl<'a> ClusteringInformationImpl<'a> { (cluster_key, exprs) } (Some(a), None) => { - let exprs = self.table.linear_cluster_keys(self.ctx.clone()); - let exprs = exprs - .iter() - .map(|k| k.as_expr(&BUILTIN_FUNCTIONS)) - .collect(); + let cluster_keys = self.table.resolve_cluster_keys().unwrap(); + let exprs = parse_cluster_keys( + self.ctx.clone(), + Arc::new(self.table.clone()), + cluster_keys, + )?; default_cluster_key_id = self.table.cluster_key_id(); (a.to_string(), exprs) } @@ -242,26 +227,17 @@ impl<'a> ClusteringInformationImpl<'a> { for segment in segments.into_iter().flatten() { for block in segment.blocks { - let (min, max) = - get_min_max_stats(&exprs, &block, schema.clone(), default_cluster_key_id); + let (min, max) = get_min_max_stats( + &exprs, + &block.col_stats, + block.cluster_stats.as_ref(), + default_cluster_key_id, + schema.as_ref(), + ); assert_eq!(min.len(), max.len()); - let (min, max) = match min - .iter() - .map(Scalar::as_ref) - .cmp(max.iter().map(Scalar::as_ref)) - { - Ordering::Equal => { - constant_block_count += 1; - (min, max) - } - Ordering::Less => (min, max), - Ordering::Greater => { - warn!( - "clustering_information: please check your data and perform recluster to resort." - ); - (max, min) - } - }; + if min == max { + constant_block_count += 1; + } points_map .entry(min) @@ -486,56 +462,6 @@ struct HilbertClusterStatistics { unclustered_block_count: u64, } -fn get_min_max_stats( - exprs: &[Expr], - block: &BlockMeta, - schema: Arc, - default_key_id: Option, -) -> (Vec, Vec) { - if let Some(default_key_id) = default_key_id { - if let Some(v) = &block.cluster_stats { - if v.cluster_key_id == default_key_id { - return (v.min().clone(), v.max().clone()); - } - } - } - - let func_ctx = FunctionContext::default(); - let mut mins = Vec::with_capacity(exprs.len()); - let mut maxs = Vec::with_capacity(exprs.len()); - let col_stats = &block.col_stats; - for expr in exprs { - // Since the hilbert index does not calc domain, set min max directly. - if expr.data_type().remove_nullable() == DataType::Binary { - mins.push(Scalar::Binary(vec![])); - maxs.push(Scalar::Binary(vec![0xFF, 40])); - continue; - } - let input_domains = expr - .column_refs() - .into_iter() - .map(|(name, ty)| { - let column_ids = schema.leaf_columns_of(&name); - let stats = column_ids - .iter() - .filter_map(|column_id| col_stats.get(column_id)) - .collect(); - - let domain = statistics_to_domain(stats, &ty); - (name, domain) - }) - .collect(); - - let (_, domain_opt) = - ConstantFolder::fold_with_domain(expr, &input_domains, &func_ctx, &BUILTIN_FUNCTIONS); - let domain = domain_opt.unwrap_or_else(|| Domain::full(expr.data_type())); - let (min, max) = domain.to_minmax(); - mins.push(min); - maxs.push(max); - } - (mins, maxs) -} - /// The histogram contains buckets with widths: /// 1 to 16 with increments of 1. /// For buckets larger than 16, increments of twice the width of the previous bucket (e.g. 32, 64, 128, …). diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test index 35bb5b464d33f..a0304862e5647 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test @@ -114,40 +114,6 @@ select * from t order by a 9 10 -statement ok -create table m(a uint64 not null) Engine = Fuse - -statement ok -optimize table m all - -statement ok -optimize table m purge - -statement ok -optimize table m compact - -statement ok -drop table m - -statement ok -create table m(a uint64 not null) engine=Fuse - -statement ok -insert into m values(1) - -statement ok -insert into m values(2) - -statement ok -optimize table m all - -statement ok -optimize table m purge - -statement ok -optimize table m compact - - statement ok create table t1(a uint64) @@ -323,15 +289,10 @@ insert into t5 values(8) statement ok optimize table t5 compact -query I -select count() from t5 ----- -9 - query II -select segment_count, block_count from fuse_snapshot('db_09_0008', 't5') limit 1 +select segment_count, row_count from fuse_snapshot('db_09_0008', 't5') limit 1 ---- -1 3 +1 9 @@ -441,18 +402,14 @@ alter table t8 add column b int not null statement ok insert into t8 values(4,4),(5,5),(6,6) -query IIII -select segment_count, block_count, row_count, bytes_uncompressed from fuse_snapshot('db_09_0008', 't8') limit 1 ----- -2 2 6 36 - statement ok optimize table t8 compact query IIII -select segment_count, block_count, row_count, bytes_uncompressed from fuse_snapshot('db_09_0008', 't8') limit 1 +select segment_count, block_count, row_count, bytes_uncompressed from fuse_snapshot('db_09_0008', 't8') limit 2 ---- -1 2 6 48 +1 2 6 36 +2 2 6 36 query II select * from t8 order by a @@ -573,24 +530,6 @@ select * exclude(timestamp) from clustering_information('db_09_0008','t10') ---- (abs(a)) linear {"average_depth":2.75,"average_overlaps":2.0,"block_depth_histogram":{"00002":1,"00003":3},"constant_block_count":1,"total_block_count":4} -# recluster the unclustered block. -statement ok -alter table t10 recluster - -query T -select info:average_depth from clustering_information('db_09_0008','t10') ----- -2.75 - -# compact -statement ok -optimize table t10 compact - -query TTT -select * exclude(timestamp) from clustering_information('db_09_0008','t10') ----- -(abs(a)) linear {"average_depth":3.0,"average_overlaps":2.0,"block_depth_histogram":{"00003":3},"constant_block_count":0,"total_block_count":3} - statement ok alter table t10 recluster @@ -602,7 +541,7 @@ select info:average_depth from clustering_information('db_09_0008','t10') query I select count() from fuse_snapshot('db_09_0008', 't10') ---- -8 +6 statement ok optimize table t10 purge limit 2 @@ -610,7 +549,7 @@ optimize table t10 purge limit 2 query I select count() from fuse_snapshot('db_09_0008', 't10') ---- -6 +4 @@ -797,45 +736,43 @@ select * exclude(timestamp) from clustering_information('db_09_0008','t14') ---- (a, b) linear {"average_depth":1.0,"average_overlaps":0.0,"block_depth_histogram":{"00001":1},"constant_block_count":0,"total_block_count":1} +statement ok +create table t15(a int, b int) cluster by(a) row_per_block=2 statement ok -create table t15(a int not null) row_per_block=3 +insert into t15 values(1, 1), (2, 2); statement ok -insert into t15 values(0),(1),(4) +insert into t15 values(3, 9), (4, 10); statement ok -insert into t15 values(3) +insert into t15 values(5, 3), (6, 4); statement ok -insert into t15 values(-6),(-8) +insert into t15 values(7, 5), (8, 6); statement ok -ALTER TABLE t15 cluster by(abs(a)) +alter table t15 cluster by(b); statement ok -insert into t15 values(2),(5),(-7) +alter table t15 recluster final; -query TTT -select * exclude(timestamp) from clustering_information('db_09_0008','t15') +query III +select segment_count, block_count, row_count from fuse_snapshot('db_09_0008','t15') limit 2; ---- -(abs(a)) linear {"average_depth":2.75,"average_overlaps":2.0,"block_depth_histogram":{"00002":1,"00003":3},"constant_block_count":1,"total_block_count":4} +1 4 8 +4 4 8 -statement ok -alter table t15 recluster - -query TTT -select * exclude(timestamp) from clustering_information('db_09_0008','t15') +query TTI +select min, max, level from clustering_statistics('db_09_0008','t15'); ---- -(abs(a)) linear {"average_depth":3.0,"average_overlaps":2.0,"block_depth_histogram":{"00003":3},"constant_block_count":0,"total_block_count":3} +[1] [2] 0 +[3] [4] 0 +[5] [6] 0 +[9] [10] 0 statement ok -alter table t15 recluster - -query TTTT -select cluster_key, type, info:average_overlaps, info:average_depth from clustering_information('db_09_0008','t15') ----- -(abs(a)) linear 0.0 1.0 +drop table t15 all statement ok create table t16(a int, b int) file_size = 100; @@ -871,4 +808,39 @@ statement ok drop table t16 all; statement ok -DROP DATABASE db_09_0008 \ No newline at end of file +create table t17(a int not null) row_per_block=3 block_per_segment=1; + +statement ok +insert into t17 values(1), (2), (3); + +query I +select count() from fuse_snapshot('db_09_0008', 't17'); +---- +1 + +statement ok +alter table t17 add column b int not null; + +query I +select count() from fuse_snapshot('db_09_0008', 't17'); +---- +2 + +statement ok +optimize table t17 compact; + +query I +select count() from fuse_snapshot('db_09_0008', 't17'); +---- +2 + +query II +select segment_count, block_count from fuse_snapshot('db_09_0008', 't17') limit 1; +---- +1 1 + +statement ok +drop table t17 all; + +statement ok +DROP DATABASE db_09_0008 diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster.test index 54c81ddd2b7d2..2bd0eb6cec296 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster.test @@ -108,56 +108,6 @@ insert into t4 select number % 2, to_string(number) from numbers(500) statement ok alter table t4 recluster final -## test recluster with compact task. -statement ok -create table t5(a int, b int); - -statement ok -set compact_max_block_selection = 2; - -statement ok -insert into t5 values(1, 1),(6, 6); - -statement ok -insert into t5 values(0, 0),(3, 3); - -statement ok -insert into t5 values(2, 2),(4, 4); - -statement ok -alter table t5 cluster by(a); - -statement ok -alter table t5 recluster; - -query T -select info from clustering_information('db_09_0016','t5'); ----- -{"average_depth":2.0,"average_overlaps":1.0,"block_depth_histogram":{"00002":2},"constant_block_count":0,"total_block_count":2} - -query TTI -select min, max, level from clustering_statistics('db_09_0016','t5') order by segment_name; ----- -NULL NULL NULL -[0] [4] 0 - -statement ok -alter table t5 recluster; - -query TTI -select min, max, level from clustering_statistics('db_09_0016','t5') order by segment_name; ----- -[0] [4] 0 -[1] [6] 0 - -statement ok -alter table t5 recluster; - -query TTI -select min, max, level from clustering_statistics('db_09_0016','t5') order by segment_name; ----- -[0] [6] 1 - statement ok DROP DATABASE db_09_0016 diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test index ce6316e06681f..c00505905c6fc 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test @@ -100,15 +100,12 @@ select info:average_depth from clustering_information('i15760', 't1') ---- 1.0 -statement ok -drop table t1 all; - #ISSUE 18859 statement ok create table t2(a int) row_per_block=5; statement ok -insert into t2 select number from numbers(2); +insert into t2 values(0); statement ok insert into t2 select number from numbers(12); @@ -121,7 +118,7 @@ select block_count, row_count from fuse_segment('i15760', 't2'); ---- 1 2 2 12 -1 2 +1 1 statement ok insert into t2 select number from numbers(2); @@ -130,7 +127,7 @@ insert into t2 select number from numbers(2); query II select block_count, row_count from fuse_segment('i15760', 't2'); ---- -3 18 +3 17 statement ok -drop table t2 all; +drop database i15760;