@@ -20,7 +20,7 @@ use crate::partitioners::partitioned_batch_iterator::{
2020 PartitionedBatchIterator , PartitionedBatchesProducer ,
2121} ;
2222use crate :: partitioners:: ShufflePartitioner ;
23- use crate :: writers:: { BufBatchWriter , PartitionWriter } ;
23+ use crate :: writers:: { BufBatchWriter , PartitionSpillRange , PartitionWriter , SpillInfo } ;
2424use crate :: { comet_partitioning, CometPartitioning , CompressionCodec , ShuffleBlockWriter } ;
2525use arrow:: array:: { ArrayRef , RecordBatch } ;
2626use arrow:: datatypes:: SchemaRef ;
@@ -125,6 +125,9 @@ pub(crate) struct MultiPartitionShuffleRepartitioner {
125125 tracing_enabled : bool ,
126126 /// Size of the write buffer in bytes
127127 write_buffer_size : usize ,
128+ /// Combined spill files. Each entry is a single file containing data from
129+ /// multiple partitions, created during one spill event.
130+ spill_infos : Vec < SpillInfo > ,
128131}
129132
130133impl MultiPartitionShuffleRepartitioner {
@@ -190,6 +193,7 @@ impl MultiPartitionShuffleRepartitioner {
190193 reservation,
191194 tracing_enabled,
192195 write_buffer_size,
196+ spill_infos : vec ! [ ] ,
193197 } )
194198 }
195199
@@ -502,20 +506,53 @@ impl MultiPartitionShuffleRepartitioner {
502506 with_trace ( "shuffle_spill" , self . tracing_enabled , || {
503507 let num_output_partitions = self . partition_writers . len ( ) ;
504508 let mut partitioned_batches = self . partitioned_batches ( ) ;
505- let mut spilled_bytes = 0 ;
509+ let mut spilled_bytes: usize = 0 ;
510+
511+ // Create a single temporary file for this spill event
512+ let temp_file = self
513+ . runtime
514+ . disk_manager
515+ . create_tmp_file ( "shuffle writer spill" ) ?;
516+ let mut spill_file = OpenOptions :: new ( )
517+ . write ( true )
518+ . create ( true )
519+ . truncate ( true )
520+ . open ( temp_file. path ( ) )
521+ . map_err ( |e| {
522+ DataFusionError :: Execution ( format ! ( "Error occurred while spilling {e}" ) )
523+ } ) ?;
524+
525+ let mut partition_ranges = Vec :: with_capacity ( num_output_partitions) ;
506526
507527 for partition_id in 0 ..num_output_partitions {
508528 let partition_writer = & mut self . partition_writers [ partition_id] ;
509529 let mut iter = partitioned_batches. produce ( partition_id) ;
510- spilled_bytes += partition_writer. spill (
530+
531+ let offset = spill_file. stream_position ( ) ?;
532+ let bytes_written = partition_writer. write_to (
511533 & mut iter,
512- & self . runtime ,
534+ & mut spill_file ,
513535 & self . metrics ,
514536 self . write_buffer_size ,
515537 self . batch_size ,
516538 ) ?;
539+
540+ if bytes_written > 0 {
541+ partition_ranges. push ( Some ( PartitionSpillRange {
542+ offset,
543+ length : bytes_written as u64 ,
544+ } ) ) ;
545+ spilled_bytes += bytes_written;
546+ } else {
547+ partition_ranges. push ( None ) ;
548+ }
517549 }
518550
551+ spill_file. flush ( ) ?;
552+
553+ self . spill_infos
554+ . push ( SpillInfo :: new ( temp_file, partition_ranges) ) ;
555+
519556 self . reservation . free ( ) ;
520557 self . metrics . spill_count . add ( 1 ) ;
521558 self . metrics . spilled_bytes . add ( spilled_bytes) ;
@@ -524,8 +561,8 @@ impl MultiPartitionShuffleRepartitioner {
524561 }
525562
526563 #[ cfg( test) ]
527- pub ( crate ) fn partition_writers ( & self ) -> & [ PartitionWriter ] {
528- & self . partition_writers
564+ pub ( crate ) fn spill_count_files ( & self ) -> usize {
565+ self . spill_infos . len ( )
529566 }
530567}
531568
@@ -579,14 +616,12 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
579616 for i in 0 ..num_output_partitions {
580617 offsets[ i] = output_data. stream_position ( ) ?;
581618
582- // if we wrote a spill file for this partition then copy the
583- // contents into the shuffle file
584- if let Some ( spill_path) = self . partition_writers [ i] . path ( ) {
585- // Use raw File handle (not BufReader) so that std::io::copy
586- // can use copy_file_range/sendfile for zero-copy on Linux.
587- let mut spill_file = File :: open ( spill_path) ?;
619+ // Copy spilled data for this partition from each spill file.
620+ // Each SpillInfo is a single file containing data from all partitions
621+ // ordered by partition ID, with byte ranges tracked per partition.
622+ for spill_info in & self . spill_infos {
588623 let mut write_timer = self . metrics . write_time . timer ( ) ;
589- std :: io :: copy ( & mut spill_file , & mut output_data) ?;
624+ spill_info . copy_partition_to ( i , & mut output_data) ?;
590625 write_timer. stop ( ) ;
591626 }
592627
0 commit comments