Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions src/query/catalog/src/plan/datasource/datasource_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,33 @@ use crate::plan::PushDownInfo;
use crate::plan::datasource::datasource_info::DataSourceInfo;
use crate::table_args::TableArgs;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
/// Flags controlling which extra per-block metadata a scan attaches to the
/// blocks it produces. All flags default to `false` (plain data scan).
pub struct BlockMetaOptions {
    // Keep the block index attached to each block; used when the scan feeds a
    // merge_into target build. (for merge_into target build.)
    pub reserve_block_index: bool,
    // Whether to update stream columns.
    pub update_stream_columns: bool,
    // Attach the metadata needed to answer queries on internal columns.
    // (used to query internal columns.)
    pub query_internal_columns: bool,
}

impl BlockMetaOptions {
    /// Builder-style setter: returns a copy of `self` with
    /// `reserve_block_index` replaced.
    pub fn set_reserve_block_index(self, reserve_block_index: bool) -> Self {
        Self {
            reserve_block_index,
            ..self
        }
    }

    /// Builder-style setter: returns a copy of `self` with
    /// `update_stream_columns` replaced.
    pub fn set_update_stream_columns(self, update_stream_columns: bool) -> Self {
        Self {
            update_stream_columns,
            ..self
        }
    }

    /// Builder-style setter: returns a copy of `self` with
    /// `query_internal_columns` replaced.
    pub fn set_query_internal_columns(self, query_internal_columns: bool) -> Self {
        Self {
            query_internal_columns,
            ..self
        }
    }
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct DataSourcePlan {
pub source_info: DataSourceInfo,
Expand All @@ -39,8 +66,7 @@ pub struct DataSourcePlan {
pub push_downs: Option<PushDownInfo>,
pub internal_columns: Option<BTreeMap<FieldIndex, InternalColumn>>,
pub base_block_ids: Option<Scalar>,
// used for recluster to update stream columns
pub update_stream_columns: bool,
pub block_meta_options: BlockMetaOptions,

pub table_index: usize,
pub scan_id: usize,
Expand Down
1 change: 1 addition & 0 deletions src/query/catalog/src/plan/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ mod datasource_info;
mod datasource_plan;

pub use datasource_info::*;
pub use datasource_plan::BlockMetaOptions;
pub use datasource_plan::DataSourcePlan;
50 changes: 9 additions & 41 deletions src/query/catalog/src/plan/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,53 +371,21 @@ pub struct ReclusterTask {

pub type BlockMetaWithHLL = (Arc<BlockMeta>, Option<RawBlockHLL>);

#[derive(Clone)]
pub enum ReclusterParts {
Recluster {
tasks: Vec<ReclusterTask>,
remained_blocks: Vec<BlockMetaWithHLL>,
removed_segment_indexes: Vec<usize>,
removed_segment_summary: Statistics,
},
Compact(Partitions),
#[derive(Clone, Default)]
pub struct ReclusterParts {
pub tasks: Vec<ReclusterTask>,
pub remained_blocks: Vec<BlockMetaWithHLL>,
pub removed_segment_indexes: Vec<usize>,
pub removed_segment_summary: Statistics,
}

impl ReclusterParts {
pub fn is_empty(&self) -> bool {
match self {
ReclusterParts::Recluster {
tasks,
remained_blocks,
..
} => tasks.is_empty() && remained_blocks.is_empty(),
ReclusterParts::Compact(parts) => parts.is_empty(),
}
}

pub fn new_recluster_parts() -> Self {
Self::Recluster {
tasks: vec![],
remained_blocks: vec![],
removed_segment_indexes: vec![],
removed_segment_summary: Statistics::default(),
}
}

pub fn new_compact_parts() -> Self {
Self::Compact(Partitions::default())
self.tasks.is_empty() && self.remained_blocks.is_empty()
}

pub fn is_distributed(&self, ctx: Arc<dyn TableContext>) -> bool {
match self {
ReclusterParts::Recluster { tasks, .. } => tasks.len() > 1,
ReclusterParts::Compact(_) => {
(!ctx.get_cluster().is_empty())
&& ctx
.get_settings()
.get_enable_distributed_compact()
.unwrap_or(false)
}
}
pub fn is_distributed(&self, _ctx: Arc<dyn TableContext>) -> bool {
self.tasks.len() > 1
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/query/ee/src/storages/fuse/operations/virtual_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ pub async fn prepare_refresh_virtual_column(
let virtual_column_builder = VirtualColumnBuilder::try_create(ctx.clone(), source_schema)?;

let projection = Projection::Columns(field_indices);
let block_reader =
fuse_table.create_block_reader(ctx.clone(), projection, false, false, false)?;
let block_reader = fuse_table.create_block_reader(ctx.clone(), projection, false)?;

let segment_reader = MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema);

Expand Down
18 changes: 0 additions & 18 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,24 +689,6 @@ impl DataBlock {
res
}

/// Split the block into chunks of `rows_per_block` rows, folding the tail
/// into the final chunk instead of emitting a short trailing block.
///
/// Since `rows_per_block` is the expected number of rows per block and the
/// minimum tolerated is 0.8x, the last chunk may hold up to just under
/// 1.8x `rows_per_block` rows.
pub fn split_by_rows_if_needed_no_tail(&self, rows_per_block: usize) -> Vec<Self> {
    // ceil(1.8 * rows_per_block): largest remainder we refuse to leave uncut.
    let upper_bound = (rows_per_block * 9).div_ceil(5);
    let total = self.num_rows;
    let mut blocks = Vec::new();
    let mut start = 0;
    // Cut full-sized blocks while the remaining rows would exceed the cap.
    while total - start >= upper_bound {
        blocks.push(self.slice(start..start + rows_per_block));
        start += rows_per_block;
    }
    // Whatever is left (always < upper_bound rows) becomes the last block.
    blocks.push(self.slice(start..total));
    blocks
}

#[inline]
pub fn merge_block(&mut self, block: DataBlock) {
self.entries.reserve(block.num_columns());
Expand Down
102 changes: 70 additions & 32 deletions src/query/expression/src/utils/block_thresholds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use databend_common_io::constants::DEFAULT_BLOCK_COMPRESSED_SIZE;
use databend_common_io::constants::DEFAULT_BLOCK_PER_SEGMENT;
use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT;

const MAX_BYTES_PER_BLOCK_FACTOR: usize = 2;

#[derive(Clone, Copy, Debug, serde::Serialize, serde::Deserialize)]
pub struct BlockThresholds {
pub max_rows_per_block: usize,
Expand All @@ -35,11 +37,11 @@ impl Default for BlockThresholds {
fn default() -> BlockThresholds {
BlockThresholds {
max_rows_per_block: DEFAULT_BLOCK_ROW_COUNT,
min_rows_per_block: (DEFAULT_BLOCK_ROW_COUNT * 4).div_ceil(5),
max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE * 2,
min_bytes_per_block: (DEFAULT_BLOCK_BUFFER_SIZE * 4).div_ceil(5),
min_rows_per_block: Self::min_block_threshold(DEFAULT_BLOCK_ROW_COUNT),
max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE * MAX_BYTES_PER_BLOCK_FACTOR,
min_bytes_per_block: Self::min_block_threshold(DEFAULT_BLOCK_BUFFER_SIZE),
max_compressed_per_block: DEFAULT_BLOCK_COMPRESSED_SIZE,
min_compressed_per_block: (DEFAULT_BLOCK_COMPRESSED_SIZE * 4).div_ceil(5),
min_compressed_per_block: Self::min_block_threshold(DEFAULT_BLOCK_COMPRESSED_SIZE),
block_per_segment: DEFAULT_BLOCK_PER_SEGMENT,
}
}
Expand All @@ -54,15 +56,34 @@ impl BlockThresholds {
) -> Self {
BlockThresholds {
max_rows_per_block,
min_rows_per_block: (max_rows_per_block * 4).div_ceil(5),
max_bytes_per_block: bytes_per_block * 2,
min_bytes_per_block: (bytes_per_block * 4).div_ceil(5),
min_rows_per_block: Self::min_block_threshold(max_rows_per_block),
max_bytes_per_block: bytes_per_block * MAX_BYTES_PER_BLOCK_FACTOR,
min_bytes_per_block: Self::min_block_threshold(bytes_per_block),
max_compressed_per_block,
min_compressed_per_block: (max_compressed_per_block * 4).div_ceil(5),
min_compressed_per_block: Self::min_block_threshold(max_compressed_per_block),
block_per_segment,
}
}

#[inline]
pub fn set_rows_per_block(mut self, rows_per_block: usize) -> Self {
self.max_rows_per_block = rows_per_block;
self.min_rows_per_block = Self::min_block_threshold(rows_per_block);
self
}

#[inline]
pub fn set_bytes_per_block(mut self, bytes_per_block: usize) -> Self {
self.max_bytes_per_block = bytes_per_block * MAX_BYTES_PER_BLOCK_FACTOR;
self.min_bytes_per_block = Self::min_block_threshold(bytes_per_block);
self
}

#[inline]
/// Returns the minimum threshold for a block dimension: 80% of `value`,
/// rounded up — i.e. `ceil(value * 4 / 5)`.
///
/// Computed as `value - value / 5`, which is algebraically identical to
/// `(value * 4).div_ceil(5)` (write `value = 5q + r`, `r < 5`: both yield
/// `4q + r`) but cannot overflow, whereas `value * 4` overflows `usize`
/// for `value > usize::MAX / 4`.
pub fn min_block_threshold(value: usize) -> usize {
    value - value / 5
}

#[inline]
pub fn check_perfect_block(
&self,
Expand Down Expand Up @@ -112,63 +133,80 @@ impl BlockThresholds {
}

#[inline]
pub fn calc_rows_for_compact(&self, total_bytes: usize, total_rows: usize) -> usize {
if self.check_for_compact(total_rows, total_bytes) {
return total_rows;
}
pub fn calc_compact_block_num(&self, total_rows: usize, total_bytes: usize) -> usize {
let block_num_by_rows = if total_rows >= 2 * self.min_rows_per_block {
(total_rows / self.max_rows_per_block).max(2)
} else {
1
};

let block_num_by_rows = std::cmp::max(total_rows / self.min_rows_per_block, 1);
let block_num_by_size = total_bytes / self.min_bytes_per_block;
if block_num_by_rows >= block_num_by_size {
return self.max_rows_per_block;
}
total_rows.div_ceil(block_num_by_size)
let bytes_per_block = self.max_bytes_per_block / MAX_BYTES_PER_BLOCK_FACTOR;
let block_num_by_bytes = if total_bytes >= 2 * self.min_bytes_per_block {
(total_bytes / bytes_per_block).max(2)
} else {
1
};

block_num_by_rows.max(block_num_by_bytes).min(total_rows)
}

/// Calculates the optimal number of rows per block based on total data size and row count.
/// Calculates the optimal rows and bytes per block based on total data size and row count.
///
/// # Parameters
/// - `total_bytes`: The total size of the data in bytes.
/// - `total_rows`: The total number of rows in the data.
/// - `total_compressed`: The total compressed size of the data in bytes.
///
/// # Returns
/// - The calculated number of rows per block that satisfies the thresholds.
/// - `(rows_per_block, bytes_per_block)`: rows are used as the sort block size,
/// and bytes are used by ordered compact to keep post-sort blocks near the
/// recluster target.
#[inline]
pub fn calc_rows_for_recluster(
&self,
total_rows: usize,
total_bytes: usize,
total_compressed: usize,
) -> usize {
) -> (usize, usize) {
debug_assert!(total_rows > 0);

let default_bytes_per_block = self
.max_bytes_per_block
.div_ceil(MAX_BYTES_PER_BLOCK_FACTOR);
// Check if the data is compact enough to skip further calculations.
if self.check_for_compact(total_rows, total_bytes)
&& total_compressed < 2 * self.min_compressed_per_block
{
return total_rows;
return (total_rows, default_bytes_per_block);
}

let block_num_by_rows = std::cmp::max(total_rows / self.min_rows_per_block, 1);
let block_num_by_compressed = total_compressed.div_ceil(self.max_compressed_per_block);
// If row-based block count exceeds compressed-based block count, use max rows per block.
if block_num_by_rows >= block_num_by_compressed {
return self.max_rows_per_block;
return (self.max_rows_per_block, default_bytes_per_block);
}

let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
// Adjust the number of blocks based on block size thresholds.
let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024);
let min_bytes_per_block = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
let block_nums = if bytes_per_block > max_bytes_per_block {
let max_bytes_per_block =
default_bytes_per_block + default_bytes_per_block.min(DEFAULT_BLOCK_BUFFER_SIZE);
if bytes_per_block > max_bytes_per_block {
// Case 1: If the block size is too bigger.
total_bytes.div_ceil(max_bytes_per_block)
} else if bytes_per_block < min_bytes_per_block {
let bytes_per_block = max_bytes_per_block;
let block_nums = total_bytes.div_ceil(bytes_per_block);
(total_rows.div_ceil(block_nums).max(1), bytes_per_block)
} else if bytes_per_block < self.min_bytes_per_block {
// Case 2: If the block size is too smaller.
total_bytes / min_bytes_per_block
let bytes_per_block = self.min_bytes_per_block;
let block_nums = std::cmp::max(total_bytes / bytes_per_block, 1);
(total_rows.div_ceil(block_nums).max(1), bytes_per_block)
} else {
// Case 3: Otherwise, use the compressed-based block count.
block_num_by_compressed
};
total_rows.div_ceil(block_nums.max(1)).max(1)
(
total_rows.div_ceil(block_num_by_compressed).max(1),
bytes_per_block,
)
}
}
}
15 changes: 0 additions & 15 deletions src/query/expression/tests/it/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,6 @@ use databend_common_expression::types::StringType;
use databend_common_expression::types::number::NumberScalar;
use databend_common_expression::types::string::StringColumnBuilder;

#[test]
fn test_split_block() {
let value = "abc";
let n = 10;
let block = DataBlock::new_from_columns(vec![Column::String(
StringColumnBuilder::repeat(value, n).build(),
)]);
let sizes = block
.split_by_rows_if_needed_no_tail(3)
.iter()
.map(|b| b.num_rows())
.collect::<Vec<_>>();
assert_eq!(sizes, vec![3, 3, 4]);
}

#[test]
fn test_box_render_block() {
let value = "abc";
Expand Down
32 changes: 15 additions & 17 deletions src/query/expression/tests/it/block_thresholds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,43 +72,41 @@ fn test_check_too_small() {
}

#[test]
fn test_calc_rows_for_compact() {
fn test_calc_compact_block_num() {
let t = default_thresholds();

assert_eq!(t.calc_rows_for_compact(500_000, 1000), 1000);

// Block number by rows wins → max_rows_per_block
assert_eq!(
t.calc_rows_for_compact(2_000_000, 10_000),
t.max_rows_per_block
);

// Size-based block number wins
assert_eq!(t.calc_rows_for_compact(4_000_000, 2000), 400);
assert_eq!(t.calc_compact_block_num(1000, 500_000), 1);
assert_eq!(t.calc_compact_block_num(1800, 500_000), 2);
assert_eq!(t.calc_compact_block_num(10_000, 500_000), 10);
assert_eq!(t.calc_compact_block_num(1000, 1_800_000), 2);
assert_eq!(t.calc_compact_block_num(1000, 4_000_000), 4);
}

#[test]
fn test_calc_rows_for_recluster() {
let t = default_thresholds();

// compact enough to skip further calculations
assert_eq!(t.calc_rows_for_recluster(1000, 500_000, 100_000), 1000);
assert_eq!(
t.calc_rows_for_recluster(1000, 500_000, 100_000),
(1000, 1_000_000)
);

// row-based block count exceeds compressed-based block count, use max rows per block.
assert_eq!(
t.calc_rows_for_recluster(10_000, 2_000_000, 100_000),
t.max_rows_per_block
(t.max_rows_per_block, 1_000_000)
);

// Case 1: If the block size is too bigger.
let result = t.calc_rows_for_recluster(4_000, 30_000_000, 600_000);
assert_eq!(result, 267);
assert_eq!(result, (267, 2_000_000));

// Case 2: If the block size is too smaller.
let result = t.calc_rows_for_recluster(4_000, 2_000_000, 600_000);
assert_eq!(result, 800);
let result = t.calc_rows_for_recluster(4_000, 1_600_000, 600_000);
assert_eq!(result, (2000, 800_000));

// Case 3: use the compressed-based block count.
let result = t.calc_rows_for_recluster(4_000, 10_000_000, 600_000);
assert_eq!(result, 667);
assert_eq!(result, (667, 1_666_667));
}
Loading
Loading