diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs
index f1c235e982e5e..464894447ad28 100644
--- a/src/query/config/src/config.rs
+++ b/src/query/config/src/config.rs
@@ -3481,6 +3481,14 @@ pub struct SpillConfig {
     /// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
     #[clap(long, value_name = "PERCENT", default_value = "0")]
     pub result_set_spilling_disk_quota_ratio: u64,
+
+    /// Total memory for the spill buffer pool in bytes.
+    #[clap(long, value_name = "VALUE", default_value = "209715200")]
+    pub spill_buffer_pool_memory: u64,
+
+    /// Number of worker tasks in the spill buffer pool.
+    #[clap(long, value_name = "VALUE", default_value = "2")]
+    pub spill_buffer_pool_workers: usize,
 }
 
 impl Default for SpillConfig {
@@ -3594,6 +3602,8 @@ mod config_converters {
             window_partition_spilling_disk_quota_ratio: spill
                 .window_partition_spilling_disk_quota_ratio,
             result_set_spilling_disk_quota_ratio: spill.result_set_spilling_disk_quota_ratio,
+            buffer_pool_memory: spill.spill_buffer_pool_memory,
+            buffer_pool_workers: spill.spill_buffer_pool_workers,
         })
     }
 
@@ -3616,6 +3626,8 @@ mod config_converters {
             window_partition_spilling_disk_quota_ratio: value
                 .window_partition_spilling_disk_quota_ratio,
             result_set_spilling_disk_quota_ratio: value.result_set_spilling_disk_quota_ratio,
+            spill_buffer_pool_memory: value.buffer_pool_memory,
+            spill_buffer_pool_workers: value.buffer_pool_workers,
         }
     }
 }
diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs
index 57b40ce7c8617..fba85be41b145 100644
--- a/src/query/config/src/inner.rs
+++ b/src/query/config/src/inner.rs
@@ -433,6 +433,12 @@ pub struct SpillConfig {
     /// Maximum percentage of the global local spill quota that HTTP
     /// result-set spilling may use for one query.
    pub result_set_spilling_disk_quota_ratio: u64,
+
+    /// Total memory for the spill buffer pool in bytes.
+    pub buffer_pool_memory: u64,
+
+    /// Number of worker tasks in the spill buffer pool.
+    pub buffer_pool_workers: usize,
 }
 
 impl SpillConfig {
@@ -503,6 +509,8 @@ impl SpillConfig {
             window_partition_spilling_disk_quota_ratio: 60,
             // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
             result_set_spilling_disk_quota_ratio: 0,
+            buffer_pool_memory: 200 * 1024 * 1024,
+            buffer_pool_workers: 2,
         }
     }
 }
@@ -519,6 +527,8 @@ impl Default for SpillConfig {
             window_partition_spilling_disk_quota_ratio: 60,
             // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
             result_set_spilling_disk_quota_ratio: 0,
+            buffer_pool_memory: 200 * 1024 * 1024,
+            buffer_pool_workers: 2,
         }
     }
 }
diff --git a/src/query/config/src/mask.rs b/src/query/config/src/mask.rs
index 5bfe26f62d13f..8a9f0466cb78d 100644
--- a/src/query/config/src/mask.rs
+++ b/src/query/config/src/mask.rs
@@ -218,6 +218,8 @@ impl SpillConfig {
             sort_spilling_disk_quota_ratio,
             window_partition_spilling_disk_quota_ratio,
             result_set_spilling_disk_quota_ratio,
+            spill_buffer_pool_memory,
+            spill_buffer_pool_workers,
         } = *self;
 
         Self {
@@ -227,8 +229,9 @@ impl SpillConfig {
             storage: storage.as_ref().map(|storage| storage.mask_display()),
             sort_spilling_disk_quota_ratio,
             window_partition_spilling_disk_quota_ratio,
-            // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
             result_set_spilling_disk_quota_ratio,
+            spill_buffer_pool_memory,
+            spill_buffer_pool_workers,
         }
     }
 }
@@ -384,8 +387,9 @@ mod tests {
             spill_local_disk_max_bytes: 10,
             sort_spilling_disk_quota_ratio: 60,
             window_partition_spilling_disk_quota_ratio: 30,
-            // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
             result_set_spilling_disk_quota_ratio: 0,
+            spill_buffer_pool_memory: 209715200,
+            spill_buffer_pool_workers: 2,
             storage: Some(StorageConfig {
                 typ: "s3".to_string(),
                 s3: S3StorageConfig {
diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs
index 3b8de007a57dd..9d02b1c315fd4 100644
--- a/src/query/service/src/global_services.rs
+++ b/src/query/service/src/global_services.rs
@@ -115,7 +115,7 @@ impl GlobalServices {
         // 4. cluster discovery init.
         ClusterDiscovery::init(config, version).await?;
 
-        SpillsBufferPool::init();
+        SpillsBufferPool::init(&config.spill)?;
 
         // TODO(xuanwo):
         //
         // This part is a bit complex because catalog are used widely in different
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs
index 9d1e5f08d7c7c..3ddfe76212940 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs
@@ -54,11 +54,10 @@ struct PayloadWriter {
 impl PayloadWriter {
     fn try_create(prefix: &str) -> Result<Self> {
         let data_operator = DataOperator::instance();
-        let target = SpillTarget::from_storage_params(data_operator.spill_params());
         let operator = data_operator.spill_operator();
         let buffer_pool = SpillsBufferPool::instance();
         let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
-        let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?;
+        let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;
 
         Ok(PayloadWriter {
             path: file_path,
@@ -414,10 +413,11 @@ fn restore_payload(
         data_schema.clone(),
         vec![row_group.clone()],
         target,
+        read_setting,
     )?;
 
     let instant = Instant::now();
-    let data_block = reader.read(read_setting)?;
+    let data_block = reader.read()?;
     let elapsed = instant.elapsed();
 
     let read_size = reader.read_bytes();
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs
index 01b3288fb2dbb..48459ec982d9f 100644
--- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs
@@ -278,9 +278,10 @@ impl GraceHashJoin {
                     self.desc.build_schema.clone(),
                     data.row_groups,
                     target,
+                    self.read_settings,
                 )?;
 
-                while let Some(data_block) = reader.read(self.read_settings)? {
+                while let Some(data_block) = reader.read()? {
                     self.memory_hash_join.add_block(Some(data_block))?;
                 }
             }
@@ -417,12 +418,11 @@ pub struct GraceJoinPartition {
 impl GraceJoinPartition {
     pub fn create(prefix: &str) -> Result<Self> {
         let data_operator = DataOperator::instance();
-        let target = SpillTarget::from_storage_params(data_operator.spill_params());
         let operator = data_operator.spill_operator();
         let buffer_pool = SpillsBufferPool::instance();
         let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
-        let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?;
+        let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;
 
         Ok(GraceJoinPartition {
             path: file_path,
@@ -500,6 +500,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> {
                     self.join.desc.probe_schema.clone(),
                     data.row_groups,
                     target,
+                    self.join.read_settings,
                 )?;
 
                 self.spills_reader = Some(reader);
                 break;
@@ -511,7 +512,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> {
         }
 
         if let Some(mut spills_reader) = self.spills_reader.take() {
-            if let Some(v) = spills_reader.read(self.join.read_settings)? {
+            if let Some(v) = spills_reader.read()? {
                 self.spills_reader = Some(spills_reader);
                 return Ok(Some(v));
             }
diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs
index 24f6b52bf30e2..4e949cce3290a 100644
--- a/src/query/service/src/spillers/async_buffer.rs
+++ b/src/query/service/src/spillers/async_buffer.rs
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::cell::Cell;
-use std::collections::VecDeque;
 use std::io;
 use std::io::Write;
 use std::ops::Range;
@@ -21,14 +19,18 @@ use std::sync::Arc;
 use std::sync::Condvar;
 use std::sync::Mutex;
 use std::sync::PoisonError;
+use std::sync::atomic::AtomicU64;
+use std::sync::atomic::Ordering;
+use std::time::Duration;
 use std::time::Instant;
 
 use arrow_schema::Schema;
 use bytes::Bytes;
 use bytes::BytesMut;
 use databend_common_base::base::GlobalInstance;
-use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_base::runtime::Runtime;
+use databend_common_base::runtime::spawn;
+use databend_common_config::SpillConfig;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
@@ -56,7 +58,6 @@ use parquet::file::properties::EnabledStatistics;
 use parquet::file::properties::WriterProperties;
 
 use super::record_read_profile;
-use super::record_write_profile;
 
 const CHUNK_SIZE: usize = 4 * 1024 * 1024;
 
 #[derive(Clone, Copy)]
@@ ... @@ impl SpillTarget {
         matches!(self, SpillTarget::Local)
     }
 
-    /// Derive spill target (local vs remote) from storage params.
-    ///
-    /// Today we only treat `StorageParams::Fs` as local, everything
-    /// else (S3, Azure, memory, etc.) is considered remote. Centralizing
-    /// this decision here keeps higher-level operators simpler and avoids
-    /// duplicating the matching logic at each call site.
     pub fn from_storage_params(params: Option<&StorageParams>) -> Self {
         match params {
             Some(StorageParams::Fs(_)) => SpillTarget::Local,
             _ => SpillTarget::Remote,
         }
     }
 }
 
-/// Buffer Pool Workflow for Spill Operations:
-///
-/// Context: During query execution, when memory pressure is high, intermediate
-/// results (hash tables, sorted data, aggregation states) need to be spilled
-/// to disk/object storage to free up memory. This buffer pool provides a
-/// synchronous Write interface with async I/O underneath, specifically designed
-/// for spill scenarios where backpressure control is more important than latency.
-///
-/// 1. Initialization:
-///    - Create a fixed number of BytesMut buffers (4MB each) based on memory limit
-///    - Spawn worker threads that listen for write operations via async channels
-///    - Pre-allocate buffers are placed in the available_write_buffers channel
-///
-/// 2. Spill Write Operation:
-///    - During spill, BufferWriter.write() fills the current buffer with serialized data
-///    - When buffer is full, it's frozen to Bytes and added to pending_buffers queue
-///    - Writer tries to acquire a new buffer from the pool (non-blocking first)
-///    - If no buffer available, triggers async write operation and BLOCKS (crucial for spill)
-///
-/// 3. Async Spill Write Process:
-///    - Background worker receives BufferWriteOperator containing:
-///      - OpenDAL Writer instance (to disk/S3/etc.)
-///      - Queue of Bytes buffers containing spilled data
-///      - Response channel for completion notification
-///    - Worker writes all buffers sequentially to storage via writer.write(buffers)
-///    - After writing, attempts to recycle buffers back to pool:
-///      - Converts Bytes back to BytesMut (if unique reference)
-///      - Clears buffer content and returns to available pool
-///    - Notifies completion via condvar to unblock waiting spill thread
-///
-/// 4. Spill Backpressure Control (Key Feature):
-///    - When pool is exhausted, spill write() BLOCKS until async write completes
-///    - This throttles spill speed to match storage I/O capacity
-///    - Prevents memory explosion during high-volume spill operations
-///    - Query execution naturally pauses when spill storage is slower than data generation
-///
-/// 5. Buffer Lifecycle in Spill:
-///    - BytesMut (mutable) -> fill with spill data -> freeze to Bytes (immutable)
-///    - Bytes -> async write to spill storage -> try_into_mut() -> clear -> back to pool
-///
-/// 6. Spill Resource Cleanup:
-///    - flush() ensures all pending spill data is written before spill operation completes
-///    - Drop trait recycles any remaining buffers back to pool
-///    - Critical for spill scenarios where partial writes could corrupt spilled data
-///
-/// Spill-Specific Benefits:
-/// - Memory-bounded operation prevents OOM during large spills
-/// - Synchronous blocking behavior allows query threads to naturally pause
-/// - Buffer reuse minimizes GC pressure during intensive spill operations
-/// - Automatic flow control matches spill rate to storage bandwidth
-/// - Works with any OpenDAL-supported storage (local disk, S3, etc.)
 pub struct SpillsBufferPool {
+    _runtime: Arc<Runtime>,
     working_queue: async_channel::Sender<BufferOperator>,
     available_write_buffers: async_channel::Receiver<BytesMut>,
     available_write_buffers_tx: async_channel::Sender<BytesMut>,
+    blocking_nanos: Arc<AtomicU64>,
+    blocking_count: Arc<AtomicU64>,
 }
 
 impl SpillsBufferPool {
-    pub fn init() {
-        // TODO: config
+    pub fn init(config: &SpillConfig) -> Result<()> {
         GlobalInstance::set(SpillsBufferPool::create(
-            GlobalIORuntime::instance(),
-            200 * 1024 * 1024,
-            2,
-        ))
+            config.buffer_pool_memory as usize,
+            config.buffer_pool_workers,
+        )?);
+        Ok(())
     }
 
     pub fn instance() -> Arc<SpillsBufferPool> {
         GlobalInstance::get()
     }
 
-    pub fn create(executor: Arc<Runtime>, memory: usize, workers: usize) -> Arc<SpillsBufferPool> {
+    pub fn create(memory: usize, workers: usize) -> Result<Arc<SpillsBufferPool>> {
+        let runtime = Arc::new(Runtime::with_worker_threads(
+            workers,
+            Some("spill-worker".to_owned()),
+        )?);
+
         let (working_tx, working_rx) = async_channel::unbounded();
         let (buffers_tx, buffers_rx) = async_channel::unbounded();
 
@@ -169,10 +120,9 @@
 
         for _ in 0..workers {
             let working_queue: async_channel::Receiver<BufferOperator> = working_rx.clone();
-            let available_write_buffers = buffers_tx.clone();
 
-            executor.spawn(
+            runtime.spawn(
                 async_backtrace::location!(String::from("async_buffer")).frame(async move {
-                    let mut background = Background::create(available_write_buffers);
+                    let mut background = Background::create();
                     while let Ok(op) = working_queue.recv().await {
                         let span = Span::enter_with_parent("Background::recv", op.span());
                         background.recv(op).in_span(span).await;
@@ -181,25 +131,55 @@
             );
         }
 
-        Arc::new(SpillsBufferPool {
+        let blocking_nanos = Arc::new(AtomicU64::new(0));
+        let blocking_count = Arc::new(AtomicU64::new(0));
+
+        {
+            let blocking_nanos = Arc::clone(&blocking_nanos);
+            let blocking_count = Arc::clone(&blocking_count);
+            runtime.spawn(async move {
+                loop {
+                    tokio::time::sleep(Duration::from_secs(60)).await;
+                    let count = blocking_count.swap(0, Ordering::Relaxed);
+                    let nanos = blocking_nanos.swap(0, Ordering::Relaxed);
+                    if count > 0 {
+                        log::info!(
+                            "SpillsBufferPool alloc blocked {} times, total {:.2}ms in last 60s",
+                            count,
+                            nanos as f64 / 1_000_000.0,
+                        );
+                    }
+                }
+            });
+        }
+
+        Ok(Arc::new(SpillsBufferPool {
+            _runtime: runtime,
             working_queue: working_tx,
             available_write_buffers: buffers_rx,
             available_write_buffers_tx: buffers_tx,
-        })
-    }
-
-    pub(crate) fn try_alloc_buffer(&self) -> Option<BytesMut> {
-        self.available_write_buffers.try_recv().ok()
+            blocking_nanos,
+            blocking_count,
+        }))
     }
 
     pub(crate) fn alloc_buffer(&self) -> std::io::Result<BytesMut> {
-        match self.available_write_buffers.recv_blocking() {
+        if let Ok(buf) = self.available_write_buffers.try_recv() {
+            return Ok(buf);
+        }
+
+        let start = Instant::now();
+        let result = match self.available_write_buffers.recv_blocking() {
             Ok(buf) => Ok(buf),
             Err(_) => Err(std::io::Error::new(
                 std::io::ErrorKind::BrokenPipe,
                 "buffer pool is closed",
             )),
-        }
+        };
+        self.blocking_nanos
+            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
+        self.blocking_count.fetch_add(1, Ordering::Relaxed);
+        result
     }
 
     pub(crate) fn operator(&self, op: BufferOperator) {
@@ -208,21 +188,29 @@
             .expect("Buffer pool working queue need unbounded.");
     }
 
-    pub fn buffer_write(
-        self: &Arc<Self>,
-        writer: Writer,
-        target: SpillTarget,
-    ) -> BufferWriter {
-        BufferWriter::new(writer, self.clone(), target)
+    pub fn buffer_write(self: &Arc<Self>, writer: Writer) -> BufferWriter {
+        let (buffer_tx, buffer_rx) = async_channel::unbounded::<Bytes>();
+
+        let response = BufferOperatorResp::pending();
+
+        self.operator(BufferOperator::WriterTask(BufferWriterTaskOperator {
+            writer,
+            buffer_rx,
+            response: response.clone(),
+            available_buffers: self.available_write_buffers_tx.clone(),
+            span: Span::enter_with_local_parent("BufferWriterTask"),
+        }));
+
+        BufferWriter {
+            buffer_tx,
+            current_bytes: None,
+            buffer_pool: self.clone(),
+            response,
+        }
     }
 
-    pub fn writer(
-        self: &Arc<Self>,
-        op: Operator,
-        path: String,
-        target: SpillTarget,
-    ) -> Result<SpillsDataWriter> {
-        let writer = self.buffer_writer(op, path, target)?;
+    pub fn writer(self: &Arc<Self>, op: Operator, path: String) -> Result<SpillsDataWriter> {
+        let writer = self.buffer_writer(op, path)?;
         Ok(SpillsDataWriter::Uninitialize(Some(writer)))
     }
 
@@ -230,7 +218,6 @@
         self: &Arc<Self>,
         op: Operator,
         path: String,
-        target: SpillTarget,
     ) -> Result<BufferWriter> {
         let pending_response = BufferOperatorResp::pending();
 
@@ -245,7 +232,7 @@
             .try_send(operator)
             .expect("Buffer pool working queue need unbounded.");
 
-        Ok(self.buffer_write(pending_response.wait_and_take()?, target))
+        Ok(self.buffer_write(pending_response.wait_and_take()?))
     }
 
     pub fn reader(
@@ -255,8 +242,17 @@
         data_schema: DataSchemaRef,
         row_groups: Vec<RowGroupMetaData>,
         target: SpillTarget,
+        settings: ReadSettings,
     ) -> Result<SpillsDataReader> {
-        SpillsDataReader::create(path, op, data_schema, row_groups, self.clone(), target)
+        SpillsDataReader::create(
+            path,
+            op,
+            data_schema,
+            row_groups,
+            self.clone(),
+            target,
+            settings,
+        )
     }
 
     pub fn fetch_ranges(
@@ -288,112 +284,43 @@
 }
 
 pub struct BufferWriter {
-    writer: Option<Writer>,
+    buffer_tx: async_channel::Sender<Bytes>,
     current_bytes: Option<BytesMut>,
     buffer_pool: Arc<SpillsBufferPool>,
-    pending_buffers: VecDeque<Bytes>,
-    pending_response: Option<Arc<BufferOperatorResp<BufferWriteResp>>>,
-    target: SpillTarget,
+    response: Arc<BufferOperatorResp<io::Result<Metadata>>>,
 }
 
 impl BufferWriter {
-    pub fn new(
-        writer: Writer,
-        buffer_pool: Arc<SpillsBufferPool>,
-        target: SpillTarget,
-    ) -> BufferWriter {
-        BufferWriter {
-            buffer_pool,
-            writer: Some(writer),
-            current_bytes: None,
-            pending_buffers: VecDeque::new(),
-            pending_response: None,
-            target,
-        }
-    }
-
-    fn write_buffer(&mut self, wait: bool) -> std::io::Result<()> {
-        if let Some(pending_response) = self.pending_response.take() {
-            if wait {
-                let mut response = pending_response.wait_and_take();
-                self.writer = Some(response.writer);
-
-                if let Some(last_error) = response.error.take() {
-                    return Err(last_error);
-                }
-            } else {
-                let locked = pending_response.mutex.lock();
-                let mut locked = locked.unwrap_or_else(PoisonError::into_inner);
-
-                if let Some(mut response) = locked.take() {
-                    self.writer = Some(response.writer);
-                    if let Some(last_error) = response.error.take() {
-                        return Err(last_error);
-                    }
-                } else {
-                    drop(locked);
-                    self.pending_response = Some(pending_response);
-                }
+    pub fn close(mut self) -> io::Result<Metadata> {
+        if let Some(b) = self.current_bytes.take() {
+            if self.buffer_tx.try_send(b.freeze()).is_err() {
+                return Err(io::ErrorKind::BrokenPipe.into());
             }
         }
 
-        if let Some(writer) = self.writer.take() {
-            assert!(self.pending_response.is_none());
-
-            let pending_response = BufferOperatorResp::pending();
-
-            self.pending_response = Some(pending_response.clone());
-
-            let operator = BufferOperator::Write(BufferWriteOperator {
-                span: Span::enter_with_local_parent("BufferWriteOperator"),
-                writer,
-                response: pending_response,
-                buffers: std::mem::take(&mut self.pending_buffers),
-                target: self.target,
-                start: Instant::now(),
-            });
+        self.buffer_tx.close();
+        self.response.wait_and_take()
+    }
 
-            self.buffer_pool.operator(operator);
+    pub(super) fn finish(&mut self) -> std::io::Result<Metadata> {
+        if let Some(b) = self.current_bytes.take() {
+            if self.buffer_tx.try_send(b.freeze()).is_err() {
+                return Err(io::ErrorKind::BrokenPipe.into());
+            }
         }
 
-        Ok(())
+        self.buffer_tx.close();
+        self.response.wait_and_take()
     }
 
-    pub fn close(mut self) -> std::io::Result<Metadata> {
-        self.flush()?;
-
-        if let Some(writer) = self.writer.take() {
-            let pending_response = BufferOperatorResp::pending();
-
-            let close_operator = BufferOperator::Close(BufferCloseOperator {
-                span: Span::enter_with_local_parent("BufferCloseOperator"),
-                writer,
-                response: pending_response.clone(),
-            });
-
-            self.buffer_pool.operator(close_operator);
+    fn last_error(&mut self) -> io::Result<()> {
+        let locked = self.response.mutex.lock();
+        let mut locked = locked.unwrap_or_else(PoisonError::into_inner);
 
-            return pending_response.wait_and_take().res;
+        match locked.take_if(|x| x.is_err()) {
+            Some(Err(err)) => Err(err),
+            _ => Ok(()),
         }
-
-        Err(std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
-            "Writer already closed",
-        ))
-    }
-
-    pub(super) fn finish(&mut self) -> std::io::Result<Metadata> {
-        std::mem::replace(self, Self {
-            writer: None,
-            current_bytes: None,
-            buffer_pool: self.buffer_pool.clone(),
-            pending_buffers: Default::default(),
-            pending_response: None,
-            target: self.target,
-        })
-        .close()
     }
 }
 
@@ -403,6 +330,15 @@ impl io::Write for BufferWriter {
             return Ok(0);
         }
 
+        {
+            let locked = self.response.mutex.lock();
+            let mut locked = locked.unwrap_or_else(PoisonError::into_inner);
+
+            if let Some(Err(cause)) = locked.take_if(|x| x.is_err()) {
+                return Err(cause);
+            }
+        }
+
         let mut current_bytes = match self.current_bytes.take() {
             Some(bytes) => bytes,
             None => self.buffer_pool.alloc_buffer()?,
@@ -414,83 +350,47 @@ impl io::Write for BufferWriter {
             let mut available_space = current_bytes.capacity() - current_bytes.len();
 
             if available_space == 0 {
-                let pending_bytes = current_bytes.freeze();
-                self.pending_buffers.push_back(pending_bytes);
-
-                current_bytes = match self.buffer_pool.try_alloc_buffer() {
-                    Some(buffer) => buffer,
-                    None => {
-                        self.write_buffer(true)?;
-                        self.buffer_pool.alloc_buffer()?
-                    }
-                };
+                if self.buffer_tx.try_send(current_bytes.freeze()).is_err() {
+                    return Err(io::ErrorKind::BrokenPipe.into());
+                }
+                current_bytes = self.buffer_pool.alloc_buffer()?;
                 available_space = current_bytes.capacity() - current_bytes.len();
             }
 
             let to_write = std::cmp::min(remaining.len(), available_space);
             current_bytes.extend_from_slice(&remaining[..to_write]);
-
             written += to_write;
             remaining = &remaining[to_write..];
         }
 
         if current_bytes.capacity() - current_bytes.len() == 0 {
-            self.pending_buffers.push_back(current_bytes.freeze());
+            if self.buffer_tx.try_send(current_bytes.freeze()).is_err() {
+                return Err(io::ErrorKind::BrokenPipe.into());
+            }
         } else {
             self.current_bytes = Some(current_bytes);
         }
 
-        if !self.pending_buffers.is_empty() {
-            self.write_buffer(false)?;
-        }
-
         Ok(written)
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        if let Some(current_bytes) = self.current_bytes.take_if(|bytes| !bytes.is_empty()) {
-            self.pending_buffers.push_back(current_bytes.freeze());
-        }
-
-        if !self.pending_buffers.is_empty() {
-            self.write_buffer(true)?;
-        }
-
-        if let Some(pending_response) = self.pending_response.take() {
-            let BufferWriteResp { writer, mut error } = pending_response.wait_and_take();
-            self.writer = Some(writer);
-
-            if let Some(error) = error.take() {
-                return Err(error);
+        if let Some(b) = self.current_bytes.take() {
+            if self.buffer_tx.try_send(b.freeze()).is_err() {
+                return Err(io::ErrorKind::BrokenPipe.into());
             }
         }
 
-        Ok(())
+        self.last_error()
     }
 }
 
 impl Drop for BufferWriter {
     fn drop(&mut self) {
-        let pending_buffers = std::mem::take(&mut self.pending_buffers);
-
-        for pending_buffer in pending_buffers {
-            match pending_buffer.try_into_mut() {
-                Ok(mut buf) if buf.capacity() == CHUNK_SIZE => {
-                    buf.clear();
-                    self.buffer_pool.release_buffer(buf);
-                }
-                _ => {
-                    log::warn!("Failed to recycle buffer, creating new one");
-                    let new_buf = BytesMut::with_capacity(CHUNK_SIZE);
-                    self.buffer_pool.release_buffer(new_buf);
-                }
-            }
-        }
-
-        if let Some(mut current_bytes) = self.current_bytes.take() {
-            current_bytes.clear();
-            self.buffer_pool.release_buffer(current_bytes);
+        if let Some(mut b) = self.current_bytes.take() {
+            b.clear();
+            self.buffer_pool.release_buffer(b);
         }
     }
 }
@@ -576,10 +476,7 @@ impl SpillsDataWriter {
 }
 
 pub struct SpillsDataReader {
-    location: String,
-    operator: Operator,
-    row_groups: VecDeque<RowGroupMetaData>,
-    spills_buffer_pool: Arc<SpillsBufferPool>,
+    receiver: async_channel::Receiver<Result<FetchedRowGroup>>,
     data_schema: DataSchemaRef,
     field_levels: FieldLevels,
     read_bytes: usize,
@@ -594,6 +491,7 @@ impl SpillsDataReader {
         row_groups: Vec<RowGroupMetaData>,
         spills_buffer_pool: Arc<SpillsBufferPool>,
         target: SpillTarget,
+        settings: ReadSettings,
     ) -> Result<SpillsDataReader> {
         if row_groups.is_empty() {
             return Err(ErrorCode::Internal(
@@ -607,13 +505,20 @@
             None,
         )?;
 
-        Ok(SpillsDataReader {
+        let (tx, rx) = async_channel::bounded(1);
+        spills_buffer_pool.operator(BufferOperator::ReaderTask(ReaderTaskOperator {
+            span: Span::enter_with_local_parent("ReaderTask"),
+            op: operator,
             location,
-            operator,
-            spills_buffer_pool,
+            row_groups,
+            settings,
+            sender: tx,
+        }));
+
+        Ok(SpillsDataReader {
+            receiver: rx,
             data_schema,
             field_levels,
-            row_groups: VecDeque::from(row_groups),
             read_bytes: 0,
             target,
         })
@@ -623,42 +528,30 @@
         self.read_bytes
     }
 
-    pub fn read(&mut self, settings: ReadSettings) -> Result<Option<DataBlock>> {
-        let Some(row_group) = self.row_groups.pop_front() else {
-            return Ok(None);
-        };
-
+    pub fn read(&mut self) -> Result<Option<DataBlock>> {
         let start = Instant::now();
-        let mut row_group = RowGroupCore::new(row_group, None);
-
-        let read_bytes = Cell::new(0usize);
-
-        row_group.fetch(&ProjectionMask::all(), None, |fetch_ranges| {
-            let chunk_data = self.spills_buffer_pool.fetch_ranges(
-                self.operator.clone(),
-                self.location.clone(),
-                fetch_ranges,
-                settings,
-            )?;
-            let bytes_read = chunk_data.iter().map(|c| c.len()).sum::<usize>();
-            read_bytes.set(read_bytes.get() + bytes_read);
+        let fetched = match self.receiver.recv_blocking() {
+            Ok(Ok(v)) => v,
+            Ok(Err(e)) => return Err(e),
+            Err(_) => return Ok(None),
+        };
 
-            Ok(chunk_data)
-        })?;
+        self.read_bytes += fetched.read_bytes;
 
-        self.read_bytes += read_bytes.get();
+        let mut rg_core = RowGroupCore::new(fetched.metadata, None);
+        rg_core.fetch(&ProjectionMask::all(), None, |_| Ok(fetched.chunks))?;
 
-        let num_rows = row_group.num_rows();
+        let num_rows = rg_core.num_rows();
         let mut reader = ParquetRecordBatchReader::try_new_with_row_groups(
             &self.field_levels,
-            &row_group,
+            &rg_core,
             num_rows,
             None,
         )?;
 
         let batch = reader.next().transpose()?.unwrap();
         debug_assert!(reader.next().is_none());
 
-        record_read_profile(self.target, &start, read_bytes.get());
+        record_read_profile(self.target, &start, fetched.read_bytes);
 
         Ok(Some(DataBlock::from_record_batch(
             &self.data_schema,
             &batch,
@@ ... @@
 }
 
-pub struct BufferWriteResp {
-    writer: Writer,
-    error: Option<std::io::Error>,
-}
-
-pub struct BufferWriteOperator {
-    span: Span,
-    writer: Writer,
-    buffers: VecDeque<Bytes>,
-    response: Arc<BufferOperatorResp<BufferWriteResp>>,
-    target: SpillTarget,
-    start: Instant,
-}
-
-pub struct BufferCloseResp {
-    _writer: Writer,
-    res: std::io::Result<Metadata>,
-}
-
-pub struct BufferCloseOperator {
-    span: Span,
-    writer: Writer,
-    response: Arc<BufferOperatorResp<BufferCloseResp>>,
-}
-
 pub struct CreateWriterOperator {
     span: Span,
     op: Operator,
@@ -707,6 +575,29 @@ pub struct FetchOperator {
     response: Arc<BufferOperatorResp<std::io::Result<Vec<Bytes>>>>,
 }
 
+pub struct BufferWriterTaskOperator {
+    span: Span,
+    writer: Writer,
+    buffer_rx: async_channel::Receiver<Bytes>,
+    available_buffers: async_channel::Sender<BytesMut>,
+    response: Arc<BufferOperatorResp<io::Result<Metadata>>>,
+}
+
+pub struct FetchedRowGroup {
+    pub metadata: RowGroupMetaData,
+    pub chunks: Vec<Bytes>,
+    pub read_bytes: usize,
+}
+
+pub struct ReaderTaskOperator {
+    span: Span,
+    op: Operator,
+    location: String,
+    row_groups: Vec<RowGroupMetaData>,
+    settings: ReadSettings,
+    sender: async_channel::Sender<Result<FetchedRowGroup>>,
+}
+
 #[derive(Default)]
 pub struct BufferOperatorResp<T> {
     condvar: Condvar,
@@ -740,89 +631,131 @@
 }
 
 pub enum BufferOperator {
-    Write(BufferWriteOperator),
-    Close(BufferCloseOperator),
+    WriterTask(BufferWriterTaskOperator),
     CreateWriter(CreateWriterOperator),
     Fetch(FetchOperator),
+    ReaderTask(ReaderTaskOperator),
 }
 
 impl BufferOperator {
     fn span(&self) -> &Span {
         match self {
-            BufferOperator::Write(op) => &op.span,
-            BufferOperator::Close(op) => &op.span,
+            BufferOperator::WriterTask(op) => &op.span,
             BufferOperator::CreateWriter(op) => &op.span,
             BufferOperator::Fetch(op) => &op.span,
+            BufferOperator::ReaderTask(op) => &op.span,
         }
     }
 }
 
-pub struct Background {
-    available_buffers: async_channel::Sender<BytesMut>,
-}
+pub struct Background;
 
 impl Background {
-    pub fn create(available_buffers: async_channel::Sender<BytesMut>) -> Background {
-        Background { available_buffers }
+    pub fn create() -> Background {
+        Background
     }
 
     pub async fn recv(&mut self, op: BufferOperator) {
         match op {
-            BufferOperator::Write(mut op) => {
-                let start = op.start;
-                let bytes = op.buffers.clone();
-                let bytes_len = bytes.iter().map(|b| b.len()).sum();
-                let mut error = op
-                    .writer
-                    .write(op.buffers)
-                    .await
-                    .map_err(std::io::Error::from)
-                    .err();
-                for bytes in bytes {
-                    let bytes = match bytes.try_into_mut() {
-                        Ok(mut buf) if buf.capacity() == CHUNK_SIZE => {
-                            buf.clear();
-                            buf
-                        }
-                        _ => {
-                            log::warn!("Failed to recycle buffer, creating new one");
-                            BytesMut::with_capacity(CHUNK_SIZE)
-                        }
-                    };
-
-                    if self.available_buffers.send(bytes).await.is_err() && error.is_none() {
-                        error = Some(std::io::Error::new(
-                            std::io::ErrorKind::BrokenPipe,
-                            "buffer pool is closed",
-                        ));
-                    }
-                }
+            BufferOperator::WriterTask(op) => {
+                spawn(
+                    async_backtrace::location!(String::from("writer_task"))
+                        .frame(async move { writer_task_loop(op).await }),
+                );
+            }
+            BufferOperator::CreateWriter(op) => {
+                spawn(
+                    async_backtrace::location!(String::from("create_writer_task")).frame(
+                        async move {
+                            let writer = op.op.writer(&op.path).await;
+                            op.response.done(writer);
+                        },
+                    ),
+                );
+            }
+            BufferOperator::Fetch(op) => {
+                spawn(
+                    async_backtrace::location!(String::from("fetch_task")).frame(async move {
+                        let res =
+                            get_ranges(&op.fetch_ranges, &op.settings, &op.location, &op.op).await;
+                        op.response.done(res.map(|(chunks, _)| chunks));
+                    }),
+                );
+            }
+            BufferOperator::ReaderTask(op) => {
+                spawn(
+                    async_backtrace::location!(String::from("reader_task"))
+                        .frame(async move { reader_task_loop(op).await }),
+                );
+            }
+        }
+    }
+}
 
-                op.response.done(BufferWriteResp {
-                    error,
-                    writer: op.writer,
-                });
+async fn writer_task_loop(mut op: BufferWriterTaskOperator) {
+    let mut has_error = false;
+    while let Ok(buf) = op.buffer_rx.recv().await {
+        if !buf.is_empty()
+            && let Err(e) = op.writer.write(buf.clone()).await
+        {
+            has_error = true;
+            op.buffer_rx.close();
+            op.response.done(Err(io::Error::from(e)));
+        }
 
-                record_write_profile(op.target, &start, bytes_len);
-            }
-            BufferOperator::Close(mut op) => {
-                let res = op.writer.close().await;
+        let mut release_buf = Some(buf);
 
-                op.response.done(BufferCloseResp {
-                    _writer: op.writer,
-                    res: res.map_err(std::io::Error::from),
-                });
+        while let Some(buf) = release_buf.take() {
+            let buf = match buf.try_into_mut() {
+                Ok(mut b) if b.capacity() == CHUNK_SIZE => {
+                    b.clear();
+                    b
+                }
+                _ => {
+                    log::warn!("Failed to recycle buffer, creating new one");
+                    BytesMut::with_capacity(CHUNK_SIZE)
+                }
+            };
+
+            if op.available_buffers.send(buf).await.is_err() {
+                op.buffer_rx.close();
+                op.response.done(Err(io::Error::new(
+                    io::ErrorKind::BrokenPipe,
+                    "buffer pool is closed",
+                )));
+                return;
             }
-            BufferOperator::CreateWriter(op) => {
-                let writer = op.op.writer(&op.path).await;
-                op.response.done(writer);
+            if has_error {
+                release_buf = op.buffer_rx.try_recv().ok();
             }
-            BufferOperator::Fetch(op) => {
-                let res = get_ranges(&op.fetch_ranges, &op.settings, &op.location, &op.op).await;
+        }
 
-                op.response.done(res.map(|(chunks, _)| chunks));
-            }
+        if has_error {
+            return;
+        }
+    }
+
+    op.response
+        .done(op.writer.close().await.map_err(io::Error::from));
+}
+
+async fn reader_task_loop(op: ReaderTaskOperator) {
+    for row_group_meta in op.row_groups {
+        let result = async {
+            let rg_core = RowGroupCore::new(row_group_meta.clone(), None);
+            let ranges = rg_core.fetch_ranges(&ProjectionMask::all());
+            let (chunks, _) = get_ranges(&ranges, &op.settings, &op.location, &op.op).await?;
+            let read_bytes = chunks.iter().map(|c| c.len()).sum::<usize>();
+            Ok(FetchedRowGroup {
+                metadata: row_group_meta,
+                chunks,
+                read_bytes,
+            })
+        };
 
+        if op.sender.send(result.await).await.is_err() {
+            break;
         }
     }
 }
@@ -841,36 +774,24 @@ mod tests {
 
     fn create_test_operator() -> std::io::Result<Operator> {
         let builder = opendal::services::Fs::default().root("/tmp");
-
         Ok(Operator::new(builder)?.finish())
     }
 
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     async fn test_buffer_pool_creation() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        let memory = 16 * 1024 * 1024; // 16MB
-        let workers = 2;
-
-        let pool = SpillsBufferPool::create(runtime.clone(), memory, workers);
-
-        // Should be able to allocate buffers
-        let buffer1 = pool.try_alloc_buffer();
-        assert!(buffer1.is_some());
+        let pool = SpillsBufferPool::create(16 * 1024 * 1024, 2).unwrap();
+        let buffer1 = pool.alloc_buffer();
+        assert!(buffer1.is_ok());
         assert_eq!(buffer1.unwrap().capacity(), CHUNK_SIZE);
-
-        let buffer2 = pool.try_alloc_buffer();
-        assert!(buffer2.is_some());
-        assert_eq!(buffer2.unwrap().capacity(), CHUNK_SIZE);
     }
 
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     async fn test_buffer_writer_basic_write() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1);
+        let pool = SpillsBufferPool::create(8 * 1024 * 1024, 1).unwrap();
        let operator = create_test_operator().unwrap();
         let writer = operator.writer("test_file").await.unwrap();
 
-        let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote);
+        let mut buffer_writer = pool.buffer_write(writer);
 
         let data = b"Hello, World!";
         let written = buffer_writer.write(data).unwrap();
@@ -881,17 +802,15 @@ mod tests {
         assert!(metadata.content_length() > 0);
     }
 
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     async fn test_buffer_writer_large_write() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        let pool = SpillsBufferPool::create(runtime.clone(), 16 * 1024 * 1024, 2);
+        let pool = SpillsBufferPool::create(16 * 1024 * 1024, 2).unwrap();
         let operator = create_test_operator().unwrap();
         let writer = operator.writer("large_file").await.unwrap();
 
-        let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote);
+        let mut buffer_writer = pool.buffer_write(writer);
 
-        // Write data larger than single buffer
-        let large_data = vec![0u8; 8 * 1024 * 1024]; // 8MB
+        let large_data = vec![0u8; 8 * 1024 * 1024];
         let written = buffer_writer.write(&large_data).unwrap();
         assert_eq!(written, large_data.len());
 
@@ -900,14 +819,13 @@ mod tests {
         assert_eq!(metadata.content_length(), large_data.len() as u64);
     }
 
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     async fn test_buffer_writer_multiple_writes() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1);
+        let pool = SpillsBufferPool::create(8 * 1024 * 1024, 1).unwrap();
         let operator = create_test_operator().unwrap();
         let writer = operator.writer("multi_write_file").await.unwrap();
 
-        let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote);
+        let mut buffer_writer = pool.buffer_write(writer);
 
         let mut total_written = 0;
         for i in 0..100 {
@@ -921,22 +839,18 @@ mod tests {
         assert_eq!(metadata.content_length(), total_written as u64);
     }
 
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     async fn test_buffer_pool_exhaustion_and_backpressure() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        // Create pool with only 1 buffer to test backpressure
-        let pool = SpillsBufferPool::create(runtime.clone(), CHUNK_SIZE, 1);
+        let pool = SpillsBufferPool::create(CHUNK_SIZE, 1).unwrap();
         let operator = create_test_operator().unwrap();
         let writer = operator.writer("backpressure_test").await.unwrap();
 
-        let mut buffer_writer = BufferWriter::new(writer, pool.clone(), SpillTarget::Remote);
+        let mut buffer_writer = pool.buffer_write(writer);
 
-        // Fill the first buffer
         let data = vec![0u8; CHUNK_SIZE];
         let written = buffer_writer.write(&data).unwrap();
         assert_eq!(written, data.len());
 
-        // This should trigger backpressure and eventually succeed
         let written2 = buffer_writer.write(b"extra data").unwrap();
         assert_eq!(written2, 10);
 
@@ -944,45 +858,13 @@ mod tests {
         let _metadata = buffer_writer.close().unwrap();
     }
 
-    #[tokio::test]
-    async fn test_buffer_reuse() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1);
-
-        // Allocate all buffers
-        let mut buffers = Vec::new();
-        while let Some(buffer) = pool.try_alloc_buffer() {
-            buffers.push(buffer);
-        }
-
-        let initial_count = buffers.len();
-        assert!(initial_count > 0);
-
-        // Should be no more buffers available
-        assert!(pool.try_alloc_buffer().is_none());
-
-        // Release all buffers
-        for buffer in buffers {
-            pool.release_buffer(buffer);
-        }
-
-        // Should be able to allocate the same number again
-        let mut new_buffers = Vec::new();
-        while let Some(buffer) = pool.try_alloc_buffer() {
-            new_buffers.push(buffer);
-        }
-
-        assert_eq!(new_buffers.len(), initial_count);
-    }
-
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     async fn test_empty_write() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1);
+        let pool = SpillsBufferPool::create(8 * 1024 * 1024, 1).unwrap();
         let operator = create_test_operator().unwrap();
         let writer = operator.writer("empty_test").await.unwrap();
 
-        let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote);
+        let mut buffer_writer = pool.buffer_write(writer);
 
         let written = buffer_writer.write(b"").unwrap();
         assert_eq!(written, 0);
@@ -992,24 +874,20 @@ mod tests {
         assert_eq!(metadata.content_length(), 0);
     }
 
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     async fn test_close_without_writes() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1);
+        let pool = SpillsBufferPool::create(8 * 1024 * 1024, 1).unwrap();
         let operator = create_test_operator().unwrap();
         let writer = operator.writer("no_write_test").await.unwrap();
 
-        let buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote);
-
-        // Should be able to close without any writes
+        let buffer_writer = pool.buffer_write(writer);
         let metadata = buffer_writer.close().unwrap();
         assert_eq!(metadata.content_length(), 0);
     }
 
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     async fn test_concurrent_writers() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        let pool = SpillsBufferPool::create(runtime.clone(), 16 * 1024 * 1024, 4);
+        let pool = SpillsBufferPool::create(16 * 1024 * 1024, 4).unwrap();
         let operator = create_test_operator().unwrap();
 
         let write_count = Arc::new(AtomicUsize::new(0));
@@ -1025,7 +903,7 @@ mod tests {
                     .writer(&format!("concurrent_{}", i))
                     .await
                     .unwrap();
-                let mut buffer_writer = BufferWriter::new(writer, pool_clone, SpillTarget::Remote);
+                let mut buffer_writer = pool_clone.buffer_write(writer);
 
                 for j in 0..10 {
                     let data = format!("Writer {} - Line {}\n", i, j);
@@ -1040,35 +918,10 @@ mod tests {
             handles.push(handle);
         }
-        // Wait for all writers to complete
         for handle in handles {
             let _metadata = handle.await.unwrap();
         }
 
         assert_eq!(write_count.load(Ordering::Relaxed), 40);
     }
-
-    #[tokio::test]
-    async fn test_writer_close_error_handling() {
-        let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap());
-        let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1);
-        let operator = create_test_operator().unwrap();
-        let writer = operator.writer("error_test").await.unwrap();
-
-        let mut buffer_writer = pool.buffer_write(writer, SpillTarget::Remote);
-        buffer_writer.write_all(b"test data").unwrap();
-
-        // Close once
-        let _metadata = buffer_writer.close().unwrap();
-
-        // Create another writer and try to close it twice
-        let writer2 = operator.writer("error_test2").await.unwrap();
-        let buffer_writer2 = pool.buffer_write(writer2, SpillTarget::Remote);
-        let _metadata = buffer_writer2.close().unwrap();
-
-        // Second close should return error (create new writer for this test)
-        let writer3 = operator.writer("error_test3").await.unwrap();
-        let buffer_writer3 = pool.buffer_write(writer3, SpillTarget::Remote);
-        let _metadata = buffer_writer3.close().unwrap();
-    }
 }
diff --git a/src/query/service/src/spillers/row_group_encoder.rs b/src/query/service/src/spillers/row_group_encoder.rs
index 6feeac15750c1..83f72d1b42bb2 100644
--- a/src/query/service/src/spillers/row_group_encoder.rs
+++ b/src/query/service/src/spillers/row_group_encoder.rs
@@ -57,7 +57,6 @@ use super::Location;
 use super::SpillerInner;
 use super::SpillsBufferPool;
 use super::async_buffer::BufferWriter;
-use super::async_buffer::SpillTarget;
 
 pub struct Properties {
     schema: Arc<Schema>,
@@ -467,8 +466,7 @@ impl SpillerInner {
         };
 
         let remote_location = self.create_unique_location();
-        let remote =
-            pool.buffer_writer(op.clone(), remote_location.clone(), SpillTarget::Remote)?;
+        let remote = pool.buffer_writer(op.clone(), remote_location.clone())?;
 
         Ok(AnyFileWriter::Remote {
             path: remote_location.clone(),
diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt
index 08942d2bfafd8..ce3100de4c439 100644
--- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt
+++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt
@@ -236,6 +236,8 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
 | 'query' | 'warehouse_id'                         | 'test_warehouse'       | '' |
 | 'spill' | 'result_set_spilling_disk_quota_ratio' | '0'                    | '' |
 | 'spill' | 'sort_spilling_disk_quota_ratio'       | '60'                   | '' |
+| 'spill' | 'spill_buffer_pool_memory'             | '209715200'            | '' |
+| 'spill' | 'spill_buffer_pool_workers'            | '2'                    | '' |
 | 'spill' | 'spill_local_disk_max_bytes'           | '18446744073709551615' | '' |
 | 'spill' | 'spill_local_disk_path'                | ''                     | '' |
 | 'spill' | 'spill_local_disk_reserved_space_percentage' | '10.0'           | '' |
diff --git a/src/query/storages/parquet/src/parquet_reader/row_group.rs b/src/query/storages/parquet/src/parquet_reader/row_group.rs
index 46da387c0e2ac..925b2b245bf14 100644
--- a/src/query/storages/parquet/src/parquet_reader/row_group.rs
+++ b/src/query/storages/parquet/src/parquet_reader/row_group.rs
@@ -226,7 +226,7 @@ impl RowGroupCore {
         &mut self,
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
-        get_ranges: impl Fn(Vec<Range<u64>>) -> Result<Vec<Bytes>>,
+        get_ranges: impl FnOnce(Vec<Range<u64>>) -> Result<Vec<Bytes>>,
     ) -> Result<()> {
         if let Some((selection, page_locations)) = selection.zip(self.page_locations.as_ref()) {
             // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
@@ -318,6 +318,10 @@ impl RowGroupCore {
         }
     }
 
+    pub fn fetch_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
+        self.get_fetch_ranges_without_index(projection)
+    }
+
     fn get_fetch_ranges_without_index(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
         self.column_chunks
             .iter()
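
Note on the new write path. The sketch below is a minimal illustration of how the spill operators above (PayloadWriter::try_create, GraceJoinPartition::create) drive the reworked pool; the spill_rows wrapper itself is hypothetical and error handling is reduced to the `?` operator:

    use std::io::Write;

    // Sketch only: obtaining and driving a spill writer now that
    // buffer_writer() no longer takes a SpillTarget.
    fn spill_rows(prefix: &str) -> databend_common_exception::Result<opendal::Metadata> {
        let operator = DataOperator::instance().spill_operator();
        let buffer_pool = SpillsBufferPool::instance();

        // A background BufferWriterTask on the pool's "spill-worker" runtime owns
        // the opendal Writer and drains frozen 4MB chunks from buffer_tx.
        let path = format!("{}/{}", prefix, GlobalUniq::unique());
        let mut writer = buffer_pool.buffer_writer(operator, path)?;

        // write() blocks inside alloc_buffer() once the pool is exhausted; that
        // blocked time is what the new 60-second log line aggregates.
        writer.write_all(b"serialized spill payload")?;

        // close() sends the tail chunk, closes the channel, and waits for the
        // background task to close the opendal Writer, yielding its Metadata.
        Ok(writer.close()?)
    }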
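The read path, for symmetry. Again a sketch rather than the PR's code: restore_rows is hypothetical, and since the leading arguments of reader() are elided in the hunk above, the operator/location order shown here is an assumption:

    // Sketch only: restoring spilled blocks, mirroring restore_payload and the
    // grace-join restore loop above.
    fn restore_rows(
        location: String,
        schema: DataSchemaRef,
        row_groups: Vec<RowGroupMetaData>,
        settings: ReadSettings,
    ) -> databend_common_exception::Result<()> {
        let data_operator = DataOperator::instance();
        // SpillTarget is still derived, but only for read-profile accounting.
        let target = SpillTarget::from_storage_params(data_operator.spill_params());
        let operator = data_operator.spill_operator();

        // ReadSettings moved from read() into the constructor: a background
        // ReaderTask prefetches one row group ahead over a bounded(1) channel.
        let mut reader = SpillsBufferPool::instance()
            .reader(operator, location, schema, row_groups, target, settings)?;

        // read() is now argument-free; it blocks on the prefetch channel and
        // returns None once the channel closes after the last row group.
        while let Some(block) = reader.read()? {
            let _ = block.num_rows();
        }
        Ok(())
    }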