Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3481,6 +3481,14 @@ pub struct SpillConfig {
/// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
#[clap(long, value_name = "PERCENT", default_value = "0")]
pub result_set_spilling_disk_quota_ratio: u64,

/// Total memory for the spill buffer pool in bytes.
#[clap(long, value_name = "VALUE", default_value = "209715200")]
pub spill_buffer_pool_memory: u64,
Comment thread
zhang2014 marked this conversation as resolved.

/// Number of worker tasks in the spill buffer pool.
#[clap(long, value_name = "VALUE", default_value = "2")]
pub spill_buffer_pool_workers: usize,
}

impl Default for SpillConfig {
Expand Down Expand Up @@ -3594,6 +3602,8 @@ mod config_converters {
window_partition_spilling_disk_quota_ratio: spill
.window_partition_spilling_disk_quota_ratio,
result_set_spilling_disk_quota_ratio: spill.result_set_spilling_disk_quota_ratio,
buffer_pool_memory: spill.spill_buffer_pool_memory,
buffer_pool_workers: spill.spill_buffer_pool_workers,
})
}

Expand All @@ -3616,6 +3626,8 @@ mod config_converters {
window_partition_spilling_disk_quota_ratio: value
.window_partition_spilling_disk_quota_ratio,
result_set_spilling_disk_quota_ratio: value.result_set_spilling_disk_quota_ratio,
spill_buffer_pool_memory: value.buffer_pool_memory,
spill_buffer_pool_workers: value.buffer_pool_workers,
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,12 @@ pub struct SpillConfig {
/// Maximum percentage of the global local spill quota that HTTP
/// result-set spilling may use for one query.
pub result_set_spilling_disk_quota_ratio: u64,

/// Total memory for the spill buffer pool in bytes.
pub buffer_pool_memory: u64,

/// Number of worker tasks in the spill buffer pool.
pub buffer_pool_workers: usize,
}

impl SpillConfig {
Expand Down Expand Up @@ -503,6 +509,8 @@ impl SpillConfig {
window_partition_spilling_disk_quota_ratio: 60,
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
result_set_spilling_disk_quota_ratio: 0,
buffer_pool_memory: 200 * 1024 * 1024,
buffer_pool_workers: 2,
}
}
}
Expand All @@ -519,6 +527,8 @@ impl Default for SpillConfig {
window_partition_spilling_disk_quota_ratio: 60,
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
result_set_spilling_disk_quota_ratio: 0,
buffer_pool_memory: 200 * 1024 * 1024,
buffer_pool_workers: 2,
}
}
}
8 changes: 6 additions & 2 deletions src/query/config/src/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ impl SpillConfig {
sort_spilling_disk_quota_ratio,
window_partition_spilling_disk_quota_ratio,
result_set_spilling_disk_quota_ratio,
spill_buffer_pool_memory,
spill_buffer_pool_workers,
} = *self;

Self {
Expand All @@ -227,8 +229,9 @@ impl SpillConfig {
storage: storage.as_ref().map(|storage| storage.mask_display()),
sort_spilling_disk_quota_ratio,
window_partition_spilling_disk_quota_ratio,
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
result_set_spilling_disk_quota_ratio,
spill_buffer_pool_memory,
spill_buffer_pool_workers,
}
}
}
Expand Down Expand Up @@ -384,8 +387,9 @@ mod tests {
spill_local_disk_max_bytes: 10,
sort_spilling_disk_quota_ratio: 60,
window_partition_spilling_disk_quota_ratio: 30,
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
result_set_spilling_disk_quota_ratio: 0,
spill_buffer_pool_memory: 209715200,
spill_buffer_pool_workers: 2,
storage: Some(StorageConfig {
typ: "s3".to_string(),
s3: S3StorageConfig {
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl GlobalServices {
// 4. cluster discovery init.
ClusterDiscovery::init(config, version).await?;

SpillsBufferPool::init();
SpillsBufferPool::init(&config.spill)?;
// TODO(xuanwo):
//
// This part is a bit complex because catalog are used widely in different
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ struct PayloadWriter {
impl PayloadWriter {
fn try_create(prefix: &str) -> Result<Self> {
let data_operator = DataOperator::instance();
let target = SpillTarget::from_storage_params(data_operator.spill_params());
let operator = data_operator.spill_operator();
let buffer_pool = SpillsBufferPool::instance();
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?;
let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;

Ok(PayloadWriter {
path: file_path,
Expand Down Expand Up @@ -414,10 +413,11 @@ fn restore_payload(
data_schema.clone(),
vec![row_group.clone()],
target,
read_setting,
)?;

let instant = Instant::now();
let data_block = reader.read(read_setting)?;
let data_block = reader.read()?;
let elapsed = instant.elapsed();

let read_size = reader.read_bytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,10 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
self.desc.build_schema.clone(),
data.row_groups,
target,
self.read_settings,
)?;

while let Some(data_block) = reader.read(self.read_settings)? {
while let Some(data_block) = reader.read()? {
self.memory_hash_join.add_block(Some(data_block))?;
}
}
Expand Down Expand Up @@ -417,12 +418,11 @@ pub struct GraceJoinPartition {
impl GraceJoinPartition {
pub fn create(prefix: &str) -> Result<GraceJoinPartition> {
let data_operator = DataOperator::instance();
let target = SpillTarget::from_storage_params(data_operator.spill_params());

let operator = data_operator.spill_operator();
let buffer_pool = SpillsBufferPool::instance();
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?;
let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;

Ok(GraceJoinPartition {
path: file_path,
Expand Down Expand Up @@ -500,6 +500,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> {
self.join.desc.probe_schema.clone(),
data.row_groups,
target,
self.join.read_settings,
)?;
self.spills_reader = Some(reader);
break;
Expand All @@ -511,7 +512,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> {
}

if let Some(mut spills_reader) = self.spills_reader.take() {
if let Some(v) = spills_reader.read(self.join.read_settings)? {
if let Some(v) = spills_reader.read()? {
self.spills_reader = Some(spills_reader);
return Ok(Some(v));
}
Expand Down
Loading
Loading