@@ -696,6 +696,8 @@ mod test {
696696 total_rows
697697 }
698698
699+ #[ test]
700+ #[ cfg_attr( miri, ignore) ]
699701 fn test_empty_schema_shuffle_writer ( ) {
700702 use std:: fs;
701703 use std:: io:: Read ;
@@ -849,7 +851,6 @@ mod test {
849851 assert_eq ! ( offset, 0 , "All offsets should be 0 with zero rows" ) ;
850852 }
851853 }
852- }
853854
854855 /// Verify that spilling an empty repartitioner produces no spill files.
855856 #[ tokio:: test]
@@ -1027,6 +1028,141 @@ mod test {
10271028 let _ = fs:: remove_file ( "/tmp/spill_index.out" ) ;
10281029 }
10291030
/// Verify that spill output file size matches non-spill output.
///
/// This specifically tests that the byte range tracking in SpillInfo
/// correctly accounts for all bytes written during BufBatchWriter flush,
/// including the final coalescer batch that is emitted only during flush().
/// A bug here would cause the output file to be much smaller than expected
/// because copy_partition_with_handle() would copy truncated byte ranges.
#[test]
#[cfg_attr(miri, ignore)]
fn test_spill_output_file_size_matches_non_spill() {
    use std::fs;

    let batch_size = 100;
    let num_batches = 50;
    let num_partitions = 16;

    let batch = create_batch(batch_size);
    let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();

    // Both runs execute the identical plan; only the output paths and the
    // memory limit (which controls spill pressure) differ, so the shared
    // setup lives in one closure instead of two copy-pasted blocks.
    let run_shuffle = |data_path: &str, index_path: &str, memory_limit: usize| {
        let partitions = std::slice::from_ref(&batches);
        let exec = ShuffleWriterExec::try_new(
            Arc::new(DataSourceExec::new(Arc::new(
                MemorySourceConfig::try_new(partitions, batch.schema(), None).unwrap(),
            ))),
            CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
            CompressionCodec::Zstd(1),
            data_path.to_string(),
            index_path.to_string(),
            false,
            1024 * 1024,
        )
        .unwrap();

        let config = SessionConfig::new();
        let runtime_env = Arc::new(
            RuntimeEnvBuilder::new()
                .with_memory_limit(memory_limit, 1.0)
                .build()
                .unwrap(),
        );
        let ctx = SessionContext::new_with_config_rt(config, runtime_env);
        let task_ctx = ctx.task_ctx();
        let stream = exec.execute(0, task_ctx).unwrap();
        let rt = Runtime::new().unwrap();
        rt.block_on(collect(stream)).unwrap();
    };

    // Run 1: generous memory limit, so no spilling occurs.
    run_shuffle(
        "/tmp/size_no_spill_data.out",
        "/tmp/size_no_spill_index.out",
        100 * 1024 * 1024,
    );

    // Run 2: very small memory limit to force many spills with small
    // batches that exercise the coalescer flush path.
    run_shuffle(
        "/tmp/size_spill_data.out",
        "/tmp/size_spill_index.out",
        256 * 1024,
    );

    let no_spill_data_size = fs::metadata("/tmp/size_no_spill_data.out").unwrap().len();
    let spill_data_size = fs::metadata("/tmp/size_spill_data.out").unwrap().len();

    // The spill output may differ slightly due to batch coalescing boundaries
    // affecting compression ratios, but it must be within a reasonable range.
    // A data-loss bug would produce a file that is drastically smaller (e.g. <10%).
    let ratio = spill_data_size as f64 / no_spill_data_size as f64;
    assert!(
        ratio > 0.5 && ratio < 2.0,
        "Spill output size ({spill_data_size}) should be comparable to \
         non-spill output size ({no_spill_data_size}), ratio={ratio:.3}. \
         A very small ratio indicates data loss in spill byte range tracking."
    );

    // Also verify row counts match. The index file is a flat array of
    // little-endian i64 byte offsets, one per partition boundary; chunks(8)
    // will panic via try_into if the file length is not a multiple of 8,
    // which is the desired failure mode for a corrupt index in a test.
    let parse_offsets = |index_data: &[u8]| -> Vec<i64> {
        index_data
            .chunks(8)
            .map(|chunk| i64::from_le_bytes(chunk.try_into().unwrap()))
            .collect()
    };

    let no_spill_index = fs::read("/tmp/size_no_spill_index.out").unwrap();
    let spill_index = fs::read("/tmp/size_spill_index.out").unwrap();
    let no_spill_offsets = parse_offsets(&no_spill_index);
    let spill_offsets = parse_offsets(&spill_index);
    let no_spill_data = fs::read("/tmp/size_no_spill_data.out").unwrap();
    let spill_data = fs::read("/tmp/size_spill_data.out").unwrap();

    // Per-partition and total row counts must agree between the two runs:
    // offsets[i]..offsets[i+1] bounds partition i's IPC blocks in the data file.
    let total_rows = batch_size * num_batches;
    let mut ns_total = 0;
    let mut s_total = 0;
    for i in 0..num_partitions {
        let ns_rows = read_all_ipc_blocks(
            &no_spill_data[no_spill_offsets[i] as usize..no_spill_offsets[i + 1] as usize],
        );
        let s_rows = read_all_ipc_blocks(
            &spill_data[spill_offsets[i] as usize..spill_offsets[i + 1] as usize],
        );
        assert_eq!(
            ns_rows, s_rows,
            "Partition {i} row count mismatch: no_spill={ns_rows}, spill={s_rows}"
        );
        ns_total += ns_rows;
        s_total += s_rows;
    }
    assert_eq!(ns_total, total_rows, "Non-spill total row count mismatch");
    assert_eq!(s_total, total_rows, "Spill total row count mismatch");

    // Cleanup (best-effort; a failed assert above intentionally leaves the
    // files behind for inspection).
    let _ = fs::remove_file("/tmp/size_no_spill_data.out");
    let _ = fs::remove_file("/tmp/size_no_spill_index.out");
    let _ = fs::remove_file("/tmp/size_spill_data.out");
    let _ = fs::remove_file("/tmp/size_spill_index.out");
}
1165+
10301166 /// Verify multiple spill events with subsequent insert_batch calls
10311167 /// produce correct output.
10321168 #[ tokio:: test]
0 commit comments