Skip to content

Commit 244f5e8

Browse files
committed
Use single spill file per spill event in shuffle repartitioner
1 parent 57b729e commit 244f5e8

File tree

1 file changed

+48
-13
lines changed

1 file changed

+48
-13
lines changed

native/shuffle/src/partitioners/multi_partition.rs

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::partitioners::partitioned_batch_iterator::{
2020
PartitionedBatchIterator, PartitionedBatchesProducer,
2121
};
2222
use crate::partitioners::ShufflePartitioner;
23-
use crate::writers::{BufBatchWriter, PartitionWriter};
23+
use crate::writers::{BufBatchWriter, PartitionSpillRange, PartitionWriter, SpillInfo};
2424
use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter};
2525
use arrow::array::{ArrayRef, RecordBatch};
2626
use arrow::datatypes::SchemaRef;
@@ -125,6 +125,9 @@ pub(crate) struct MultiPartitionShuffleRepartitioner {
125125
tracing_enabled: bool,
126126
/// Size of the write buffer in bytes
127127
write_buffer_size: usize,
128+
/// Combined spill files. Each entry is a single file containing data from
129+
/// multiple partitions, created during one spill event.
130+
spill_infos: Vec<SpillInfo>,
128131
}
129132

130133
impl MultiPartitionShuffleRepartitioner {
@@ -190,6 +193,7 @@ impl MultiPartitionShuffleRepartitioner {
190193
reservation,
191194
tracing_enabled,
192195
write_buffer_size,
196+
spill_infos: vec![],
193197
})
194198
}
195199

@@ -502,20 +506,53 @@ impl MultiPartitionShuffleRepartitioner {
502506
with_trace("shuffle_spill", self.tracing_enabled, || {
503507
let num_output_partitions = self.partition_writers.len();
504508
let mut partitioned_batches = self.partitioned_batches();
505-
let mut spilled_bytes = 0;
509+
let mut spilled_bytes: usize = 0;
510+
511+
// Create a single temporary file for this spill event
512+
let temp_file = self
513+
.runtime
514+
.disk_manager
515+
.create_tmp_file("shuffle writer spill")?;
516+
let mut spill_file = OpenOptions::new()
517+
.write(true)
518+
.create(true)
519+
.truncate(true)
520+
.open(temp_file.path())
521+
.map_err(|e| {
522+
DataFusionError::Execution(format!("Error occurred while spilling {e}"))
523+
})?;
524+
525+
let mut partition_ranges = Vec::with_capacity(num_output_partitions);
506526

507527
for partition_id in 0..num_output_partitions {
508528
let partition_writer = &mut self.partition_writers[partition_id];
509529
let mut iter = partitioned_batches.produce(partition_id);
510-
spilled_bytes += partition_writer.spill(
530+
531+
let offset = spill_file.stream_position()?;
532+
let bytes_written = partition_writer.write_to(
511533
&mut iter,
512-
&self.runtime,
534+
&mut spill_file,
513535
&self.metrics,
514536
self.write_buffer_size,
515537
self.batch_size,
516538
)?;
539+
540+
if bytes_written > 0 {
541+
partition_ranges.push(Some(PartitionSpillRange {
542+
offset,
543+
length: bytes_written as u64,
544+
}));
545+
spilled_bytes += bytes_written;
546+
} else {
547+
partition_ranges.push(None);
548+
}
517549
}
518550

551+
spill_file.flush()?;
552+
553+
self.spill_infos
554+
.push(SpillInfo::new(temp_file, partition_ranges));
555+
519556
self.reservation.free();
520557
self.metrics.spill_count.add(1);
521558
self.metrics.spilled_bytes.add(spilled_bytes);
@@ -524,8 +561,8 @@ impl MultiPartitionShuffleRepartitioner {
524561
}
525562

526563
#[cfg(test)]
527-
pub(crate) fn partition_writers(&self) -> &[PartitionWriter] {
528-
&self.partition_writers
564+
pub(crate) fn spill_count_files(&self) -> usize {
565+
self.spill_infos.len()
529566
}
530567
}
531568

@@ -579,14 +616,12 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
579616
for i in 0..num_output_partitions {
580617
offsets[i] = output_data.stream_position()?;
581618

582-
// if we wrote a spill file for this partition then copy the
583-
// contents into the shuffle file
584-
if let Some(spill_path) = self.partition_writers[i].path() {
585-
// Use raw File handle (not BufReader) so that std::io::copy
586-
// can use copy_file_range/sendfile for zero-copy on Linux.
587-
let mut spill_file = File::open(spill_path)?;
619+
// Copy spilled data for this partition from each spill file.
620+
// Each SpillInfo is a single file containing data from every partition that spilled,
621+
// ordered by partition ID, with byte ranges tracked per partition.
622+
for spill_info in &self.spill_infos {
588623
let mut write_timer = self.metrics.write_time.timer();
589-
std::io::copy(&mut spill_file, &mut output_data)?;
624+
spill_info.copy_partition_to(i, &mut output_data)?;
590625
write_timer.stop();
591626
}
592627

0 commit comments

Comments
 (0)