Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/query/catalog/src/plan/datasource/datasource_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,40 @@ use crate::plan::PushDownInfo;
use crate::plan::datasource::datasource_info::DataSourceInfo;
use crate::table_args::TableArgs;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
pub struct BlockMetaOptions {
    // Keep the block index so a merge_into target build can locate blocks.
    pub reserve_block_index: bool,
    // Whether stream columns should be updated.
    pub update_stream_columns: bool,
    // Set when internal columns are being queried.
    pub query_internal_columns: bool,
    // Keep the recluster source meta attached to parts.
    pub recluster_source_meta: bool,
}

impl BlockMetaOptions {
    // Builder-style setter: consumes `self`, returns it with the flag replaced.
    pub fn set_reserve_block_index(self, reserve_block_index: bool) -> Self {
        Self {
            reserve_block_index,
            ..self
        }
    }

    pub fn set_update_stream_columns(self, update_stream_columns: bool) -> Self {
        Self {
            update_stream_columns,
            ..self
        }
    }

    pub fn set_query_internal_columns(self, query_internal_columns: bool) -> Self {
        Self {
            query_internal_columns,
            ..self
        }
    }

    pub fn set_recluster_source_meta(self, recluster_source_meta: bool) -> Self {
        Self {
            recluster_source_meta,
            ..self
        }
    }
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct DataSourcePlan {
pub source_info: DataSourceInfo,
Expand All @@ -39,8 +73,7 @@ pub struct DataSourcePlan {
pub push_downs: Option<PushDownInfo>,
pub internal_columns: Option<BTreeMap<FieldIndex, InternalColumn>>,
pub base_block_ids: Option<Scalar>,
// used for recluster to update stream columns
pub update_stream_columns: bool,
pub block_meta_options: BlockMetaOptions,

pub table_index: usize,
pub scan_id: usize,
Expand Down
1 change: 1 addition & 0 deletions src/query/catalog/src/plan/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ mod datasource_info;
mod datasource_plan;

pub use datasource_info::*;
pub use datasource_plan::BlockMetaOptions;
pub use datasource_plan::DataSourcePlan;
50 changes: 9 additions & 41 deletions src/query/catalog/src/plan/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,53 +371,21 @@ pub struct ReclusterTask {

pub type BlockMetaWithHLL = (Arc<BlockMeta>, Option<RawBlockHLL>);

#[derive(Clone)]
pub enum ReclusterParts {
Recluster {
tasks: Vec<ReclusterTask>,
remained_blocks: Vec<BlockMetaWithHLL>,
removed_segment_indexes: Vec<usize>,
removed_segment_summary: Statistics,
},
Compact(Partitions),
#[derive(Clone, Default)]
pub struct ReclusterParts {
pub tasks: Vec<ReclusterTask>,
pub remained_blocks: Vec<BlockMetaWithHLL>,
pub removed_segment_indexes: Vec<usize>,
pub removed_segment_summary: Statistics,
}

impl ReclusterParts {
pub fn is_empty(&self) -> bool {
match self {
ReclusterParts::Recluster {
tasks,
remained_blocks,
..
} => tasks.is_empty() && remained_blocks.is_empty(),
ReclusterParts::Compact(parts) => parts.is_empty(),
}
}

pub fn new_recluster_parts() -> Self {
Self::Recluster {
tasks: vec![],
remained_blocks: vec![],
removed_segment_indexes: vec![],
removed_segment_summary: Statistics::default(),
}
}

pub fn new_compact_parts() -> Self {
Self::Compact(Partitions::default())
self.tasks.is_empty() && self.remained_blocks.is_empty()
}

pub fn is_distributed(&self, ctx: Arc<dyn TableContext>) -> bool {
match self {
ReclusterParts::Recluster { tasks, .. } => tasks.len() > 1,
ReclusterParts::Compact(_) => {
(!ctx.get_cluster().is_empty())
&& ctx
.get_settings()
.get_enable_distributed_compact()
.unwrap_or(false)
}
}
pub fn is_distributed(&self, _ctx: Arc<dyn TableContext>) -> bool {
self.tasks.len() > 1
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/query/ee/src/storages/fuse/operations/virtual_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ pub async fn prepare_refresh_virtual_column(
let virtual_column_builder = VirtualColumnBuilder::try_create(ctx.clone(), source_schema)?;

let projection = Projection::Columns(field_indices);
let block_reader =
fuse_table.create_block_reader(ctx.clone(), projection, false, false, false)?;
let block_reader = fuse_table.create_block_reader(ctx.clone(), projection, false)?;

let segment_reader = MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema);

Expand Down
18 changes: 0 additions & 18 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,24 +689,6 @@ impl DataBlock {
res
}

pub fn split_by_rows_if_needed_no_tail(&self, rows_per_block: usize) -> Vec<Self> {
    // `rows_per_block` is the expected number of rows per block and the
    // minimum per block is 0.8 * rows_per_block, so a full-size cut is only
    // taken while at least 1.8 * rows_per_block rows remain; the final block
    // absorbs whatever is left, avoiding a tiny tail.
    let max_rows_per_block = (rows_per_block * 9).div_ceil(5);
    let mut blocks = vec![];
    let mut start = 0;
    while self.num_rows - start >= max_rows_per_block {
        blocks.push(self.slice(start..(start + rows_per_block)));
        start += rows_per_block;
    }
    blocks.push(self.slice(start..self.num_rows));
    blocks
}

#[inline]
pub fn merge_block(&mut self, block: DataBlock) {
self.entries.reserve(block.num_columns());
Expand Down
6 changes: 4 additions & 2 deletions src/query/expression/src/utils/block_thresholds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,10 @@ impl BlockThresholds {

let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
// Adjust the number of blocks based on block size thresholds.
let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024);
let min_bytes_per_block = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
let default_bytes_per_block = self.max_bytes_per_block.div_ceil(2);
let max_bytes_per_block =
default_bytes_per_block + default_bytes_per_block.min(DEFAULT_BLOCK_BUFFER_SIZE);
let min_bytes_per_block = self.min_bytes_per_block / 2;
let block_nums = if bytes_per_block > max_bytes_per_block {
// Case 1: If the block size is too bigger.
total_bytes.div_ceil(max_bytes_per_block)
Expand Down
15 changes: 0 additions & 15 deletions src/query/expression/tests/it/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,6 @@ use databend_common_expression::types::StringType;
use databend_common_expression::types::number::NumberScalar;
use databend_common_expression::types::string::StringColumnBuilder;

#[test]
fn test_split_block() {
    // Ten rows split with a target of 3 rows per block: the 1-row tail is
    // absorbed into the last block, yielding sizes [3, 3, 4].
    let column = Column::String(StringColumnBuilder::repeat("abc", 10).build());
    let block = DataBlock::new_from_columns(vec![column]);
    let sizes: Vec<_> = block
        .split_by_rows_if_needed_no_tail(3)
        .into_iter()
        .map(|b| b.num_rows())
        .collect();
    assert_eq!(sizes, vec![3, 3, 4]);
}

#[test]
fn test_box_render_block() {
let value = "abc";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,18 @@ impl BlockMetaTransform<BlockCompactMeta> for TransformCompactBlock {
impl TransformCompactBlock {
fn split_blocks(blocks: Vec<DataBlock>, rows_per_block: usize) -> Result<Vec<DataBlock>> {
debug_assert!(!blocks.is_empty());
if blocks.len() == 1 {
return Ok(blocks[0].split_by_rows_if_needed_no_tail(rows_per_block));
}

// Allow the last output block to absorb in most one minimum-sized tail block.
// This stays aligned with BlockThresholds, where min_rows_per_block is 0.8x.
let max_rows_per_block = (rows_per_block * 9).div_ceil(5);
let mut total_rows: usize = blocks.iter().map(DataBlock::num_rows).sum();
let mut blocks = blocks.into_iter();
let mut current = blocks.next();
let mut offset = 0;
let mut output = Vec::new();

// Mirror split_by_rows_if_needed_no_tail, but consume a sequence of blocks
// while preserving their original order. Like the original helper, this
// treats rows_per_block as a target and allows a slightly larger block to
// avoid emitting a tiny tail block.
// Split a sequence of blocks while preserving row order. This treats
// rows_per_block as a target and allows a slightly larger tail block to
// avoid emitting a tiny final block.
while total_rows >= max_rows_per_block {
let mut remain_rows = rows_per_block;
let mut pieces = vec![];
Expand Down Expand Up @@ -124,7 +121,10 @@ impl TransformCompactBlock {
}
}

output.push(DataBlock::concat(&pieces)?);
output.push(match pieces.len() {
1 => pieces.pop().unwrap(),
_ => DataBlock::concat(&pieces)?,
});
total_rows -= rows_per_block;
}

Expand All @@ -133,7 +133,10 @@ impl TransformCompactBlock {
let mut tail = Vec::new();
tail.push(block.slice(offset..block.num_rows()));
tail.extend(blocks);
output.push(DataBlock::concat(&tail)?);
output.push(match tail.len() {
1 => tail.pop().unwrap(),
_ => DataBlock::concat(&tail)?,
});
}

Ok(output)
Expand Down Expand Up @@ -163,48 +166,63 @@ mod tests {
.collect()
}

fn assert_split_matches_reference(blocks: Vec<DataBlock>, rows_per_block: usize) -> Result<()> {
fn assert_split_result(
blocks: Vec<DataBlock>,
rows_per_block: usize,
expected_sizes: &[usize],
expected_values: &[Vec<i32>],
) -> Result<()> {
let actual = TransformCompactBlock::split_blocks(blocks.clone(), rows_per_block)?;
let expected = DataBlock::concat(&blocks)?.split_by_rows_if_needed_no_tail(rows_per_block);

assert_eq!(
actual.iter().map(DataBlock::num_rows).collect::<Vec<_>>(),
expected.iter().map(DataBlock::num_rows).collect::<Vec<_>>()
expected_sizes
);
assert_eq!(
actual.iter().map(block_values).collect::<Vec<_>>(),
expected.iter().map(block_values).collect::<Vec<_>>()
expected_values
);
Ok(())
}

#[test]
fn test_split_blocks_matches_reference_across_block_boundaries() -> Result<()> {
assert_split_matches_reference(
assert_split_result(vec![block_with_range(0, 10)], 3, &[3, 3, 4], &[
vec![0, 1, 2],
vec![3, 4, 5],
vec![6, 7, 8, 9],
])?;
assert_split_result(
vec![
block_with_range(0, 2),
block_with_range(2, 6),
block_with_range(6, 10),
],
3,
&[3, 3, 4],
&[vec![0, 1, 2], vec![3, 4, 5], vec![6, 7, 8, 9]],
)?;
assert_split_matches_reference(
assert_split_result(
vec![
block_with_range(0, 1),
block_with_range(1, 2),
block_with_range(2, 3),
block_with_range(3, 10),
],
4,
&[4, 6],
&[vec![0, 1, 2, 3], vec![4, 5, 6, 7, 8, 9]],
)?;
assert_split_matches_reference(
assert_split_result(
vec![
block_with_range(0, 2),
block_with_range(2, 4),
block_with_range(4, 6),
block_with_range(6, 8),
],
5,
&[8],
&[vec![0, 1, 2, 3, 4, 5, 6, 7]],
)?;
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl OrderedBlockCompactBuilder {
if blocks.total_rows > 2 * thresholds.min_rows_per_block {
block_nums = block_nums.max(blocks.total_rows / thresholds.min_rows_per_block);
}
if blocks.total_bytes > thresholds.max_bytes_per_block {
if blocks.total_bytes > 2 * thresholds.min_bytes_per_block {
block_nums = block_nums.max(blocks.total_bytes / thresholds.min_bytes_per_block);
}
// rows_per_block is a split target for the downstream no-tail splitter rather than
Expand Down
Loading
Loading