Skip to content

Commit 57b729e

Browse files
committed
Refactor PartitionWriter and add SpillInfo for combined spill files
1 parent 10513e5 commit 57b729e

2 files changed

Lines changed: 63 additions & 57 deletions

File tree

native/shuffle/src/writers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,4 +23,4 @@ mod spill;
2323
pub(crate) use buf_batch_writer::BufBatchWriter;
2424
pub(crate) use checksum::Checksum;
2525
pub use shuffle_block_writer::{CompressionCodec, ShuffleBlockWriter};
26-
pub(crate) use spill::PartitionWriter;
26+
pub(crate) use spill::{PartitionSpillRange, PartitionWriter, SpillInfo};

native/shuffle/src/writers/spill.rs

Lines changed: 62 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -19,24 +19,68 @@ use super::ShuffleBlockWriter;
1919
use crate::metrics::ShufflePartitionerMetrics;
2020
use crate::partitioners::PartitionedBatchIterator;
2121
use crate::writers::buf_batch_writer::BufBatchWriter;
22-
use datafusion::common::DataFusionError;
2322
use datafusion::execution::disk_manager::RefCountedTempFile;
24-
use datafusion::execution::runtime_env::RuntimeEnv;
25-
use std::fs::{File, OpenOptions};
23+
use std::fs::File;
24+
use std::io::{Read, Seek, SeekFrom, Write};
2625

27-
/// A temporary disk file for spilling a partition's intermediate shuffle data.
28-
struct SpillFile {
29-
temp_file: RefCountedTempFile,
30-
file: File,
26+
/// The byte range of a single partition's data within a combined spill file.
///
/// Comparable by value so ranges can be checked directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PartitionSpillRange {
    /// Byte position, measured from the start of the spill file, where this
    /// partition's data begins.
    pub offset: u64,
    /// Number of bytes belonging to this partition, starting at `offset`.
    pub length: u64,
}
3232

33-
/// Manages encoding and optional disk spilling for a single shuffle partition.
33+
/// Represents a single spill file that contains data from multiple partitions.
34+
/// Data is written sequentially ordered by partition ID. Each partition's byte
35+
/// range is tracked in `partition_ranges` so it can be read back during merge.
36+
pub(crate) struct SpillInfo {
37+
/// The temporary file handle -- kept alive to prevent cleanup until we are done.
38+
_temp_file: RefCountedTempFile,
39+
/// Path to the spill file on disk.
40+
path: std::path::PathBuf,
41+
/// Byte range for each partition. None means the partition had no data in this spill.
42+
pub partition_ranges: Vec<Option<PartitionSpillRange>>,
43+
}
44+
45+
impl SpillInfo {
46+
pub(crate) fn new(
47+
temp_file: RefCountedTempFile,
48+
partition_ranges: Vec<Option<PartitionSpillRange>>,
49+
) -> Self {
50+
let path = temp_file.path().to_path_buf();
51+
Self {
52+
_temp_file: temp_file,
53+
path,
54+
partition_ranges,
55+
}
56+
}
57+
58+
/// Copy the data for `partition_id` from this spill file into `output`.
59+
/// Returns the number of bytes copied.
60+
pub(crate) fn copy_partition_to(
61+
&self,
62+
partition_id: usize,
63+
output: &mut impl Write,
64+
) -> datafusion::common::Result<u64> {
65+
if let Some(ref range) = self.partition_ranges[partition_id] {
66+
if range.length == 0 {
67+
return Ok(0);
68+
}
69+
let mut spill_file = File::open(&self.path)?;
70+
spill_file.seek(SeekFrom::Start(range.offset))?;
71+
let mut limited = spill_file.take(range.length);
72+
let copied = std::io::copy(&mut limited, output)?;
73+
Ok(copied)
74+
} else {
75+
Ok(0)
76+
}
77+
}
78+
}
79+
80+
/// Manages encoding for a single shuffle partition. Does not own any spill file --
81+
/// spill files are managed at the repartitioner level as combined SpillInfo objects.
3482
pub(crate) struct PartitionWriter {
35-
/// Spill file for intermediate shuffle output for this partition. Each spill event
36-
/// will append to this file and the contents will be copied to the shuffle file at
37-
/// the end of processing.
38-
spill_file: Option<SpillFile>,
39-
/// Writer that performs encoding and compression
83+
/// Writer that performs encoding and compression.
4084
shuffle_block_writer: ShuffleBlockWriter,
4185
}
4286

@@ -45,51 +89,25 @@ impl PartitionWriter {
4589
shuffle_block_writer: ShuffleBlockWriter,
4690
) -> datafusion::common::Result<Self> {
4791
Ok(Self {
48-
spill_file: None,
4992
shuffle_block_writer,
5093
})
5194
}
5295

53-
fn ensure_spill_file_created(
54-
&mut self,
55-
runtime: &RuntimeEnv,
56-
) -> datafusion::common::Result<()> {
57-
if self.spill_file.is_none() {
58-
// Spill file is not yet created, create it
59-
let spill_file = runtime
60-
.disk_manager
61-
.create_tmp_file("shuffle writer spill")?;
62-
let spill_data = OpenOptions::new()
63-
.write(true)
64-
.create(true)
65-
.truncate(true)
66-
.open(spill_file.path())
67-
.map_err(|e| {
68-
DataFusionError::Execution(format!("Error occurred while spilling {e}"))
69-
})?;
70-
self.spill_file = Some(SpillFile {
71-
temp_file: spill_file,
72-
file: spill_data,
73-
});
74-
}
75-
Ok(())
76-
}
77-
78-
pub(crate) fn spill(
96+
/// Encode and write a partition's batches to the provided writer.
97+
/// Returns the number of bytes written.
98+
pub(crate) fn write_to<W: Write>(
7999
&mut self,
80100
iter: &mut PartitionedBatchIterator,
81-
runtime: &RuntimeEnv,
101+
writer: &mut W,
82102
metrics: &ShufflePartitionerMetrics,
83103
write_buffer_size: usize,
84104
batch_size: usize,
85105
) -> datafusion::common::Result<usize> {
86106
if let Some(batch) = iter.next() {
87-
self.ensure_spill_file_created(runtime)?;
88-
89107
let total_bytes_written = {
90108
let mut buf_batch_writer = BufBatchWriter::new(
91109
&mut self.shuffle_block_writer,
92-
&mut self.spill_file.as_mut().unwrap().file,
110+
writer,
93111
write_buffer_size,
94112
batch_size,
95113
);
@@ -106,21 +124,9 @@ impl PartitionWriter {
106124
buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
107125
bytes_written
108126
};
109-
110127
Ok(total_bytes_written)
111128
} else {
112129
Ok(0)
113130
}
114131
}
115-
116-
pub(crate) fn path(&self) -> Option<&std::path::Path> {
117-
self.spill_file
118-
.as_ref()
119-
.map(|spill_file| spill_file.temp_file.path())
120-
}
121-
122-
#[cfg(test)]
123-
pub(crate) fn has_spill_file(&self) -> bool {
124-
self.spill_file.is_some()
125-
}
126132
}

0 commit comments

Comments
 (0)