Skip to content

Commit eed010c

Browse files
committed
Cache spill file handles during finalize to avoid N*S open() calls
1 parent 4ca6eac commit eed010c

File tree

2 files changed

+33
-13
lines changed

2 files changed

+33
-13
lines changed

native/shuffle/src/partitioners/multi_partition.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -529,20 +529,22 @@ impl MultiPartitionShuffleRepartitioner {
529529
let mut iter = partitioned_batches.produce(partition_id);
530530

531531
let offset = spill_file.stream_position()?;
532-
let bytes_written = partition_writer.write_to(
532+
partition_writer.write_to(
533533
&mut iter,
534534
&mut spill_file,
535535
&self.metrics,
536536
self.write_buffer_size,
537537
self.batch_size,
538538
)?;
539+
let end_offset = spill_file.stream_position()?;
540+
let actual_bytes = (end_offset - offset) as usize;
539541

540-
if bytes_written > 0 {
542+
if actual_bytes > 0 {
541543
partition_ranges.push(Some(PartitionSpillRange {
542544
offset,
543-
length: bytes_written as u64,
545+
length: actual_bytes as u64,
544546
}));
545-
spilled_bytes += bytes_written;
547+
spilled_bytes += actual_bytes;
546548
} else {
547549
partition_ranges.push(None);
548550
}
@@ -612,16 +614,24 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
612614

613615
let mut output_data = BufWriter::new(output_data);
614616

617+
// Pre-open all spill files once to avoid repeated File::open() calls.
618+
// With N partitions and S spill files, this reduces open() calls from
619+
// N*S to S.
620+
let mut spill_handles: Vec<_> = self
621+
.spill_infos
622+
.iter()
623+
.map(|info| info.open_for_read())
624+
.collect::<datafusion::common::Result<Vec<_>>>()?;
625+
615626
#[allow(clippy::needless_range_loop)]
616627
for i in 0..num_output_partitions {
617628
offsets[i] = output_data.stream_position()?;
618629

619-
// Copy spilled data for this partition from each spill file.
620-
// Each SpillInfo is a single file containing data from all partitions
621-
// ordered by partition ID, with byte ranges tracked per partition.
622-
for spill_info in &self.spill_infos {
630+
// Copy spilled data for this partition from each spill file
631+
// using pre-opened file handles; each handle is re-seeked to this partition's byte range before copying.
632+
for (spill_info, handle) in self.spill_infos.iter().zip(spill_handles.iter_mut()) {
623633
let mut write_timer = self.metrics.write_time.timer();
624-
spill_info.copy_partition_to(i, &mut output_data)?;
634+
spill_info.copy_partition_with_handle(i, handle, &mut output_data)?;
625635
write_timer.stop();
626636
}
627637

native/shuffle/src/writers/spill.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use super::ShuffleBlockWriter;
1919
use crate::metrics::ShufflePartitionerMetrics;
2020
use crate::partitioners::PartitionedBatchIterator;
2121
use crate::writers::buf_batch_writer::BufBatchWriter;
22+
use datafusion::common::DataFusionError;
2223
use datafusion::execution::disk_manager::RefCountedTempFile;
2324
use std::fs::File;
2425
use std::io::{Read, Seek, SeekFrom, Write};
@@ -55,26 +56,35 @@ impl SpillInfo {
5556
}
5657
}
5758

58-
/// Copy the data for `partition_id` from this spill file into `output`.
59+
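/// Copy the data for `partition_id` from the spill file into `output`, reading through a pre-opened file handle (the handle is re-seeked to the partition's offset on each call).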
60+
/// Avoids repeated File::open() calls when iterating over partitions.
5961
/// Returns the number of bytes copied.
60-
pub(crate) fn copy_partition_to(
62+
pub(crate) fn copy_partition_with_handle(
6163
&self,
6264
partition_id: usize,
65+
spill_file: &mut File,
6366
output: &mut impl Write,
6467
) -> datafusion::common::Result<u64> {
6568
if let Some(ref range) = self.partition_ranges[partition_id] {
6669
if range.length == 0 {
6770
return Ok(0);
6871
}
69-
let mut spill_file = File::open(&self.path)?;
7072
spill_file.seek(SeekFrom::Start(range.offset))?;
71-
let mut limited = spill_file.take(range.length);
73+
let mut limited = Read::take(spill_file, range.length);
7274
let copied = std::io::copy(&mut limited, output)?;
7375
Ok(copied)
7476
} else {
7577
Ok(0)
7678
}
7779
}
80+
81+
/// Open the spill file for reading. The returned handle can be reused
82+
/// across multiple copy_partition_with_handle() calls.
83+
pub(crate) fn open_for_read(&self) -> datafusion::common::Result<File> {
84+
File::open(&self.path).map_err(|e| {
85+
DataFusionError::Execution(format!("Failed to open spill file for reading: {e}"))
86+
})
87+
}
7888
}
7989

8090
/// Manages encoding for a single shuffle partition. Does not own any spill file --

0 commit comments

Comments
 (0)