@@ -865,6 +865,141 @@ mod test {
865865 let _ = fs:: remove_file ( "/tmp/spill_index.out" ) ;
866866 }
867867
/// Verify that spill output file size matches non-spill output.
/// This specifically tests that the byte range tracking in SpillInfo
/// correctly accounts for all bytes written during BufBatchWriter flush,
/// including the final coalescer batch that is emitted only during flush().
/// A bug here would cause the output file to be much smaller than expected
/// because copy_partition_with_handle() would copy truncated byte ranges.
#[test]
#[cfg_attr(miri, ignore)]
fn test_spill_output_file_size_matches_non_spill() {
    use std::fs;

    let batch_size = 100;
    let num_batches = 50;
    let num_partitions = 16;

    let batch = create_batch(batch_size);
    let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();

    // Run the shuffle writer end-to-end with the given memory limit and
    // output paths, blocking until the stream is fully drained. Factoring
    // this out keeps the spill / no-spill runs guaranteed identical except
    // for the parameters under test.
    let run_shuffle = |memory_limit: usize, data_path: &str, index_path: &str| {
        let partitions = std::slice::from_ref(&batches);
        let exec = ShuffleWriterExec::try_new(
            Arc::new(DataSourceExec::new(Arc::new(
                MemorySourceConfig::try_new(partitions, batch.schema(), None).unwrap(),
            ))),
            CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
            CompressionCodec::Zstd(1),
            data_path.to_string(),
            index_path.to_string(),
            false,
            1024 * 1024,
        )
        .unwrap();

        let config = SessionConfig::new();
        let runtime_env = Arc::new(
            RuntimeEnvBuilder::new()
                .with_memory_limit(memory_limit, 1.0)
                .build()
                .unwrap(),
        );
        let ctx = SessionContext::new_with_config_rt(config, runtime_env);
        let task_ctx = ctx.task_ctx();
        let stream = exec.execute(0, task_ctx).unwrap();
        let rt = Runtime::new().unwrap();
        rt.block_on(collect(stream)).unwrap();
    };

    // Run 1: generous memory limit, no spilling.
    run_shuffle(
        100 * 1024 * 1024,
        "/tmp/size_no_spill_data.out",
        "/tmp/size_no_spill_index.out",
    );

    // Run 2: very small memory limit to force many spills with small
    // batches that exercise the coalescer flush path.
    run_shuffle(
        256 * 1024,
        "/tmp/size_spill_data.out",
        "/tmp/size_spill_index.out",
    );

    let no_spill_data_size = fs::metadata("/tmp/size_no_spill_data.out").unwrap().len();
    let spill_data_size = fs::metadata("/tmp/size_spill_data.out").unwrap().len();

    // The spill output may differ slightly due to batch coalescing boundaries
    // affecting compression ratios, but it must be within a reasonable range.
    // A data-loss bug would produce a file that is drastically smaller (e.g. <10%).
    let ratio = spill_data_size as f64 / no_spill_data_size as f64;
    assert!(
        ratio > 0.5 && ratio < 2.0,
        "Spill output size ({spill_data_size}) should be comparable to \
         non-spill output size ({no_spill_data_size}), ratio={ratio:.3}. \
         A very small ratio indicates data loss in spill byte range tracking."
    );

    // Parse an index file into its little-endian i64 partition byte offsets.
    // Assert alignment first so a truncated index fails with a clear message
    // instead of an opaque try_into panic; chunks_exact also lets the
    // compiler drop per-chunk length checks.
    let parse_offsets = |index_data: &[u8]| -> Vec<i64> {
        assert_eq!(
            index_data.len() % 8,
            0,
            "Index file length must be a multiple of 8 bytes"
        );
        index_data
            .chunks_exact(8)
            .map(|chunk| i64::from_le_bytes(chunk.try_into().unwrap()))
            .collect()
    };

    let no_spill_index = fs::read("/tmp/size_no_spill_index.out").unwrap();
    let spill_index = fs::read("/tmp/size_spill_index.out").unwrap();
    let no_spill_offsets = parse_offsets(&no_spill_index);
    let spill_offsets = parse_offsets(&spill_index);

    // One offset per partition boundary: num_partitions + 1 entries. Checking
    // this up front turns a malformed index into a clear failure rather than
    // an out-of-bounds panic in the slicing loop below.
    assert_eq!(
        no_spill_offsets.len(),
        num_partitions + 1,
        "Non-spill index has wrong number of offsets"
    );
    assert_eq!(
        spill_offsets.len(),
        num_partitions + 1,
        "Spill index has wrong number of offsets"
    );

    let no_spill_data = fs::read("/tmp/size_no_spill_data.out").unwrap();
    let spill_data = fs::read("/tmp/size_spill_data.out").unwrap();

    // Verify per-partition and total row counts match between the two runs.
    let total_rows = batch_size * num_batches;
    let mut ns_total = 0;
    let mut s_total = 0;
    for i in 0..num_partitions {
        let ns_rows = read_all_ipc_blocks(
            &no_spill_data[no_spill_offsets[i] as usize..no_spill_offsets[i + 1] as usize],
        );
        let s_rows = read_all_ipc_blocks(
            &spill_data[spill_offsets[i] as usize..spill_offsets[i + 1] as usize],
        );
        assert_eq!(
            ns_rows, s_rows,
            "Partition {i} row count mismatch: no_spill={ns_rows}, spill={s_rows}"
        );
        ns_total += ns_rows;
        s_total += s_rows;
    }
    assert_eq!(ns_total, total_rows, "Non-spill total row count mismatch");
    assert_eq!(s_total, total_rows, "Spill total row count mismatch");

    // Cleanup (best-effort; ignore errors so a missing file doesn't mask
    // the real assertion failure).
    let _ = fs::remove_file("/tmp/size_no_spill_data.out");
    let _ = fs::remove_file("/tmp/size_no_spill_index.out");
    let _ = fs::remove_file("/tmp/size_spill_data.out");
    let _ = fs::remove_file("/tmp/size_spill_index.out");
}
1002+
8681003 /// Verify multiple spill events with subsequent insert_batch calls
8691004 /// produce correct output.
8701005 #[ tokio:: test]
0 commit comments