Skip to content

Commit b25c86f

Browse files
committed
Add unit test for spill output file size correctness
1 parent eed010c commit b25c86f

1 file changed

Lines changed: 135 additions & 0 deletions

File tree

native/shuffle/src/shuffle_writer.rs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,6 +1027,141 @@ mod test {
10271027
let _ = fs::remove_file("/tmp/spill_index.out");
10281028
}
10291029

/// Verify that spill output file size matches non-spill output.
/// This specifically tests that the byte range tracking in SpillInfo
/// correctly accounts for all bytes written during BufBatchWriter flush,
/// including the final coalescer batch that is emitted only during flush().
/// A bug here would cause the output file to be much smaller than expected
/// because copy_partition_with_handle() would copy truncated byte ranges.
#[test]
#[cfg_attr(miri, ignore)]
fn test_spill_output_file_size_matches_non_spill() {
    use std::fs;

    let batch_size = 100;
    let num_batches = 50;
    let num_partitions = 16;

    let batch = create_batch(batch_size);
    let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();

    // Drives one full shuffle-write pass over `batches` into the given output
    // files, under a runtime with the given memory limit, blocking until the
    // output stream is fully consumed.
    let run_shuffle = |data_path: &str, index_path: &str, memory_limit: usize| {
        let partitions = std::slice::from_ref(&batches);
        let exec = ShuffleWriterExec::try_new(
            Arc::new(DataSourceExec::new(Arc::new(
                MemorySourceConfig::try_new(partitions, batch.schema(), None).unwrap(),
            ))),
            CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
            CompressionCodec::Zstd(1),
            data_path.to_string(),
            index_path.to_string(),
            false,
            1024 * 1024,
        )
        .unwrap();

        let runtime_env = Arc::new(
            RuntimeEnvBuilder::new()
                .with_memory_limit(memory_limit, 1.0)
                .build()
                .unwrap(),
        );
        let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime_env);
        let stream = exec.execute(0, ctx.task_ctx()).unwrap();
        Runtime::new().unwrap().block_on(collect(stream)).unwrap();
    };

    // Run 1: generous memory limit, so no spilling occurs.
    run_shuffle(
        "/tmp/size_no_spill_data.out",
        "/tmp/size_no_spill_index.out",
        100 * 1024 * 1024,
    );

    // Run 2: with spilling (very small memory limit to force many spills
    // with small batches that exercise the coalescer flush path)
    run_shuffle(
        "/tmp/size_spill_data.out",
        "/tmp/size_spill_index.out",
        256 * 1024,
    );

    let no_spill_data_size = fs::metadata("/tmp/size_no_spill_data.out").unwrap().len();
    let spill_data_size = fs::metadata("/tmp/size_spill_data.out").unwrap().len();

    // The spill output may differ slightly due to batch coalescing boundaries
    // affecting compression ratios, but it must be within a reasonable range.
    // A data-loss bug would produce a file that is drastically smaller (e.g. <10%).
    let ratio = spill_data_size as f64 / no_spill_data_size as f64;
    assert!(
        ratio > 0.5 && ratio < 2.0,
        "Spill output size ({spill_data_size}) should be comparable to \
        non-spill output size ({no_spill_data_size}), ratio={ratio:.3}. \
        A very small ratio indicates data loss in spill byte range tracking."
    );

    // Also verify row counts match.
    // The index file is a flat array of little-endian i64 byte offsets, one
    // per partition boundary.
    fn parse_offsets(index_data: &[u8]) -> Vec<i64> {
        index_data
            .chunks(8)
            .map(|chunk| i64::from_le_bytes(chunk.try_into().unwrap()))
            .collect()
    }

    let no_spill_offsets = parse_offsets(&fs::read("/tmp/size_no_spill_index.out").unwrap());
    let spill_offsets = parse_offsets(&fs::read("/tmp/size_spill_index.out").unwrap());
    let no_spill_data = fs::read("/tmp/size_no_spill_data.out").unwrap();
    let spill_data = fs::read("/tmp/size_spill_data.out").unwrap();

    let total_rows = batch_size * num_batches;
    let mut ns_total = 0;
    let mut s_total = 0;
    for i in 0..num_partitions {
        let ns_rows = read_all_ipc_blocks(
            &no_spill_data[no_spill_offsets[i] as usize..no_spill_offsets[i + 1] as usize],
        );
        let s_rows = read_all_ipc_blocks(
            &spill_data[spill_offsets[i] as usize..spill_offsets[i + 1] as usize],
        );
        assert_eq!(
            ns_rows, s_rows,
            "Partition {i} row count mismatch: no_spill={ns_rows}, spill={s_rows}"
        );
        ns_total += ns_rows;
        s_total += s_rows;
    }
    assert_eq!(ns_total, total_rows, "Non-spill total row count mismatch");
    assert_eq!(s_total, total_rows, "Spill total row count mismatch");

    // Cleanup
    for path in [
        "/tmp/size_no_spill_data.out",
        "/tmp/size_no_spill_index.out",
        "/tmp/size_spill_data.out",
        "/tmp/size_spill_index.out",
    ] {
        let _ = fs::remove_file(path);
    }
}
10301165
/// Verify multiple spill events with subsequent insert_batch calls
10311166
/// produce correct output.
10321167
#[tokio::test]

0 commit comments

Comments
 (0)