From 0ffaf5413d6a14af7b23d9eee7b8bcad8f0b2857 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 27 Apr 2026 22:04:37 +0800 Subject: [PATCH 01/10] refactor(query): simplify SpillsBufferPool with owned runtime and configurable settings - SpillsBufferPool now owns a dedicated Runtime instead of borrowing GlobalIORuntime - Remove SpillTarget from public buffer pool APIs; callers no longer derive it - Add spill_buffer_pool_memory and spill_buffer_pool_workers to SpillConfig - Track buffer pool blocking time via atomic counters for observability - Simplify BufferWriter by removing SpillTarget and redundant comments - Clean up tests to use multi_thread tokio flavor --- src/query/config/src/config.rs | 12 + src/query/config/src/inner.rs | 10 + src/query/config/src/mask.rs | 8 +- src/query/service/src/global_services.rs | 2 +- .../new_aggregate/new_aggregate_spiller.rs | 3 +- .../new_hash_join/grace/grace_join.rs | 3 +- .../service/src/spillers/async_buffer.rs | 622 ++++++------------ .../service/src/spillers/row_group_encoder.rs | 4 +- 8 files changed, 232 insertions(+), 432 deletions(-) diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index f1c235e982e5e..464894447ad28 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -3481,6 +3481,14 @@ pub struct SpillConfig { /// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. #[clap(long, value_name = "PERCENT", default_value = "0")] pub result_set_spilling_disk_quota_ratio: u64, + + /// Total memory for the spill buffer pool in bytes. + #[clap(long, value_name = "VALUE", default_value = "209715200")] + pub spill_buffer_pool_memory: u64, + + /// Number of worker tasks in the spill buffer pool. + #[clap(long, value_name = "VALUE", default_value = "2")] + pub spill_buffer_pool_workers: usize, } impl Default for SpillConfig { @@ -3594,6 +3602,8 @@ mod config_converters { window_partition_spilling_disk_quota_ratio: spill .window_partition_spilling_disk_quota_ratio, result_set_spilling_disk_quota_ratio: spill.result_set_spilling_disk_quota_ratio, + buffer_pool_memory: spill.spill_buffer_pool_memory, + buffer_pool_workers: spill.spill_buffer_pool_workers, }) } @@ -3616,6 +3626,8 @@ mod config_converters { window_partition_spilling_disk_quota_ratio: value .window_partition_spilling_disk_quota_ratio, result_set_spilling_disk_quota_ratio: value.result_set_spilling_disk_quota_ratio, + spill_buffer_pool_memory: value.buffer_pool_memory, + spill_buffer_pool_workers: value.buffer_pool_workers, } } } diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index 57b40ce7c8617..fba85be41b145 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -433,6 +433,12 @@ pub struct SpillConfig { /// Maximum percentage of the global local spill quota that HTTP /// result-set spilling may use for one query. pub result_set_spilling_disk_quota_ratio: u64, + + /// Total memory for the spill buffer pool in bytes. + pub buffer_pool_memory: u64, + + /// Number of worker tasks in the spill buffer pool. + pub buffer_pool_workers: usize, } impl SpillConfig { @@ -503,6 +509,8 @@ impl SpillConfig { window_partition_spilling_disk_quota_ratio: 60, // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. 
result_set_spilling_disk_quota_ratio: 0, + buffer_pool_memory: 200 * 1024 * 1024, + buffer_pool_workers: 2, } } } @@ -519,6 +527,8 @@ impl Default for SpillConfig { window_partition_spilling_disk_quota_ratio: 60, // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. result_set_spilling_disk_quota_ratio: 0, + buffer_pool_memory: 200 * 1024 * 1024, + buffer_pool_workers: 2, } } } diff --git a/src/query/config/src/mask.rs b/src/query/config/src/mask.rs index 5bfe26f62d13f..8a9f0466cb78d 100644 --- a/src/query/config/src/mask.rs +++ b/src/query/config/src/mask.rs @@ -218,6 +218,8 @@ impl SpillConfig { sort_spilling_disk_quota_ratio, window_partition_spilling_disk_quota_ratio, result_set_spilling_disk_quota_ratio, + spill_buffer_pool_memory, + spill_buffer_pool_workers, } = *self; Self { @@ -227,8 +229,9 @@ impl SpillConfig { storage: storage.as_ref().map(|storage| storage.mask_display()), sort_spilling_disk_quota_ratio, window_partition_spilling_disk_quota_ratio, - // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. result_set_spilling_disk_quota_ratio, + spill_buffer_pool_memory, + spill_buffer_pool_workers, } } } @@ -384,8 +387,9 @@ mod tests { spill_local_disk_max_bytes: 10, sort_spilling_disk_quota_ratio: 60, window_partition_spilling_disk_quota_ratio: 30, - // TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes. result_set_spilling_disk_quota_ratio: 0, + spill_buffer_pool_memory: 209715200, + spill_buffer_pool_workers: 2, storage: Some(StorageConfig { typ: "s3".to_string(), s3: S3StorageConfig { diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index 04cf1683bea57..94fab3989fc37 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -115,7 +115,7 @@ impl GlobalServices { // 4. cluster discovery init. 
ClusterDiscovery::init(config, version).await?; - SpillsBufferPool::init(); + SpillsBufferPool::init(&config.spill)?; // TODO(xuanwo): // // This part is a bit complex because catalog are used widely in different diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index 9d1e5f08d7c7c..d3b097e2f10c7 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -54,11 +54,10 @@ struct PayloadWriter { impl PayloadWriter { fn try_create(prefix: &str) -> Result { let data_operator = DataOperator::instance(); - let target = SpillTarget::from_storage_params(data_operator.spill_params()); let operator = data_operator.spill_operator(); let buffer_pool = SpillsBufferPool::instance(); let file_path = format!("{}/{}", prefix, GlobalUniq::unique()); - let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?; + let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?; Ok(PayloadWriter { path: file_path, diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs index 01b3288fb2dbb..32a12eb0dc5cc 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs @@ -417,12 +417,11 @@ pub struct GraceJoinPartition { impl GraceJoinPartition { pub fn create(prefix: &str) -> Result { let data_operator = DataOperator::instance(); - let target = SpillTarget::from_storage_params(data_operator.spill_params()); let operator = data_operator.spill_operator(); let buffer_pool = SpillsBufferPool::instance(); let file_path = format!("{}/{}", prefix, GlobalUniq::unique()); - let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?; + let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?; Ok(GraceJoinPartition { path: file_path, diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index 24f6b52bf30e2..755949313e936 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::cell::Cell; -use std::collections::VecDeque; use std::io; use std::io::Write; use std::ops::Range; @@ -21,14 +19,18 @@ use std::sync::Arc; use std::sync::Condvar; use std::sync::Mutex; use std::sync::PoisonError; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::time::Duration; use std::time::Instant; use arrow_schema::Schema; use bytes::Bytes; use bytes::BytesMut; use databend_common_base::base::GlobalInstance; -use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::Runtime; +use databend_common_base::runtime::spawn; +use databend_common_config::SpillConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -56,7 +58,6 @@ use parquet::file::properties::EnabledStatistics; use parquet::file::properties::WriterProperties; use super::record_read_profile; -use super::record_write_profile; const CHUNK_SIZE: usize = 4 * 1024 * 1024; #[derive(Clone, Copy)] @@ -70,12 +71,6 @@ impl SpillTarget { matches!(self, SpillTarget::Local) } - /// Derive spill target (local vs remote) from storage params. - /// - /// Today we only treat `StorageParams::Fs` as local, everything - /// else (S3, Azure, memory, etc.) is considered remote. Centralizing - /// this decision here keeps higher-level operators simpler and avoids - /// duplicating the matching logic at each call site. pub fn from_storage_params(params: Option<&StorageParams>) -> Self { match params { Some(StorageParams::Fs(_)) => SpillTarget::Local, @@ -84,78 +79,34 @@ impl SpillTarget { } } -/// Buffer Pool Workflow for Spill Operations: -/// -/// Context: During query execution, when memory pressure is high, intermediate -/// results (hash tables, sorted data, aggregation states) need to be spilled -/// to disk/object storage to free up memory. This buffer pool provides a -/// synchronous Write interface with async I/O underneath, specifically designed -/// for spill scenarios where backpressure control is more important than latency. -/// -/// 1. Initialization: -/// - Create a fixed number of BytesMut buffers (4MB each) based on memory limit -/// - Spawn worker threads that listen for write operations via async channels -/// - Pre-allocate buffers are placed in the available_write_buffers channel -/// -/// 2. Spill Write Operation: -/// - During spill, BufferWriter.write() fills the current buffer with serialized data -/// - When buffer is full, it's frozen to Bytes and added to pending_buffers queue -/// - Writer tries to acquire a new buffer from the pool (non-blocking first) -/// - If no buffer available, triggers async write operation and BLOCKS (crucial for spill) -/// -/// 3. Async Spill Write Process: -/// - Background worker receives BufferWriteOperator containing: -/// - OpenDAL Writer instance (to disk/S3/etc.) -/// - Queue of Bytes buffers containing spilled data -/// - Response channel for completion notification -/// - Worker writes all buffers sequentially to storage via writer.write(buffers) -/// - After writing, attempts to recycle buffers back to pool: -/// - Converts Bytes back to BytesMut (if unique reference) -/// - Clears buffer content and returns to available pool -/// - Notifies completion via condvar to unblock waiting spill thread -/// -/// 4. 
Spill Backpressure Control (Key Feature): -/// - When pool is exhausted, spill write() BLOCKS until async write completes -/// - This throttles spill speed to match storage I/O capacity -/// - Prevents memory explosion during high-volume spill operations -/// - Query execution naturally pauses when spill storage is slower than data generation -/// -/// 5. Buffer Lifecycle in Spill: -/// - BytesMut (mutable) -> fill with spill data -> freeze to Bytes (immutable) -/// - Bytes -> async write to spill storage -> try_into_mut() -> clear -> back to pool -/// -/// 6. Spill Resource Cleanup: -/// - flush() ensures all pending spill data is written before spill operation completes -/// - Drop trait recycles any remaining buffers back to pool -/// - Critical for spill scenarios where partial writes could corrupt spilled data -/// -/// Spill-Specific Benefits: -/// - Memory-bounded operation prevents OOM during large spills -/// - Synchronous blocking behavior allows query threads to naturally pause -/// - Buffer reuse minimizes GC pressure during intensive spill operations -/// - Automatic flow control matches spill rate to storage bandwidth -/// - Works with any OpenDAL-supported storage (local disk, S3, etc.) pub struct SpillsBufferPool { + _runtime: Arc, working_queue: async_channel::Sender, available_write_buffers: async_channel::Receiver, available_write_buffers_tx: async_channel::Sender, + blocking_nanos: Arc, + blocking_count: Arc, } impl SpillsBufferPool { - pub fn init() { - // TODO: config + pub fn init(config: &SpillConfig) -> Result<()> { GlobalInstance::set(SpillsBufferPool::create( - GlobalIORuntime::instance(), - 200 * 1024 * 1024, - 2, - )) + config.buffer_pool_memory as usize, + config.buffer_pool_workers, + )?); + Ok(()) } pub fn instance() -> Arc { GlobalInstance::get() } - pub fn create(executor: Arc, memory: usize, workers: usize) -> Arc { + pub fn create(memory: usize, workers: usize) -> Result> { + let runtime = Arc::new(Runtime::with_worker_threads( + workers, + Some("spill-worker".to_owned()), + )?); + let (working_tx, working_rx) = async_channel::unbounded(); let (buffers_tx, buffers_rx) = async_channel::unbounded(); @@ -170,7 +121,7 @@ impl SpillsBufferPool { for _ in 0..workers { let working_queue: async_channel::Receiver = working_rx.clone(); let available_write_buffers = buffers_tx.clone(); - executor.spawn( + runtime.spawn( async_backtrace::location!(String::from("async_buffer")).frame(async move { let mut background = Background::create(available_write_buffers); while let Ok(op) = working_queue.recv().await { @@ -181,25 +132,55 @@ impl SpillsBufferPool { ); } - Arc::new(SpillsBufferPool { + let blocking_nanos = Arc::new(AtomicU64::new(0)); + let blocking_count = Arc::new(AtomicU64::new(0)); + + { + let blocking_nanos = Arc::clone(&blocking_nanos); + let blocking_count = Arc::clone(&blocking_count); + runtime.spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + let count = blocking_count.swap(0, Ordering::Relaxed); + let nanos = blocking_nanos.swap(0, Ordering::Relaxed); + if count > 0 { + log::info!( + "SpillsBufferPool alloc blocked {} times, total {:.2}ms in last 60s", + count, + nanos as f64 / 1_000_000.0, + ); + } + } + }); + } + + Ok(Arc::new(SpillsBufferPool { + _runtime: runtime, working_queue: working_tx, available_write_buffers: buffers_rx, available_write_buffers_tx: buffers_tx, - }) - } - - pub(crate) fn try_alloc_buffer(&self) -> Option { - self.available_write_buffers.try_recv().ok() + blocking_nanos, + blocking_count, + })) 
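+        // Note: the 60s reporter task above captures only the two
+        // Arc<AtomicU64> counters, not the pool itself; swap(0, Relaxed)
+        // both reads and resets each counter, so a delayed tick merely
+        // merges two reporting windows instead of losing samples.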
} pub(crate) fn alloc_buffer(&self) -> std::io::Result { - match self.available_write_buffers.recv_blocking() { + if let Ok(buf) = self.available_write_buffers.try_recv() { + return Ok(buf); + } + + let start = Instant::now(); + let result = match self.available_write_buffers.recv_blocking() { Ok(buf) => Ok(buf), Err(_) => Err(std::io::Error::new( std::io::ErrorKind::BrokenPipe, "buffer pool is closed", )), - } + }; + self.blocking_nanos + .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed); + self.blocking_count.fetch_add(1, Ordering::Relaxed); + result } pub(crate) fn operator(&self, op: BufferOperator) { @@ -208,21 +189,29 @@ impl SpillsBufferPool { .expect("Buffer pool working queue need unbounded."); } - pub fn buffer_write( - self: &Arc, - writer: Writer, - target: SpillTarget, - ) -> BufferWriter { - BufferWriter::new(writer, self.clone(), target) + pub fn buffer_write(self: &Arc, writer: Writer) -> BufferWriter { + let (buffer_tx, buffer_rx) = async_channel::unbounded::(); + + let response = BufferOperatorResp::pending(); + + self.operator(BufferOperator::WriterTask(BufferWriterTaskOperator { + writer, + buffer_rx, + response: response.clone(), + available_buffers: self.available_write_buffers_tx.clone(), + span: Span::enter_with_local_parent("BufferWriterTask"), + })); + + BufferWriter { + buffer_tx, + current_bytes: None, + buffer_pool: self.clone(), + response, + } } - pub fn writer( - self: &Arc, - op: Operator, - path: String, - target: SpillTarget, - ) -> Result { - let writer = self.buffer_writer(op, path, target)?; + pub fn writer(self: &Arc, op: Operator, path: String) -> Result { + let writer = self.buffer_writer(op, path)?; Ok(SpillsDataWriter::Uninitialize(Some(writer))) } @@ -230,7 +219,6 @@ impl SpillsBufferPool { self: &Arc, op: Operator, path: String, - target: SpillTarget, ) -> Result { let pending_response = BufferOperatorResp::pending(); @@ -245,7 +233,7 @@ impl SpillsBufferPool { .try_send(operator) .expect("Buffer pool working queue need unbounded."); - Ok(self.buffer_write(pending_response.wait_and_take()?, target)) + Ok(self.buffer_write(pending_response.wait_and_take()?)) } pub fn reader( @@ -288,112 +276,33 @@ impl SpillsBufferPool { } pub struct BufferWriter { - writer: Option, - current_bytes: Option, - buffer_pool: Arc, - pending_buffers: VecDeque, - pending_response: Option>>, - target: SpillTarget, + buffer_tx: async_channel::Sender, + response: Arc>>, } impl BufferWriter { - pub fn new( - writer: Writer, - buffer_pool: Arc, - target: SpillTarget, - ) -> BufferWriter { - BufferWriter { - buffer_pool, - writer: Some(writer), - current_bytes: None, - pending_buffers: VecDeque::new(), - pending_response: None, - target, - } - } - - fn write_buffer(&mut self, wait: bool) -> std::io::Result<()> { - if let Some(pending_response) = self.pending_response.take() { - if wait { - let mut response = pending_response.wait_and_take(); - self.writer = Some(response.writer); - - if let Some(last_error) = response.error.take() { - return Err(last_error); - } - } else { - let locked = pending_response.mutex.lock(); - let mut locked = locked.unwrap_or_else(PoisonError::into_inner); - - if let Some(mut response) = locked.take() { - self.writer = Some(response.writer); - if let Some(last_error) = response.error.take() { - return Err(last_error); - } - } else { - drop(locked); - self.pending_response = Some(pending_response); - } + pub fn close(mut self) -> io::Result { + if let Some(b) = self.current_bytes.take_if(|b| !b.is_empty()) { + if 
self.buffer_tx.try_send(b.freeze()).is_err() { + return Err(io::ErrorKind::BrokenPipe.into()); } } - if let Some(writer) = self.writer.take() { - assert!(self.pending_response.is_none()); - - let pending_response = BufferOperatorResp::pending(); - - self.pending_response = Some(pending_response.clone()); - - let operator = BufferOperator::Write(BufferWriteOperator { - span: Span::enter_with_local_parent("BufferWriteOperator"), - writer, - response: pending_response, - buffers: std::mem::take(&mut self.pending_buffers), - target: self.target, - start: Instant::now(), - }); - - self.buffer_pool.operator(operator); - } - - Ok(()) + self.buffer_tx.close(); + self.response.wait_and_take() } - pub fn close(mut self) -> std::io::Result { - self.flush()?; - - if let Some(writer) = self.writer.take() { - let pending_response = BufferOperatorResp::pending(); - - let close_operator = BufferOperator::Close(BufferCloseOperator { - span: Span::enter_with_local_parent("BufferCloseOperator"), - writer, - response: pending_response.clone(), - }); - - self.buffer_pool.operator(close_operator); - - return pending_response.wait_and_take().res; + pub(super) fn finish(&mut self) -> std::io::Result { + if let Some(b) = self.current_bytes.take_if(|b| !b.is_empty()) { + if self.buffer_tx.try_send(b.freeze()).is_err() { + return Err(io::ErrorKind::BrokenPipe.into()); + } } - Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Writer already closed", - )) - } - - pub(super) fn finish(&mut self) -> std::io::Result { - std::mem::replace(self, Self { - writer: None, - current_bytes: None, - buffer_pool: self.buffer_pool.clone(), - pending_buffers: Default::default(), - pending_response: None, - target: self.target, - }) - .close() + self.buffer_tx.close(); + self.response.wait_and_take() } } @@ -403,6 +312,15 @@ impl io::Write for BufferWriter { return Ok(0); } + { + let locked = self.response.mutex.lock(); + let mut locked = locked.unwrap_or_else(PoisonError::into_inner); + + if let Some(Err(cause)) = locked.take_if(|x| x.is_err()) { + return Err(cause); + } + } + let mut current_bytes = match self.current_bytes.take() { Some(bytes) => bytes, None => self.buffer_pool.alloc_buffer()?, @@ -414,56 +332,43 @@ impl io::Write for BufferWriter { let mut available_space = current_bytes.capacity() - current_bytes.len(); if available_space == 0 { - let pending_bytes = current_bytes.freeze(); - self.pending_buffers.push_back(pending_bytes); - - current_bytes = match self.buffer_pool.try_alloc_buffer() { - Some(buffer) => buffer, - None => { - self.write_buffer(true)?; - self.buffer_pool.alloc_buffer()? 
- } - }; + if self.buffer_tx.try_send(current_bytes.freeze()).is_err() { + return Err(io::ErrorKind::BrokenPipe.into()); + } + current_bytes = self.buffer_pool.alloc_buffer()?; available_space = current_bytes.capacity() - current_bytes.len(); } let to_write = std::cmp::min(remaining.len(), available_space); current_bytes.extend_from_slice(&remaining[..to_write]); - written += to_write; remaining = &remaining[to_write..]; } if current_bytes.capacity() - current_bytes.len() == 0 { - self.pending_buffers.push_back(current_bytes.freeze()); + if self.buffer_tx.try_send(current_bytes.freeze()).is_err() { + return Err(io::ErrorKind::BrokenPipe.into()); + } } else { self.current_bytes = Some(current_bytes); } - if !self.pending_buffers.is_empty() { - self.write_buffer(false)?; - } - Ok(written) } fn flush(&mut self) -> io::Result<()> { - if let Some(current_bytes) = self.current_bytes.take_if(|bytes| !bytes.is_empty()) { - self.pending_buffers.push_back(current_bytes.freeze()); - } - - if !self.pending_buffers.is_empty() { - self.write_buffer(true)?; + if let Some(b) = self.current_bytes.take_if(|b| !b.is_empty()) { + if self.buffer_tx.try_send(b.freeze()).is_err() { + return Err(io::ErrorKind::BrokenPipe.into()); + } } - if let Some(pending_response) = self.pending_response.take() { - let BufferWriteResp { writer, mut error } = pending_response.wait_and_take(); - self.writer = Some(writer); + let locked = self.response.mutex.lock(); + let mut locked = locked.unwrap_or_else(PoisonError::into_inner); - if let Some(error) = error.take() { - return Err(error); - } + if let Some(Err(cause)) = locked.take_if(|x| x.is_err()) { + return Err(cause); } Ok(()) @@ -472,25 +377,9 @@ impl io::Write for BufferWriter { impl Drop for BufferWriter { fn drop(&mut self) { - let pending_buffers = std::mem::take(&mut self.pending_buffers); - - for pending_buffer in pending_buffers { - match pending_buffer.try_into_mut() { - Ok(mut buf) if buf.capacity() == CHUNK_SIZE => { - buf.clear(); - self.buffer_pool.release_buffer(buf); - } - _ => { - log::warn!("Failed to recycle buffer, creating new one"); - let new_buf = BytesMut::with_capacity(CHUNK_SIZE); - self.buffer_pool.release_buffer(new_buf); - } - } - } - - if let Some(mut current_bytes) = self.current_bytes.take() { - current_bytes.clear(); - self.buffer_pool.release_buffer(current_bytes); + if let Some(mut b) = self.current_bytes.take() { + b.clear(); + self.buffer_pool.release_buffer(b); } } } @@ -578,7 +467,7 @@ impl SpillsDataWriter { pub struct SpillsDataReader { location: String, operator: Operator, - row_groups: VecDeque, + row_groups: std::collections::VecDeque, spills_buffer_pool: Arc, data_schema: DataSchemaRef, field_levels: FieldLevels, @@ -613,7 +502,7 @@ impl SpillsDataReader { spills_buffer_pool, data_schema, field_levels, - row_groups: VecDeque::from(row_groups), + row_groups: std::collections::VecDeque::from(row_groups), read_bytes: 0, target, }) @@ -632,7 +521,7 @@ impl SpillsDataReader { let mut row_group = RowGroupCore::new(row_group, None); - let read_bytes = Cell::new(0usize); + let read_bytes = std::cell::Cell::new(0usize); row_group.fetch(&ProjectionMask::all(), None, |fetch_ranges| { let chunk_data = self.spills_buffer_pool.fetch_ranges( @@ -666,31 +555,6 @@ impl SpillsDataReader { } } -pub struct BufferWriteResp { - writer: Writer, - error: Option, -} - -pub struct BufferWriteOperator { - span: Span, - writer: Writer, - buffers: VecDeque, - response: Arc>, - target: SpillTarget, - start: Instant, -} - -pub struct BufferCloseResp { - 
_writer: Writer, - res: std::io::Result, -} - -pub struct BufferCloseOperator { - span: Span, - writer: Writer, - response: Arc>, -} - pub struct CreateWriterOperator { span: Span, op: Operator, @@ -707,6 +571,14 @@ pub struct FetchOperator { response: Arc>>>, } +pub struct BufferWriterTaskOperator { + span: Span, + writer: Writer, + buffer_rx: async_channel::Receiver, + available_buffers: async_channel::Sender, + response: Arc>>, +} + #[derive(Default)] pub struct BufferOperatorResp { condvar: Condvar, @@ -740,8 +612,7 @@ impl BufferOperatorResp { } pub enum BufferOperator { - Write(BufferWriteOperator), - Close(BufferCloseOperator), + WriterTask(BufferWriterTaskOperator), CreateWriter(CreateWriterOperator), Fetch(FetchOperator), } @@ -749,84 +620,71 @@ pub enum BufferOperator { impl BufferOperator { fn span(&self) -> &Span { match self { - BufferOperator::Write(op) => &op.span, - BufferOperator::Close(op) => &op.span, + BufferOperator::WriterTask(op) => &op.span, BufferOperator::CreateWriter(op) => &op.span, BufferOperator::Fetch(op) => &op.span, } } } -pub struct Background { - available_buffers: async_channel::Sender, -} +pub struct Background; impl Background { - pub fn create(available_buffers: async_channel::Sender) -> Background { - Background { available_buffers } + pub fn create(_available_buffers: async_channel::Sender) -> Background { + Background } pub async fn recv(&mut self, op: BufferOperator) { match op { - BufferOperator::Write(mut op) => { - let start = op.start; - let bytes = op.buffers.clone(); - let bytes_len = bytes.iter().map(|b| b.len()).sum(); - let mut error = op - .writer - .write(op.buffers) - .await - .map_err(std::io::Error::from) - .err(); - for bytes in bytes { - let bytes = match bytes.try_into_mut() { - Ok(mut buf) if buf.capacity() == CHUNK_SIZE => { - buf.clear(); - buf - } - _ => { - log::warn!("Failed to recycle buffer, creating new one"); - BytesMut::with_capacity(CHUNK_SIZE) - } - }; - - if self.available_buffers.send(bytes).await.is_err() && error.is_none() { - error = Some(std::io::Error::new( - std::io::ErrorKind::BrokenPipe, - "buffer pool is closed", - )); - } - } - - op.response.done(BufferWriteResp { - error, - writer: op.writer, - }); - - record_write_profile(op.target, &start, bytes_len); - } - BufferOperator::Close(mut op) => { - let res = op.writer.close().await; - - op.response.done(BufferCloseResp { - _writer: op.writer, - res: res.map_err(std::io::Error::from), - }); + BufferOperator::WriterTask(op) => { + spawn( + async_backtrace::location!(String::from("writer_task")) + .frame(async move { writer_task_loop(op).await }), + ); } BufferOperator::CreateWriter(op) => { let writer = op.op.writer(&op.path).await; - op.response.done(writer); } BufferOperator::Fetch(op) => { let res = get_ranges(&op.fetch_ranges, &op.settings, &op.location, &op.op).await; - op.response.done(res.map(|(chunks, _)| chunks)); } } } } +async fn writer_task_loop(mut op: BufferWriterTaskOperator) { + while let Ok(buf) = op.buffer_rx.recv().await { + if let Err(e) = op.writer.write(buf.clone()).await { + op.buffer_rx.close(); + op.response.done(Err(io::Error::from(e))); + } + + let buf = match buf.clone().try_into_mut() { + Ok(mut b) if b.capacity() == CHUNK_SIZE => { + b.clear(); + b + } + _ => { + log::warn!("Failed to recycle buffer, creating new one"); + BytesMut::with_capacity(CHUNK_SIZE) + } + }; + + if op.available_buffers.send(buf).await.is_err() { + op.buffer_rx.close(); + op.response.done(Err(io::Error::new( + io::ErrorKind::BrokenPipe, + "buffer pool is 
closed", + ))); + } + } + + op.response + .done(op.writer.close().await.map_err(io::Error::from)); +} + #[cfg(test)] mod tests { use std::io::Write; @@ -841,36 +699,24 @@ mod tests { fn create_test_operator() -> std::io::Result { let builder = opendal::services::Fs::default().root("/tmp"); - Ok(Operator::new(builder)?.finish()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_buffer_pool_creation() { - let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let memory = 16 * 1024 * 1024; // 16MB - let workers = 2; - - let pool = SpillsBufferPool::create(runtime.clone(), memory, workers); - - // Should be able to allocate buffers - let buffer1 = pool.try_alloc_buffer(); - assert!(buffer1.is_some()); + let pool = SpillsBufferPool::create(16 * 1024 * 1024, 2).unwrap(); + let buffer1 = pool.alloc_buffer(); + assert!(buffer1.is_ok()); assert_eq!(buffer1.unwrap().capacity(), CHUNK_SIZE); - - let buffer2 = pool.try_alloc_buffer(); - assert!(buffer2.is_some()); - assert_eq!(buffer2.unwrap().capacity(), CHUNK_SIZE); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_buffer_writer_basic_write() { - let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(8 * 1024 * 1024, 1).unwrap(); let operator = create_test_operator().unwrap(); let writer = operator.writer("test_file").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); + let mut buffer_writer = pool.buffer_write(writer); let data = b"Hello, World!"; let written = buffer_writer.write(data).unwrap(); @@ -881,17 +727,15 @@ mod tests { assert!(metadata.content_length() > 0); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_buffer_writer_large_write() { - let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = SpillsBufferPool::create(runtime.clone(), 16 * 1024 * 1024, 2); + let pool = SpillsBufferPool::create(16 * 1024 * 1024, 2).unwrap(); let operator = create_test_operator().unwrap(); let writer = operator.writer("large_file").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); + let mut buffer_writer = pool.buffer_write(writer); - // Write data larger than single buffer - let large_data = vec![0u8; 8 * 1024 * 1024]; // 8MB + let large_data = vec![0u8; 8 * 1024 * 1024]; let written = buffer_writer.write(&large_data).unwrap(); assert_eq!(written, large_data.len()); @@ -900,14 +744,13 @@ mod tests { assert_eq!(metadata.content_length(), large_data.len() as u64); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_buffer_writer_multiple_writes() { - let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(8 * 1024 * 1024, 1).unwrap(); let operator = create_test_operator().unwrap(); let writer = operator.writer("multi_write_file").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); + let mut buffer_writer = pool.buffer_write(writer); let mut total_written = 0; for i in 0..100 { @@ -921,22 +764,18 @@ mod tests { assert_eq!(metadata.content_length(), total_written as u64); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_buffer_pool_exhaustion_and_backpressure() { - let runtime = 
Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - // Create pool with only 1 buffer to test backpressure - let pool = SpillsBufferPool::create(runtime.clone(), CHUNK_SIZE, 1); + let pool = SpillsBufferPool::create(CHUNK_SIZE, 1).unwrap(); let operator = create_test_operator().unwrap(); let writer = operator.writer("backpressure_test").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool.clone(), SpillTarget::Remote); + let mut buffer_writer = pool.buffer_write(writer); - // Fill the first buffer let data = vec![0u8; CHUNK_SIZE]; let written = buffer_writer.write(&data).unwrap(); assert_eq!(written, data.len()); - // This should trigger backpressure and eventually succeed let written2 = buffer_writer.write(b"extra data").unwrap(); assert_eq!(written2, 10); @@ -944,45 +783,13 @@ mod tests { let _metadata = buffer_writer.close().unwrap(); } - #[tokio::test] - async fn test_buffer_reuse() { - let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); - - // Allocate all buffers - let mut buffers = Vec::new(); - while let Some(buffer) = pool.try_alloc_buffer() { - buffers.push(buffer); - } - - let initial_count = buffers.len(); - assert!(initial_count > 0); - - // Should be no more buffers available - assert!(pool.try_alloc_buffer().is_none()); - - // Release all buffers - for buffer in buffers { - pool.release_buffer(buffer); - } - - // Should be able to allocate the same number again - let mut new_buffers = Vec::new(); - while let Some(buffer) = pool.try_alloc_buffer() { - new_buffers.push(buffer); - } - - assert_eq!(new_buffers.len(), initial_count); - } - - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_empty_write() { - let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(8 * 1024 * 1024, 1).unwrap(); let operator = create_test_operator().unwrap(); let writer = operator.writer("empty_test").await.unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); + let mut buffer_writer = pool.buffer_write(writer); let written = buffer_writer.write(b"").unwrap(); assert_eq!(written, 0); @@ -992,24 +799,20 @@ mod tests { assert_eq!(metadata.content_length(), 0); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_close_without_writes() { - let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); + let pool = SpillsBufferPool::create(8 * 1024 * 1024, 1).unwrap(); let operator = create_test_operator().unwrap(); let writer = operator.writer("no_write_test").await.unwrap(); - let buffer_writer = BufferWriter::new(writer, pool, SpillTarget::Remote); - - // Should be able to close without any writes + let buffer_writer = pool.buffer_write(writer); let metadata = buffer_writer.close().unwrap(); assert_eq!(metadata.content_length(), 0); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_concurrent_writers() { - let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = SpillsBufferPool::create(runtime.clone(), 16 * 1024 * 1024, 4); + let pool = SpillsBufferPool::create(16 * 1024 * 1024, 4).unwrap(); let operator = create_test_operator().unwrap(); let write_count = Arc::new(AtomicUsize::new(0)); @@ -1025,7 +828,7 @@ mod tests { 
.writer(&format!("concurrent_{}", i)) .await .unwrap(); - let mut buffer_writer = BufferWriter::new(writer, pool_clone, SpillTarget::Remote); + let mut buffer_writer = pool_clone.buffer_write(writer); for j in 0..10 { let data = format!("Writer {} - Line {}\n", i, j); @@ -1040,35 +843,10 @@ mod tests { handles.push(handle); } - // Wait for all writers to complete for handle in handles { let _metadata = handle.await.unwrap(); } assert_eq!(write_count.load(Ordering::Relaxed), 40); } - - #[tokio::test] - async fn test_writer_close_error_handling() { - let runtime = Arc::new(Runtime::with_worker_threads(2, None).unwrap()); - let pool = SpillsBufferPool::create(runtime.clone(), 8 * 1024 * 1024, 1); - let operator = create_test_operator().unwrap(); - let writer = operator.writer("error_test").await.unwrap(); - - let mut buffer_writer = pool.buffer_write(writer, SpillTarget::Remote); - buffer_writer.write_all(b"test data").unwrap(); - - // Close once - let _metadata = buffer_writer.close().unwrap(); - - // Create another writer and try to close it twice - let writer2 = operator.writer("error_test2").await.unwrap(); - let buffer_writer2 = pool.buffer_write(writer2, SpillTarget::Remote); - let _metadata = buffer_writer2.close().unwrap(); - - // Second close should return error (create new writer for this test) - let writer3 = operator.writer("error_test3").await.unwrap(); - let buffer_writer3 = pool.buffer_write(writer3, SpillTarget::Remote); - let _metadata = buffer_writer3.close().unwrap(); - } } diff --git a/src/query/service/src/spillers/row_group_encoder.rs b/src/query/service/src/spillers/row_group_encoder.rs index 6feeac15750c1..83f72d1b42bb2 100644 --- a/src/query/service/src/spillers/row_group_encoder.rs +++ b/src/query/service/src/spillers/row_group_encoder.rs @@ -57,7 +57,6 @@ use super::Location; use super::SpillerInner; use super::SpillsBufferPool; use super::async_buffer::BufferWriter; -use super::async_buffer::SpillTarget; pub struct Properties { schema: Arc, @@ -467,8 +466,7 @@ impl SpillerInner { }; let remote_location = self.create_unique_location(); - let remote = - pool.buffer_writer(op.clone(), remote_location.clone(), SpillTarget::Remote)?; + let remote = pool.buffer_writer(op.clone(), remote_location.clone())?; Ok(AnyFileWriter::Remote { path: remote_location.clone(), From 612634e29f121132ed1dd2546fc02d0395b3d188 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 28 Apr 2026 11:53:15 +0800 Subject: [PATCH 02/10] z --- .../service/src/spillers/async_buffer.rs | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index 755949313e936..ed2bf8d672676 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::io; +use std::io::Error; use std::io::Write; use std::ops::Range; use std::sync::Arc; @@ -123,7 +124,7 @@ impl SpillsBufferPool { let available_write_buffers = buffers_tx.clone(); runtime.spawn( async_backtrace::location!(String::from("async_buffer")).frame(async move { - let mut background = Background::create(available_write_buffers); + let mut background = Background::create(); while let Ok(op) = working_queue.recv().await { let span = Span::enter_with_parent("Background::recv", op.span()); background.recv(op).in_span(span).await; @@ -304,6 +305,16 @@ impl BufferWriter { self.buffer_tx.close(); self.response.wait_and_take() } + + fn last_error(&mut self) -> io::Result<()> { + let locked = self.response.mutex.lock(); + let mut locked = locked.unwrap_or_else(PoisonError::into_inner); + + match locked.take_if(|x| x.is_err()) { + Some(Err(err)) => Err(err), + _ => Ok(()), + } + } } impl io::Write for BufferWriter { @@ -364,14 +375,7 @@ impl io::Write for BufferWriter { } } - let locked = self.response.mutex.lock(); - let mut locked = locked.unwrap_or_else(PoisonError::into_inner); - - if let Some(Err(cause)) = locked.take_if(|x| x.is_err()) { - return Err(cause); - } - - Ok(()) + self.last_error() } } @@ -438,7 +442,7 @@ impl SpillsDataWriter { )), SpillsDataWriter::Initialized(writer) => { writer.writer.flush()?; - Ok(writer.writer.inner_mut().flush()?) + Ok(writer.writer.inner_mut().last_error()?) } } } @@ -630,7 +634,7 @@ impl BufferOperator { pub struct Background; impl Background { - pub fn create(_available_buffers: async_channel::Sender) -> Background { + pub fn create() -> Background { Background } @@ -659,6 +663,7 @@ async fn writer_task_loop(mut op: BufferWriterTaskOperator) { if let Err(e) = op.writer.write(buf.clone()).await { op.buffer_rx.close(); op.response.done(Err(io::Error::from(e))); + return; } let buf = match buf.clone().try_into_mut() { @@ -678,6 +683,7 @@ async fn writer_task_loop(mut op: BufferWriterTaskOperator) { io::ErrorKind::BrokenPipe, "buffer pool is closed", ))); + return; } } From 9bad929c2ef083a4a68a6703feba8d7d09793649 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 28 Apr 2026 12:41:40 +0800 Subject: [PATCH 03/10] z --- .../new_aggregate/new_aggregate_spiller.rs | 3 +- .../new_hash_join/grace/grace_join.rs | 6 +- .../service/src/spillers/async_buffer.rs | 114 ++++++++++++------ .../parquet/src/parquet_reader/row_group.rs | 6 +- 4 files changed, 89 insertions(+), 40 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index d3b097e2f10c7..3ddfe76212940 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -413,10 +413,11 @@ fn restore_payload( data_schema.clone(), vec![row_group.clone()], target, + read_setting, )?; let instant = Instant::now(); - let data_block = reader.read(read_setting)?; + let data_block = reader.read()?; let elapsed = instant.elapsed(); let read_size = reader.read_bytes(); diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs index 32a12eb0dc5cc..48459ec982d9f 100644 --- 
a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs @@ -278,9 +278,10 @@ impl GraceHashJoin { self.desc.build_schema.clone(), data.row_groups, target, + self.read_settings, )?; - while let Some(data_block) = reader.read(self.read_settings)? { + while let Some(data_block) = reader.read()? { self.memory_hash_join.add_block(Some(data_block))?; } } @@ -499,6 +500,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> { self.join.desc.probe_schema.clone(), data.row_groups, target, + self.join.read_settings, )?; self.spills_reader = Some(reader); break; @@ -510,7 +512,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> { } if let Some(mut spills_reader) = self.spills_reader.take() { - if let Some(v) = spills_reader.read(self.join.read_settings)? { + if let Some(v) = spills_reader.read()? { self.spills_reader = Some(spills_reader); return Ok(Some(v)); } diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index ed2bf8d672676..105f05fce5f84 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -13,8 +13,6 @@ // limitations under the License. use std::io; -use std::io::Error; -use std::io::Write; use std::ops::Range; use std::sync::Arc; use std::sync::Condvar; @@ -121,7 +119,6 @@ impl SpillsBufferPool { for _ in 0..workers { let working_queue: async_channel::Receiver = working_rx.clone(); - let available_write_buffers = buffers_tx.clone(); runtime.spawn( async_backtrace::location!(String::from("async_buffer")).frame(async move { let mut background = Background::create(); @@ -244,8 +241,17 @@ impl SpillsBufferPool { data_schema: DataSchemaRef, row_groups: Vec, target: SpillTarget, + settings: ReadSettings, ) -> Result { - SpillsDataReader::create(path, op, data_schema, row_groups, self.clone(), target) + SpillsDataReader::create( + path, + op, + data_schema, + row_groups, + self.clone(), + target, + settings, + ) } pub fn fetch_ranges( @@ -469,10 +475,7 @@ impl SpillsDataWriter { } pub struct SpillsDataReader { - location: String, - operator: Operator, - row_groups: std::collections::VecDeque, - spills_buffer_pool: Arc, + receiver: async_channel::Receiver>, data_schema: DataSchemaRef, field_levels: FieldLevels, read_bytes: usize, @@ -487,6 +490,7 @@ impl SpillsDataReader { row_groups: Vec, spills_buffer_pool: Arc, target: SpillTarget, + settings: ReadSettings, ) -> Result { if row_groups.is_empty() { return Err(ErrorCode::Internal( @@ -500,13 +504,20 @@ impl SpillsDataReader { None, )?; - Ok(SpillsDataReader { + let (tx, rx) = async_channel::bounded(1); + spills_buffer_pool.operator(BufferOperator::ReaderTask(ReaderTaskOperator { + span: Span::enter_with_local_parent("ReaderTask"), + op: operator, location, - operator, - spills_buffer_pool, + row_groups, + settings, + sender: tx, + })); + + Ok(SpillsDataReader { + receiver: rx, data_schema, field_levels, - row_groups: std::collections::VecDeque::from(row_groups), read_bytes: 0, target, }) @@ -516,42 +527,30 @@ impl SpillsDataReader { self.read_bytes } - pub fn read(&mut self, settings: ReadSettings) -> Result> { - let Some(row_group) = self.row_groups.pop_front() else { - return Ok(None); - }; - + pub fn read(&mut self) -> Result> { let start = Instant::now(); - let mut row_group = RowGroupCore::new(row_group, None); - - let read_bytes = std::cell::Cell::new(0usize); - - 
row_group.fetch(&ProjectionMask::all(), None, |fetch_ranges| { - let chunk_data = self.spills_buffer_pool.fetch_ranges( - self.operator.clone(), - self.location.clone(), - fetch_ranges, - settings, - )?; - let bytes_read = chunk_data.iter().map(|c| c.len()).sum::(); - read_bytes.set(read_bytes.get() + bytes_read); + let fetched = match self.receiver.recv_blocking() { + Ok(Ok(v)) => v, + Ok(Err(e)) => return Err(e), + Err(_) => return Ok(None), + }; - Ok(chunk_data) - })?; + self.read_bytes += fetched.read_bytes; - self.read_bytes += read_bytes.get(); + let mut rg_core = RowGroupCore::new(fetched.metadata, None); + rg_core.fetch(&ProjectionMask::all(), None, |_| Ok(fetched.chunks))?; - let num_rows = row_group.num_rows(); + let num_rows = rg_core.num_rows(); let mut reader = ParquetRecordBatchReader::try_new_with_row_groups( &self.field_levels, - &row_group, + &rg_core, num_rows, None, )?; let batch = reader.next().transpose()?.unwrap(); debug_assert!(reader.next().is_none()); - record_read_profile(self.target, &start, read_bytes.get()); + record_read_profile(self.target, &start, fetched.read_bytes); Ok(Some(DataBlock::from_record_batch( &self.data_schema, &batch, @@ -583,6 +582,21 @@ pub struct BufferWriterTaskOperator { response: Arc>>, } +pub struct FetchedRowGroup { + pub metadata: RowGroupMetaData, + pub chunks: Vec, + pub read_bytes: usize, +} + +pub struct ReaderTaskOperator { + span: Span, + op: Operator, + location: String, + row_groups: Vec, + settings: ReadSettings, + sender: async_channel::Sender>, +} + #[derive(Default)] pub struct BufferOperatorResp { condvar: Condvar, @@ -619,6 +633,7 @@ pub enum BufferOperator { WriterTask(BufferWriterTaskOperator), CreateWriter(CreateWriterOperator), Fetch(FetchOperator), + ReaderTask(ReaderTaskOperator), } impl BufferOperator { @@ -627,6 +642,7 @@ impl BufferOperator { BufferOperator::WriterTask(op) => &op.span, BufferOperator::CreateWriter(op) => &op.span, BufferOperator::Fetch(op) => &op.span, + BufferOperator::ReaderTask(op) => &op.span, } } } @@ -654,6 +670,12 @@ impl Background { let res = get_ranges(&op.fetch_ranges, &op.settings, &op.location, &op.op).await; op.response.done(res.map(|(chunks, _)| chunks)); } + BufferOperator::ReaderTask(op) => { + spawn( + async_backtrace::location!(String::from("reader_task")) + .frame(async move { reader_task_loop(op).await }), + ); + } } } } @@ -691,6 +713,26 @@ async fn writer_task_loop(mut op: BufferWriterTaskOperator) { .done(op.writer.close().await.map_err(io::Error::from)); } +async fn reader_task_loop(op: ReaderTaskOperator) { + for row_group_meta in op.row_groups { + let result = async { + let rg_core = RowGroupCore::new(row_group_meta.clone(), None); + let ranges = rg_core.fetch_ranges(&ProjectionMask::all()); + let (chunks, _) = get_ranges(&ranges, &op.settings, &op.location, &op.op).await?; + let read_bytes = chunks.iter().map(|c| c.len()).sum::(); + Ok(FetchedRowGroup { + metadata: row_group_meta, + chunks, + read_bytes, + }) + }; + + if op.sender.send(result.await).await.is_err() { + break; + } + } +} + #[cfg(test)] mod tests { use std::io::Write; diff --git a/src/query/storages/parquet/src/parquet_reader/row_group.rs b/src/query/storages/parquet/src/parquet_reader/row_group.rs index 46da387c0e2ac..925b2b245bf14 100644 --- a/src/query/storages/parquet/src/parquet_reader/row_group.rs +++ b/src/query/storages/parquet/src/parquet_reader/row_group.rs @@ -226,7 +226,7 @@ impl RowGroupCore { &mut self, projection: &ProjectionMask, selection: Option<&RowSelection>, - get_ranges: impl 
Fn(Vec>) -> Result>, + get_ranges: impl FnOnce(Vec>) -> Result>, ) -> Result<()> { if let Some((selection, page_locations)) = selection.zip(self.page_locations.as_ref()) { // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the @@ -318,6 +318,10 @@ impl RowGroupCore { } } + pub fn fetch_ranges(&self, projection: &ProjectionMask) -> Vec> { + self.get_fetch_ranges_without_index(projection) + } + fn get_fetch_ranges_without_index(&self, projection: &ProjectionMask) -> Vec> { self.column_chunks .iter() From d12b8037d6da4266a8ecbe504853a6d7b96c9560 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 28 Apr 2026 16:56:33 +0800 Subject: [PATCH 04/10] fix(query): prevent hang in SpillsBufferPool by spawning Fetch and CreateWriter ops Background workers were directly awaiting Fetch and CreateWriter ops in their event loop, blocking the worker thread for the duration of the I/O. With only 2 workers, concurrent Fetch/CreateWriter ops could starve reader_task_loop tasks (spawned via tokio::spawn onto the same runtime), causing recv_blocking() in SpillsDataReader::read() to hang indefinitely. Fix: spawn Fetch and CreateWriter as independent tasks, consistent with WriterTask and ReaderTask, so workers remain free to dequeue new ops. Co-Authored-By: Claude Sonnet 4.6 --- .../service/src/spillers/async_buffer.rs | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index 105f05fce5f84..f47f78730f597 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -663,12 +663,24 @@ impl Background { ); } BufferOperator::CreateWriter(op) => { - let writer = op.op.writer(&op.path).await; - op.response.done(writer); + spawn( + async_backtrace::location!(String::from("create_writer_task")).frame( + async move { + let writer = op.op.writer(&op.path).await; + op.response.done(writer); + }, + ), + ); } BufferOperator::Fetch(op) => { - let res = get_ranges(&op.fetch_ranges, &op.settings, &op.location, &op.op).await; - op.response.done(res.map(|(chunks, _)| chunks)); + spawn( + async_backtrace::location!(String::from("fetch_task")).frame(async move { + let res = + get_ranges(&op.fetch_ranges, &op.settings, &op.location, &op.op) + .await; + op.response.done(res.map(|(chunks, _)| chunks)); + }), + ); } BufferOperator::ReaderTask(op) => { spawn( From 1d7fff26c47140bba63c0402e59ab161cf618dbc Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 28 Apr 2026 17:37:22 +0800 Subject: [PATCH 05/10] z --- src/query/service/src/spillers/async_buffer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index f47f78730f597..64635f3f57e05 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -676,8 +676,7 @@ impl Background { spawn( async_backtrace::location!(String::from("fetch_task")).frame(async move { let res = - get_ranges(&op.fetch_ranges, &op.settings, &op.location, &op.op) - .await; + get_ranges(&op.fetch_ranges, &op.settings, &op.location, &op.op).await; op.response.done(res.map(|(chunks, _)| chunks)); }), ); From 55882ffad07cce0e8e1e11984dd5e999d4e9bcbd Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Apr 2026 03:16:29 +0800 Subject: [PATCH 06/10] z --- src/query/service/src/spillers/async_buffer.rs | 3 ++- 1 file changed, 2 
insertions(+), 1 deletion(-) diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index 64635f3f57e05..d92a9a406f519 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::io; +use std::io::Write; use std::ops::Range; use std::sync::Arc; use std::sync::Condvar; @@ -448,7 +449,7 @@ impl SpillsDataWriter { )), SpillsDataWriter::Initialized(writer) => { writer.writer.flush()?; - Ok(writer.writer.inner_mut().last_error()?) + Ok(writer.writer.inner_mut().flush()?) } } } From 7c58f5da84d894af7ac293c5f1a168b57ef2f9b5 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Apr 2026 03:25:42 +0800 Subject: [PATCH 07/10] z --- src/query/service/src/spillers/async_buffer.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index d92a9a406f519..ecf40d891a26e 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -693,14 +693,15 @@ impl Background { } async fn writer_task_loop(mut op: BufferWriterTaskOperator) { + let mut has_error = false; while let Ok(buf) = op.buffer_rx.recv().await { if let Err(e) = op.writer.write(buf.clone()).await { + has_error = true; op.buffer_rx.close(); op.response.done(Err(io::Error::from(e))); - return; } - let buf = match buf.clone().try_into_mut() { + let buf = match buf.try_into_mut() { Ok(mut b) if b.capacity() == CHUNK_SIZE => { b.clear(); b @@ -712,11 +713,15 @@ async fn writer_task_loop(mut op: BufferWriterTaskOperator) { }; if op.available_buffers.send(buf).await.is_err() { + has_error = true; op.buffer_rx.close(); op.response.done(Err(io::Error::new( io::ErrorKind::BrokenPipe, "buffer pool is closed", ))); + } + + if has_error { return; } } From a1d9893c8bd85cebc5b820e04683f1f1ead5e403 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Apr 2026 03:27:41 +0800 Subject: [PATCH 08/10] z --- .../service/tests/it/storages/testdata/configs_table_basic.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 08942d2bfafd8..ce3100de4c439 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -236,6 +236,8 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'query' | 'warehouse_id' | 'test_warehouse' | '' | | 'spill' | 'result_set_spilling_disk_quota_ratio' | '0' | '' | | 'spill' | 'sort_spilling_disk_quota_ratio' | '60' | '' | +| 'spill' | 'spill_buffer_pool_memory' | '209715200' | '' | +| 'spill' | 'spill_buffer_pool_workers' | '2' | '' | | 'spill' | 'spill_local_disk_max_bytes' | '18446744073709551615' | '' | | 'spill' | 'spill_local_disk_path' | '' | '' | | 'spill' | 'spill_local_disk_reserved_space_percentage' | '10.0' | '' | From f91b43173b136b5767d3e162fd5096775600c6ad Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Apr 2026 03:38:49 +0800 Subject: [PATCH 09/10] z --- src/query/service/src/spillers/async_buffer.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index ecf40d891a26e..bdd3928df5406 100644 
--- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -292,7 +292,7 @@ pub struct BufferWriter { impl BufferWriter { pub fn close(mut self) -> io::Result { - if let Some(b) = self.current_bytes.take_if(|b| !b.is_empty()) { + if let Some(b) = self.current_bytes.take() { if self.buffer_tx.try_send(b.freeze()).is_err() { return Err(io::ErrorKind::BrokenPipe.into()); } @@ -303,7 +303,7 @@ impl BufferWriter { } pub(super) fn finish(&mut self) -> std::io::Result { - if let Some(b) = self.current_bytes.take_if(|b| !b.is_empty()) { + if let Some(b) = self.current_bytes.take() { if self.buffer_tx.try_send(b.freeze()).is_err() { return Err(io::ErrorKind::BrokenPipe.into()); } @@ -376,7 +376,7 @@ impl io::Write for BufferWriter { } fn flush(&mut self) -> io::Result<()> { - if let Some(b) = self.current_bytes.take_if(|b| !b.is_empty()) { + if let Some(b) = self.current_bytes.take() { if self.buffer_tx.try_send(b.freeze()).is_err() { return Err(io::ErrorKind::BrokenPipe.into()); } @@ -695,7 +695,9 @@ impl Background { async fn writer_task_loop(mut op: BufferWriterTaskOperator) { let mut has_error = false; while let Ok(buf) = op.buffer_rx.recv().await { - if let Err(e) = op.writer.write(buf.clone()).await { + if !buf.is_empty() + && let Err(e) = op.writer.write(buf.clone()).await + { has_error = true; op.buffer_rx.close(); op.response.done(Err(io::Error::from(e))); From 55bbd94c2aa754aee8d6df101a473c4d1c8eba7a Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Apr 2026 04:36:10 +0800 Subject: [PATCH 10/10] z --- .../service/src/spillers/async_buffer.rs | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs index bdd3928df5406..4e949cce3290a 100644 --- a/src/query/service/src/spillers/async_buffer.rs +++ b/src/query/service/src/spillers/async_buffer.rs @@ -703,24 +703,32 @@ async fn writer_task_loop(mut op: BufferWriterTaskOperator) { op.response.done(Err(io::Error::from(e))); } - let buf = match buf.try_into_mut() { - Ok(mut b) if b.capacity() == CHUNK_SIZE => { - b.clear(); - b - } - _ => { - log::warn!("Failed to recycle buffer, creating new one"); - BytesMut::with_capacity(CHUNK_SIZE) + let mut release_buf = Some(buf); + + while let Some(buf) = release_buf.take() { + let buf = match buf.try_into_mut() { + Ok(mut b) if b.capacity() == CHUNK_SIZE => { + b.clear(); + b + } + _ => { + log::warn!("Failed to recycle buffer, creating new one"); + BytesMut::with_capacity(CHUNK_SIZE) + } + }; + + if op.available_buffers.send(buf).await.is_err() { + op.buffer_rx.close(); + op.response.done(Err(io::Error::new( + io::ErrorKind::BrokenPipe, + "buffer pool is closed", + ))); + return; } - }; - if op.available_buffers.send(buf).await.is_err() { - has_error = true; - op.buffer_rx.close(); - op.response.done(Err(io::Error::new( - io::ErrorKind::BrokenPipe, - "buffer pool is closed", - ))); + if has_error { + release_buf = op.buffer_rx.try_recv().ok(); + } } if has_error {
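
Note on the final design: across these ten patches the pool converges on a
simple shape. A fixed set of 4MB BytesMut chunks circulates through an
async_channel; the synchronous BufferWriter freezes each full chunk to Bytes
and hands it to a long-lived per-file writer task; the task writes it out,
recovers the allocation with Bytes::try_into_mut, and returns it to the pool;
backpressure falls out of alloc_buffer blocking once every chunk is in
flight. The sketch below is a minimal, self-contained illustration of that
scheme, assuming only the tokio, bytes, and async-channel crates.
MockStorageWriter is a hypothetical stand-in for the OpenDAL Writer, and the
error path, metrics, and parquet layer are omitted. (The reader side from
PATCH 03 mirrors this with async_channel::bounded(1), so the background task
prefetches exactly one row group ahead of the consumer.)

use std::io;

use async_channel::Receiver;
use async_channel::Sender;
use bytes::Bytes;
use bytes::BytesMut;

const CHUNK_SIZE: usize = 4 * 1024 * 1024;

// Hypothetical stand-in for opendal::Writer; it only counts bytes.
struct MockStorageWriter {
    written: usize,
}

impl MockStorageWriter {
    async fn write(&mut self, buf: Bytes) -> io::Result<()> {
        // A real implementation awaits storage I/O here; the Bytes clone it
        // receives is dropped on return, which is what lets the caller
        // reclaim the allocation afterwards.
        self.written += buf.len();
        Ok(())
    }
}

// Writer-task loop: drain frozen chunks, write them out, and recycle each
// allocation back to the pool once we hold the only Bytes handle.
async fn writer_task(
    mut writer: MockStorageWriter,
    chunks: Receiver<Bytes>,
    pool: Sender<BytesMut>,
) -> io::Result<usize> {
    while let Ok(buf) = chunks.recv().await {
        writer.write(buf.clone()).await?;
        // try_into_mut succeeds only if this is the sole reference,
        // i.e. only after the write has dropped its clone.
        let recycled = match buf.try_into_mut() {
            Ok(mut b) if b.capacity() == CHUNK_SIZE => {
                b.clear();
                b
            }
            _ => BytesMut::with_capacity(CHUNK_SIZE),
        };
        if pool.send(recycled).await.is_err() {
            return Err(io::ErrorKind::BrokenPipe.into());
        }
    }
    Ok(writer.written)
}

#[tokio::main]
async fn main() -> io::Result<()> {
    // Fixed-size pool: once all four chunks are in flight, recv() blocks
    // the producer until the writer task recycles one; this is the
    // backpressure that BufferWriter's alloc_buffer relies on.
    let (pool_tx, pool_rx) = async_channel::unbounded::<BytesMut>();
    for _ in 0..4 {
        pool_tx.send(BytesMut::with_capacity(CHUNK_SIZE)).await.unwrap();
    }

    let (chunk_tx, chunk_rx) = async_channel::unbounded::<Bytes>();
    let task = tokio::spawn(writer_task(
        MockStorageWriter { written: 0 },
        chunk_rx,
        pool_tx.clone(),
    ));

    // Producer side: take a chunk from the pool, fill it, freeze it, and
    // hand it to the writer task. (The real BufferWriter does this from a
    // sync thread via recv_blocking.)
    for _ in 0..16 {
        let mut chunk = pool_rx.recv().await.unwrap();
        chunk.extend_from_slice(&[0u8; 1024]);
        chunk_tx.send(chunk.freeze()).await.unwrap();
    }
    // Closing the channel ends the task loop, like BufferWriter::close.
    chunk_tx.close();

    let written = task.await.unwrap()?;
    assert_eq!(written, 16 * 1024);
    Ok(())
}

The property the sketch preserves is the one the series keeps refining: a
chunk re-enters the pool exactly when storage is done with it, so pool
occupancy, not heap growth, is what throttles spill speed.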