Skip to content

Commit d5345d9

Browse files
committed
Use single spill file for multiple partitions in native shuffle
1 parent: 9b2f1b1 · commit: d5345d9

File tree

4 files changed: +119 additions, -83 deletions

native/shuffle/src/partitioners/multi_partition.rs

Lines changed: 48 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ use crate::partitioners::partitioned_batch_iterator::{
2020
PartitionedBatchIterator, PartitionedBatchesProducer,
2121
};
2222
use crate::partitioners::ShufflePartitioner;
23-
use crate::writers::{BufBatchWriter, PartitionWriter};
23+
use crate::writers::{BufBatchWriter, PartitionSpillRange, PartitionWriter, SpillInfo};
2424
use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter};
2525
use arrow::array::{ArrayRef, RecordBatch};
2626
use arrow::datatypes::SchemaRef;
@@ -125,6 +125,9 @@ pub(crate) struct MultiPartitionShuffleRepartitioner {
125125
tracing_enabled: bool,
126126
/// Size of the write buffer in bytes
127127
write_buffer_size: usize,
128+
/// Combined spill files. Each entry is a single file containing data from
129+
/// multiple partitions, created during one spill event.
130+
spill_infos: Vec<SpillInfo>,
128131
}
129132

130133
impl MultiPartitionShuffleRepartitioner {
@@ -190,6 +193,7 @@ impl MultiPartitionShuffleRepartitioner {
190193
reservation,
191194
tracing_enabled,
192195
write_buffer_size,
196+
spill_infos: vec![],
193197
})
194198
}
195199

