14 changes: 14 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -534,6 +534,20 @@ object CometConf extends ShimCometConf {
.checkValue(v => v > 0, "Write buffer size must be positive")
.createWithDefault(1)

val COMET_SHUFFLE_BATCH_SPILL_LIMIT: ConfigEntry[Int] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.batchSpillLimit")
.category(CATEGORY_SHUFFLE)
.doc(
"Maximum number of input batches buffered before the native shuffle writer " +
"spills to disk, regardless of available memory. This prevents the shuffle writer " +
"from buffering too much data, which can degrade throughput due to poor cache " +
"locality during the final write phase. A value of 0 disables this threshold, " +
"meaning spills only occur when the memory pool is full. " +
"The default is 100.")
.intConf
.checkValue(v => v >= 0, "Batch spill limit must be non-negative")
.createWithDefault(100)

val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
"spark.comet.shuffle.preferDictionary.ratio")
.category(CATEGORY_SHUFFLE)
18 changes: 18 additions & 0 deletions docs/source/user-guide/latest/tuning.md
@@ -154,6 +154,24 @@ partitioning keys. Columns that are not partitioning keys may contain complex types.
Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and
`SinglePartitioning`. This shuffle implementation supports complex data types as partitioning keys.

### Shuffle Spill Tuning

The native shuffle writer buffers input batches in memory and periodically spills them to disk. Two mechanisms
control when spilling occurs:

1. **Memory pressure**: When the memory pool rejects an allocation, the writer spills its buffered data to disk.

2. **Batch spill limit**: The writer also spills after buffering a fixed number of input batches, regardless of
memory availability. This prevents the writer from accumulating too much data, which can degrade throughput
due to poor cache locality during the final write phase.

The batch spill limit is configured via `spark.comet.exec.shuffle.batchSpillLimit` (default: 100). Setting it
to 0 disables this threshold, meaning spills only occur under memory pressure.

In most cases, the default value of 100 provides good performance. If you observe that shuffle throughput
decreases when more memory is available to Comet, try lowering this value. If you observe excessive spilling
with small data, try increasing it or disabling it with 0.

(Member Author review comment on this paragraph: "I will update this section once I have run benchmarks with different values.")
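The interaction between the two triggers can be sketched as a small predicate. This is a simplified illustration with hypothetical names (`should_spill`, `memory_grow_ok`), not the exact Comet implementation:

```rust
// Simplified sketch of the spill decision described above.
// `memory_grow_ok` stands in for the memory pool accepting the
// writer's reservation growth; names are illustrative only.
fn should_spill(buffered_batches: usize, batch_spill_limit: usize, memory_grow_ok: bool) -> bool {
    // Batch-count trigger: only active when the limit is non-zero.
    let over_batch_limit = batch_spill_limit > 0 && buffered_batches >= batch_spill_limit;
    // Spill on either trigger.
    over_batch_limit || !memory_grow_ok
}

fn main() {
    // Default limit of 100: spill once 100 batches are buffered, even with free memory.
    assert!(should_spill(100, 100, true));
    assert!(!should_spill(99, 100, true));
    // Limit 0 disables the batch trigger; only memory pressure causes a spill.
    assert!(!should_spill(1_000, 0, true));
    assert!(should_spill(1, 0, false));
    println!("ok");
}
```

For example, `--conf spark.comet.exec.shuffle.batchSpillLimit=50` lowers the threshold, while `0` falls back to memory-pressure-only spilling.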

### Shuffle Compression

By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.
2 changes: 2 additions & 0 deletions native/core/src/execution/planner.rs
@@ -1371,6 +1371,7 @@ impl PhysicalPlanner {
}?;

let write_buffer_size = writer.write_buffer_size as usize;
let batch_spill_limit = writer.batch_spill_limit as usize;
let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
Arc::clone(&child.native_plan),
partitioning,
@@ -1379,6 +1380,7 @@
writer.output_index_file.clone(),
writer.tracing_enabled,
write_buffer_size,
batch_spill_limit,
)?);

Ok((
4 changes: 4 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -294,6 +294,10 @@ message ShuffleWriter {
// Size of the write buffer in bytes used when writing shuffle data to disk.
// Larger values may improve write performance but use more memory.
int32 write_buffer_size = 8;
// Maximum number of buffered batches before the shuffle writer spills to disk,
// regardless of available memory. A value of 0 disables this threshold
// (spills only when the memory pool is full).
int32 batch_spill_limit = 9;
}

message ParquetWriter {
1 change: 1 addition & 0 deletions native/shuffle/benches/shuffle_writer.rs
@@ -153,6 +153,7 @@ fn create_shuffle_writer_exec(
"/tmp/index.out".to_string(),
false,
1024 * 1024,
0,
)
.unwrap()
}
9 changes: 9 additions & 0 deletions native/shuffle/src/bin/shuffle_bench.rs
@@ -114,6 +114,10 @@ struct Args {
/// Each task reads the same input and writes to its own output files.
#[arg(long, default_value_t = 1)]
concurrent_tasks: usize,

/// Maximum number of buffered batches before spilling (0 = disabled)
#[arg(long, default_value_t = 0)]
batch_spill_limit: usize,
}

fn main() {
@@ -413,6 +417,7 @@ fn run_shuffle_write(
args.limit,
data_file.to_string(),
index_file.to_string(),
args.batch_spill_limit,
)
.await
.unwrap();
@@ -436,6 +441,7 @@ async fn execute_shuffle_write(
limit: usize,
data_file: String,
index_file: String,
batch_spill_limit: usize,
) -> datafusion::common::Result<(MetricsSet, MetricsSet)> {
let config = SessionConfig::new().with_batch_size(batch_size);
let mut runtime_builder = RuntimeEnvBuilder::new();
@@ -477,6 +483,7 @@
index_file,
false,
write_buffer_size,
batch_spill_limit,
)
.expect("Failed to create ShuffleWriterExec");

@@ -541,6 +548,7 @@ fn run_concurrent_shuffle_writes(
let memory_limit = args.memory_limit;
let write_buffer_size = args.write_buffer_size;
let limit = args.limit;
let batch_spill_limit = args.batch_spill_limit;

handles.push(tokio::spawn(async move {
execute_shuffle_write(
@@ -553,6 +561,7 @@
limit,
data_file,
index_file,
batch_spill_limit,
)
.await
.unwrap()
8 changes: 7 additions & 1 deletion native/shuffle/src/partitioners/multi_partition.rs
@@ -125,6 +125,8 @@ pub(crate) struct MultiPartitionShuffleRepartitioner {
tracing_enabled: bool,
/// Size of the write buffer in bytes
write_buffer_size: usize,
/// Maximum number of buffered batches before spilling (0 = disabled)
batch_spill_limit: usize,
}

impl MultiPartitionShuffleRepartitioner {
@@ -141,6 +143,7 @@
codec: CompressionCodec,
tracing_enabled: bool,
write_buffer_size: usize,
batch_spill_limit: usize,
) -> datafusion::common::Result<Self> {
let num_output_partitions = partitioning.partition_count();
assert_ne!(
@@ -190,6 +193,7 @@
reservation,
tracing_enabled,
write_buffer_size,
batch_spill_limit,
})
}

@@ -427,7 +431,9 @@
mem_growth += after_size.saturating_sub(before_size);
}

if self.reservation.try_grow(mem_growth).is_err() {
let over_batch_limit =
self.batch_spill_limit > 0 && self.buffered_batches.len() >= self.batch_spill_limit;
if over_batch_limit || self.reservation.try_grow(mem_growth).is_err() {
self.spill()?;
}

13 changes: 13 additions & 0 deletions native/shuffle/src/shuffle_writer.rs
@@ -67,6 +67,8 @@ pub struct ShuffleWriterExec {
tracing_enabled: bool,
/// Size of the write buffer in bytes
write_buffer_size: usize,
/// Maximum number of buffered batches before spilling (0 = disabled)
batch_spill_limit: usize,
}

impl ShuffleWriterExec {
@@ -80,6 +82,7 @@
output_index_file: String,
tracing_enabled: bool,
write_buffer_size: usize,
batch_spill_limit: usize,
) -> Result<Self> {
let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&input.schema())),
@@ -98,6 +101,7 @@
codec,
tracing_enabled,
write_buffer_size,
batch_spill_limit,
})
}
}
@@ -158,6 +162,7 @@
self.output_index_file.clone(),
self.tracing_enabled,
self.write_buffer_size,
self.batch_spill_limit,
)?)),
_ => panic!("ShuffleWriterExec wrong number of children"),
}
@@ -185,6 +190,7 @@
self.codec.clone(),
self.tracing_enabled,
self.write_buffer_size,
self.batch_spill_limit,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -205,6 +211,7 @@
codec: CompressionCodec,
tracing_enabled: bool,
write_buffer_size: usize,
batch_spill_limit: usize,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();

@@ -241,6 +248,7 @@
codec,
tracing_enabled,
write_buffer_size,
batch_spill_limit,
)?),
};

@@ -363,6 +371,7 @@ mod test {
CompressionCodec::Lz4Frame,
false,
1024 * 1024, // write_buffer_size: 1MB default
0, // batch_spill_limit: disabled
)
.unwrap();

@@ -467,6 +476,7 @@
"/tmp/index.out".to_string(),
false,
1024 * 1024, // write_buffer_size: 1MB default
0, // batch_spill_limit: disabled
)
.unwrap();

@@ -526,6 +536,7 @@
index_file.clone(),
false,
1024 * 1024,
0,
)
.unwrap();

@@ -730,6 +741,7 @@
index_file.to_str().unwrap().to_string(),
false,
1024 * 1024,
0,
)
.unwrap();

@@ -818,6 +830,7 @@
index_file.to_str().unwrap().to_string(),
false,
1024 * 1024,
0,
)
.unwrap();

@@ -192,6 +192,7 @@ class CometNativeShuffleWriter[K, V](
CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get)
shuffleWriterBuilder.setWriteBufferSize(
CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get().min(Int.MaxValue).toInt)
shuffleWriterBuilder.setBatchSpillLimit(CometConf.COMET_SHUFFLE_BATCH_SPILL_LIMIT.get())

outputPartitioning match {
case p if isSinglePartitioning(p) =>