@@ -502,20 +506,53 @@ impl MultiPartitionShuffleRepartitioner {
502506
with_trace("shuffle_spill", self.tracing_enabled, || {
503507
let num_output_partitions = self.partition_writers.len();
504508
let mut partitioned_batches = self.partitioned_batches();
505-
let mut spilled_bytes = 0;
509+
let mut spilled_bytes: usize = 0;
510+
511+
// Create a single temporary file for this spill event
512+
let temp_file = self
513+
.runtime
514+
.disk_manager
515+
.create_tmp_file("shuffle writer spill")?;
516+
let mut spill_file = OpenOptions::new()
517+
.write(true)
518+
.create(true)
519+
.truncate(true)
520+
.open(temp_file.path())
521+
.map_err(|e| {
522+
DataFusionError::Execution(format!("Error occurred while spilling {e}"))
523+
})?;
524+
525+
let mut partition_ranges = Vec::with_capacity(num_output_partitions);
506526

507527
for partition_id in 0..num_output_partitions {
508528
let partition_writer = &mut self.partition_writers[partition_id];
509529
let mut iter = partitioned_batches.produce(partition_id);
510-
spilled_bytes += partition_writer.spill(
530+
531+
let offset = spill_file.stream_position()?;
532+
let bytes_written = partition_writer.write_to(
511533
&mut iter,
512-
&self.runtime,
534+
&mut spill_file,
513535
&self.metrics,
514536
self.write_buffer_size,
515537
self.batch_size,
516538
)?;
539+
540+
if bytes_written > 0 {
541+
partition_ranges.push(Some(PartitionSpillRange {
542+
offset,
543+
length: bytes_written as u64,
544+
}));
545+
spilled_bytes += bytes_written;
546+
} else {
547+
partition_ranges.push(None);
548+
}
517549
}
518550

551+
spill_file.flush()?;
552+
553+
self.spill_infos
554+
.push(SpillInfo::new(temp_file, partition_ranges));
555+
519556
self.reservation.free();
520557
self.metrics.spill_count.add(1);
521558
self.metrics.spilled_bytes.add(spilled_bytes);
@@ -524,8 +561,8 @@ impl MultiPartitionShuffleRepartitioner {
524561
}
525562

526563
#[cfg(test)]
527-
pub(crate) fn partition_writers(&self) -> &[PartitionWriter] {
528-
&self.partition_writers
564+
pub(crate) fn spill_count_files(&self) -> usize {
565+
self.spill_infos.len()
529566
}
530567
}
531568

@@ -579,14 +616,12 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
579616
for i in 0..num_output_partitions {
580617
offsets[i] = output_data.stream_position()?;
581618

582-
// if we wrote a spill file for this partition then copy the
583-
// contents into the shuffle file
584-
if let Some(spill_path) = self.partition_writers[i].path() {
585-
// Use raw File handle (not BufReader) so that std::io::copy
586-
// can use copy_file_range/sendfile for zero-copy on Linux.
587-
let mut spill_file = File::open(spill_path)?;
619+
// Copy spilled data for this partition from each spill file.
620+
// Each SpillInfo is a single file containing data from all partitions
621+
// ordered by partition ID, with byte ranges tracked per partition.
622+
for spill_info in &self.spill_infos {
588623
let mut write_timer = self.metrics.write_time.timer();
589-
std::io::copy(&mut spill_file, &mut output_data)?;
624+
spill_info.copy_partition_to(i, &mut output_data)?;
590625
write_timer.stop();
591626
}
592627

native/shuffle/src/shuffle_writer.rs

Lines changed: 8 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -367,25 +367,20 @@ mod test {
367367

368368
repartitioner.insert_batch(batch.clone()).await.unwrap();
369369

370-
{
371-
let partition_writers = repartitioner.partition_writers();
372-
assert_eq!(partition_writers.len(), 2);
373-
374-
assert!(!partition_writers[0].has_spill_file());
375-
assert!(!partition_writers[1].has_spill_file());
376-
}
370+
// before spill, no spill files should exist
371+
assert_eq!(repartitioner.spill_count_files(), 0);
377372

378373
repartitioner.spill().unwrap();
379374

380-
// after spill, there should be spill files
381-
{
382-
let partition_writers = repartitioner.partition_writers();
383-
assert!(partition_writers[0].has_spill_file());
384-
assert!(partition_writers[1].has_spill_file());
385-
}
375+
// after spill, exactly one combined spill file should exist (not one per partition)
376+
assert_eq!(repartitioner.spill_count_files(), 1);
386377

387378
// insert another batch after spilling
388379
repartitioner.insert_batch(batch.clone()).await.unwrap();
380+
381+
// spill again -- should create a second combined spill file
382+
repartitioner.spill().unwrap();
383+
assert_eq!(repartitioner.spill_count_files(), 2);
389384
}
390385

391386
fn create_runtime(memory_limit: usize) -> Arc<RuntimeEnv> {

native/shuffle/src/writers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,4 +23,4 @@ mod spill;
2323
pub(crate) use buf_batch_writer::BufBatchWriter;
2424
pub(crate) use checksum::Checksum;
2525
pub use shuffle_block_writer::{CompressionCodec, ShuffleBlockWriter};
26-
pub(crate) use spill::PartitionWriter;
26+
pub(crate) use spill::{PartitionSpillRange, PartitionWriter, SpillInfo};

native/shuffle/src/writers/spill.rs

Lines changed: 62 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -19,24 +19,68 @@ use super::ShuffleBlockWriter;
1919
use crate::metrics::ShufflePartitionerMetrics;
2020
use crate::partitioners::PartitionedBatchIterator;
2121
use crate::writers::buf_batch_writer::BufBatchWriter;
22-
use datafusion::common::DataFusionError;
2322
use datafusion::execution::disk_manager::RefCountedTempFile;
24-
use datafusion::execution::runtime_env::RuntimeEnv;
25-
use std::fs::{File, OpenOptions};
23+
use std::fs::File;
24+
use std::io::{Read, Seek, SeekFrom, Write};
2625

27-
/// A temporary disk file for spilling a partition's intermediate shuffle data.
28-
struct SpillFile {
29-
temp_file: RefCountedTempFile,
30-
file: File,
26+
/// The byte range of a single partition's data within a combined spill file.
27+
#[derive(Debug, Clone)]
28+
pub(crate) struct PartitionSpillRange {
29+
pub offset: u64,
30+
pub length: u64,
3131
}
3232

33-
/// Manages encoding and optional disk spilling for a single shuffle partition.
33+
/// Represents a single spill file that contains data from multiple partitions.
34+
/// Data is written sequentially ordered by partition ID. Each partition's byte
35+
/// range is tracked in `partition_ranges` so it can be read back during merge.
36+
pub(crate) struct SpillInfo {
37+
/// The temporary file handle -- kept alive to prevent cleanup until we are done.
38+
_temp_file: RefCountedTempFile,
39+
/// Path to the spill file on disk.
40+
path: std::path::PathBuf,
41+
/// Byte range for each partition. None means the partition had no data in this spill.
42+
pub partition_ranges: Vec<Option<PartitionSpillRange>>,
43+
}
44+
45+
impl SpillInfo {
46+
pub(crate) fn new(
47+
temp_file: RefCountedTempFile,
48+
partition_ranges: Vec<Option<PartitionSpillRange>>,
49+
) -> Self {
50+
let path = temp_file.path().to_path_buf();
51+
Self {
52+
_temp_file: temp_file,
53+
path,
54+
partition_ranges,
55+
}
56+
}
57+
58+
/// Copy the data for `partition_id` from this spill file into `output`.
59+
/// Returns the number of bytes copied.
60+
pub(crate) fn copy_partition_to(
61+
&self,
62+
partition_id: usize,
63+
output: &mut impl Write,
64+
) -> datafusion::common::Result<u64> {
65+
if let Some(ref range) = self.partition_ranges[partition_id] {
66+
if range.length == 0 {
67+
return Ok(0);
68+
}
69+
let mut spill_file = File::open(&self.path)?;
70+
spill_file.seek(SeekFrom::Start(range.offset))?;
71+
let mut limited = spill_file.take(range.length);
72+
let copied = std::io::copy(&mut limited, output)?;
73+
Ok(copied)
74+
} else {
75+
Ok(0)
76+
}
77+
}
78+
}
79+
80+
/// Manages encoding for a single shuffle partition. Does not own any spill file --
81+
/// spill files are managed at the repartitioner level as combined SpillInfo objects.
3482
pub(crate) struct PartitionWriter {
35-
/// Spill file for intermediate shuffle output for this partition. Each spill event
36-
/// will append to this file and the contents will be copied to the shuffle file at
37-
/// the end of processing.
38-
spill_file: Option<SpillFile>,
39-
/// Writer that performs encoding and compression
83+
/// Writer that performs encoding and compression.
4084
shuffle_block_writer: ShuffleBlockWriter,
4185
}
4286

@@ -45,51 +89,25 @@ impl PartitionWriter {
4589
shuffle_block_writer: ShuffleBlockWriter,
4690
) -> datafusion::common::Result<Self> {
4791
Ok(Self {
48-
spill_file: None,
4992
shuffle_block_writer,
5093
})
5194
}
5295

53-
fn ensure_spill_file_created(
54-
&mut self,
55-
runtime: &RuntimeEnv,
56-
) -> datafusion::common::Result<()> {
57-
if self.spill_file.is_none() {
58-
// Spill file is not yet created, create it
59-
let spill_file = runtime
60-
.disk_manager
61-
.create_tmp_file("shuffle writer spill")?;
62-
let spill_data = OpenOptions::new()
63-
.write(true)
64-
.create(true)
65-
.truncate(true)
66-
.open(spill_file.path())
67-
.map_err(|e| {
68-
DataFusionError::Execution(format!("Error occurred while spilling {e}"))
69-
})?;
70-
self.spill_file = Some(SpillFile {
71-
temp_file: spill_file,
72-
file: spill_data,
73-
});
74-
}
75-
Ok(())
76-
}
77-
78-
pub(crate) fn spill(
96+
/// Encode and write a partition's batches to the provided writer.
97+
/// Returns the number of bytes written.
98+
pub(crate) fn write_to<W: Write>(
7999
&mut self,
80100
iter: &mut PartitionedBatchIterator,
81-
runtime: &RuntimeEnv,
101+
writer: &mut W,
82102
metrics: &ShufflePartitionerMetrics,
83103
write_buffer_size: usize,
84104
batch_size: usize,
85105
) -> datafusion::common::Result<usize> {
86106
if let Some(batch) = iter.next() {
87-
self.ensure_spill_file_created(runtime)?;
88-
89107
let total_bytes_written = {
90108
let mut buf_batch_writer = BufBatchWriter::new(
91109
&mut self.shuffle_block_writer,
92-
&mut self.spill_file.as_mut().unwrap().file,
110+
writer,
93111
write_buffer_size,
94112
batch_size,
95113
);
@@ -106,21 +124,9 @@ impl PartitionWriter {
106124
buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
107125
bytes_written
108126
};
109-
110127
Ok(total_bytes_written)
111128
} else {
112129
Ok(0)
113130
}
114131
}
115-
116-
pub(crate) fn path(&self) -> Option<&std::path::Path> {
117-
self.spill_file
118-
.as_ref()
119-
.map(|spill_file| spill_file.temp_file.path())
120-
}
121-
122-
#[cfg(test)]
123-
pub(crate) fn has_spill_file(&self) -> bool {
124-
self.spill_file.is_some()
125-
}
126132
}

0 commit comments

Comments
 (0)