From 349336ccbd04684bd81f08fdd0b251b9f7e0890f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Jan 2026 11:23:51 -0700 Subject: [PATCH 1/3] split CometShuffleExternalSorter into sync/async implementations --- .../sort/CometShuffleExternalSorter.java | 407 +++--------------- .../sort/CometShuffleExternalSorterAsync.java | 405 +++++++++++++++++ .../sort/CometShuffleExternalSorterSync.java | 334 ++++++++++++++ .../shuffle/CometUnsafeShuffleWriter.java | 2 +- 4 files changed, 790 insertions(+), 358 deletions(-) create mode 100644 spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java create mode 100644 spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterSync.java diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java index b026c6bc4f..60bdd2d1de 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java +++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java @@ -19,37 +19,20 @@ package org.apache.spark.shuffle.sort; -import java.io.File; import java.io.IOException; -import java.util.LinkedList; -import java.util.concurrent.*; - -import scala.Tuple2; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; -import org.apache.spark.memory.SparkOutOfMemoryError; import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; -import org.apache.spark.shuffle.comet.CometShuffleChecksumSupport; import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocatorTrait; -import org.apache.spark.shuffle.comet.TooLargePageException; -import org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter; -import org.apache.spark.sql.comet.execution.shuffle.ShuffleThreadPool; import org.apache.spark.sql.comet.execution.shuffle.SpillInfo; import 
org.apache.spark.sql.types.StructType; import org.apache.spark.storage.BlockManager; -import org.apache.spark.storage.TempShuffleBlockId; -import org.apache.spark.unsafe.UnsafeAlignedOffset; -import org.apache.spark.unsafe.array.LongArray; -import org.apache.spark.util.Utils; import org.apache.comet.CometConf$; /** - * An external sorter that is specialized for sort-based shuffle. + * An external sorter interface specialized for sort-based shuffle. * *

Incoming records are appended to data pages. When all records have been inserted (or when the * current thread's shuffle memory limit is reached), the in-memory records are sorted according to @@ -57,335 +40,33 @@ * file (or multiple files, if we've spilled). * *

Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its - * spill files. Instead, this merging is performed in {@link CometUnsafeShuffleWriter}, which uses a - * specialized merge procedure that avoids extra serialization/deserialization. - * - *

This sorter provides async spilling write mode. When spilling, it will submit a task to thread - * pool to write shuffle spilling file. After submitting the task, it will continue to buffer, sort - * incoming records and submit another spilling task once spilling threshold reached again or memory - * is not enough to buffer incoming records. Each spilling task will write a shuffle spilling file - * separately. After all records have been sorted and spilled, all spill files will be merged by - * {@link CometUnsafeShuffleWriter}. + * spill files. Instead, this merging is performed in {@link + * org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter}, which uses a specialized + * merge procedure that avoids extra serialization/deserialization. */ -public final class CometShuffleExternalSorter implements CometShuffleChecksumSupport { - - private static final Logger logger = LoggerFactory.getLogger(CometShuffleExternalSorter.class); - - public static int MAXIMUM_PAGE_SIZE_BYTES = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES; - private final int numPartitions; - private final BlockManager blockManager; - private final TaskContext taskContext; - private final ShuffleWriteMetricsReporter writeMetrics; - - private final StructType schema; - - /** Force this sorter to spill when there are this many elements in memory. */ - private final int numElementsForSpillThreshold; - - // When this external sorter allocates memory of `sorterArray`, we need to keep its - // assigned initial size. After spilling, we will reset the array to its initial size. - // See `sorterArray` comment for more details. - private int initialSize; - - /** All sorters with memory pages used by the sorters. */ - private final ConcurrentLinkedQueue spillingSorters = new ConcurrentLinkedQueue<>(); - - private SpillSorter activeSpillSorter; - - private final LinkedList spills = new LinkedList<>(); - - /** Peak memory used by this sorter so far, in bytes. 
*/ - private long peakMemoryUsedBytes; - - // Checksum calculator for each partition. Empty when shuffle checksum disabled. - private final long[] partitionChecksums; +public interface CometShuffleExternalSorter { - private final String checksumAlgorithm; - private final String compressionCodec; - private final int compressionLevel; + int MAXIMUM_PAGE_SIZE_BYTES = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES; - // The memory allocator for this sorter. It is used to allocate/free memory pages for this sorter. - // Because we need to allocate off-heap memory regardless of configured Spark memory mode - // (on-heap/off-heap), we need a separate memory allocator. - private final CometShuffleMemoryAllocatorTrait allocator; - - /** Whether to write shuffle spilling file in async mode */ - private final boolean isAsync; - - /** Thread pool shared for async spilling write */ - private final ExecutorService threadPool; - - private final int threadNum; - - private ConcurrentLinkedQueue> asyncSpillTasks = new ConcurrentLinkedQueue<>(); - - private boolean spilling = false; - - private final int uaoSize = UnsafeAlignedOffset.getUaoSize(); - private final double preferDictionaryRatio; - private final boolean tracingEnabled; - - public CometShuffleExternalSorter( - CometShuffleMemoryAllocatorTrait allocator, - BlockManager blockManager, - TaskContext taskContext, - int initialSize, - int numPartitions, - SparkConf conf, - ShuffleWriteMetricsReporter writeMetrics, - StructType schema) { - this.allocator = allocator; - this.blockManager = blockManager; - this.taskContext = taskContext; - this.numPartitions = numPartitions; - this.schema = schema; - this.numElementsForSpillThreshold = - (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get(); - this.writeMetrics = writeMetrics; - - this.peakMemoryUsedBytes = getMemoryUsage(); - this.partitionChecksums = createPartitionChecksums(numPartitions, conf); - this.checksumAlgorithm = getChecksumAlgorithm(conf); - 
this.compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get(); - this.compressionLevel = - (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL().get(); - - this.initialSize = initialSize; - - this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get(); - this.tracingEnabled = (boolean) CometConf$.MODULE$.COMET_TRACING_ENABLED().get(); - - if (isAsync) { - this.threadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get(); - assert (this.threadNum > 0); - this.threadPool = ShuffleThreadPool.getThreadPool(); - } else { - this.threadNum = 0; - this.threadPool = null; - } - - this.preferDictionaryRatio = - (double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get(); - - this.activeSpillSorter = createSpillSorter(); - } - - /** Creates a new SpillSorter with all required dependencies. */ - private SpillSorter createSpillSorter() { - return new SpillSorter( - allocator, - initialSize, - schema, - uaoSize, - preferDictionaryRatio, - compressionCodec, - compressionLevel, - checksumAlgorithm, - partitionChecksums, - writeMetrics, - taskContext, - spills, - this::spill); - } - - public long[] getChecksums() { - return partitionChecksums; - } + /** Returns the checksums for each partition. */ + long[] getChecksums(); /** Sort and spill the current records in response to memory pressure. */ - public void spill() throws IOException { - if (spilling || activeSpillSorter == null || activeSpillSorter.numRecords() == 0) { - return; - } - - // In async mode, if new in-memory sorter cannot allocate required array, it triggers spill - // here. This method will initiate new sorter following normal spill logic and casue stack - // overflow eventually. So we need to avoid triggering spilling again while spilling. But - // we cannot make this as "synchronized" because it will block the caller thread. 
- spilling = true; - - logger.info( - "Thread {} spilling sort data of {} to disk ({} {} so far)", - Thread.currentThread().getId(), - Utils.bytesToString(getMemoryUsage()), - spills.size(), - spills.size() > 1 ? " times" : " time"); - - final Tuple2 spilledFileInfo = - blockManager.diskBlockManager().createTempShuffleBlock(); - final File file = spilledFileInfo._2(); - final TempShuffleBlockId blockId = spilledFileInfo._1(); - final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); - - activeSpillSorter.setSpillInfo(spillInfo); - - if (isAsync) { - SpillSorter spillingSorter = activeSpillSorter; - Callable task = - () -> { - spillingSorter.writeSortedFileNative(false, tracingEnabled); - final long spillSize = spillingSorter.freeMemory(); - spillingSorter.freeArray(); - spillingSorters.remove(spillingSorter); - - // Reset the in-memory sorter's pointer array only after freeing up the memory pages - // holding the records. Otherwise, if the task is over allocated memory, then without - // freeing the memory pages, we might not be able to get memory for the pointer array. - synchronized (CometShuffleExternalSorter.this) { - taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); - } - - return null; - }; - - spillingSorters.add(spillingSorter); - asyncSpillTasks.add(threadPool.submit(task)); - - while (asyncSpillTasks.size() == threadNum) { - for (Future spillingTask : asyncSpillTasks) { - if (spillingTask.isDone()) { - asyncSpillTasks.remove(spillingTask); - break; - } - } - } - - activeSpillSorter = createSpillSorter(); - } else { - activeSpillSorter.writeSortedFileNative(false, tracingEnabled); - final long spillSize = activeSpillSorter.freeMemory(); - activeSpillSorter.reset(); - - // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding - // the - // records. 
Otherwise, if the task is over allocated memory, then without freeing the memory - // pages, we might not be able to get memory for the pointer array. - synchronized (CometShuffleExternalSorter.this) { - taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); - } - } - - spilling = false; - } - - private long getMemoryUsage() { - long totalPageSize = 0; - for (SpillSorter sorter : spillingSorters) { - totalPageSize += sorter.getMemoryUsage(); - } - if (activeSpillSorter != null) { - totalPageSize += activeSpillSorter.getMemoryUsage(); - } - return totalPageSize; - } - - private void updatePeakMemoryUsed() { - long mem = getMemoryUsage(); - if (mem > peakMemoryUsedBytes) { - peakMemoryUsedBytes = mem; - } - } + void spill() throws IOException; /** Return the peak memory used so far, in bytes. */ - public long getPeakMemoryUsedBytes() { - updatePeakMemoryUsed(); - return peakMemoryUsedBytes; - } - - private long freeMemory() { - updatePeakMemoryUsed(); - long memoryFreed = 0; - if (isAsync) { - for (SpillSorter sorter : spillingSorters) { - memoryFreed += sorter.freeMemory(); - sorter.freeArray(); - } - } - memoryFreed += activeSpillSorter.freeMemory(); - activeSpillSorter.freeArray(); - - return memoryFreed; - } + long getPeakMemoryUsedBytes(); /** Force all memory and spill files to be deleted; called by shuffle error-handling code. */ - public void cleanupResources() { - freeMemory(); - - for (SpillInfo spill : spills) { - if (spill.file.exists() && !spill.file.delete()) { - logger.error("Unable to delete spill file {}", spill.file.getPath()); - } - } - } - - /** - * Checks whether there is enough space to insert an additional record in to the sort pointer - * array and grows the array if additional space is required. If the required space cannot be - * obtained, then the in-memory data will be spilled to disk. 
- */ - private void growPointerArrayIfNecessary() throws IOException { - assert (activeSpillSorter != null); - if (!activeSpillSorter.hasSpaceForAnotherRecord()) { - long used = activeSpillSorter.getMemoryUsage(); - LongArray array; - try { - // could trigger spilling - array = allocator.allocateArray(used / 8 * 2); - } catch (TooLargePageException e) { - // The pointer array is too big to fix in a single page, spill. - spill(); - return; - } catch (SparkOutOfMemoryError e) { - // Cannot allocate enough memory, spill and reset pointer array. - try { - spill(); - } catch (SparkOutOfMemoryError e2) { - // Cannot allocate memory even after spilling, throw the error. - if (!activeSpillSorter.hasSpaceForAnotherRecord()) { - logger.error("Unable to grow the pointer array"); - throw e2; - } - } - return; - } - // check if spilling is triggered or not - if (activeSpillSorter.hasSpaceForAnotherRecord()) { - allocator.freeArray(array); - } else { - activeSpillSorter.expandPointerArray(array); - } - } - } + void cleanupResources(); /** * Writes a record to the shuffle sorter. This copies the record data into this external sorter's * managed memory, which may trigger spilling if the copy would exceed the memory limit. It * inserts a pointer for the record and record's partition id into the in-memory sorter. */ - public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) - throws IOException { - - assert (activeSpillSorter != null); - int threshold = numElementsForSpillThreshold; - if (activeSpillSorter.numRecords() >= threshold) { - logger.info( - "Spilling data because number of spilledRecords crossed the threshold " + threshold); - spill(); - } - - growPointerArrayIfNecessary(); - - // Need 4 or 8 bytes to store the record length. - final int required = length + uaoSize; - // Acquire enough memory to store the record. - // If we cannot acquire enough memory, we will spill current writers. 
- if (!activeSpillSorter.acquireNewPageIfNecessary(required)) { - // Spilling is happened, initiate new memory page for new writer. - activeSpillSorter.initialCurrentPage(required); - } - - activeSpillSorter.insertRecord(recordBase, recordOffset, length, partitionId); - } + void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) + throws IOException; /** * Close the sorter, causing any buffered data to be sorted and written out to disk. @@ -393,34 +74,46 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p * @return metadata for the spill files written by this sorter. If no records were ever inserted * into this sorter, then this will return an empty array. */ - public SpillInfo[] closeAndGetSpills() throws IOException { - if (activeSpillSorter != null) { - // Do not count the final file towards the spill count. - final Tuple2 spilledFileInfo = - blockManager.diskBlockManager().createTempShuffleBlock(); - final File file = spilledFileInfo._2(); - final TempShuffleBlockId blockId = spilledFileInfo._1(); - final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); + SpillInfo[] closeAndGetSpills() throws IOException; - // Waits for all async tasks to finish. - if (isAsync) { - for (Future task : asyncSpillTasks) { - try { - task.get(); - } catch (Exception e) { - throw new IOException(e); - } - } - - asyncSpillTasks.clear(); - } + /** + * Factory method to create the appropriate sorter implementation based on configuration. 
+ * + * @return either a sync or async implementation based on the COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED + * configuration + */ + static CometShuffleExternalSorter create( + CometShuffleMemoryAllocatorTrait allocator, + BlockManager blockManager, + TaskContext taskContext, + int initialSize, + int numPartitions, + SparkConf conf, + ShuffleWriteMetricsReporter writeMetrics, + StructType schema) { - activeSpillSorter.setSpillInfo(spillInfo); - activeSpillSorter.writeSortedFileNative(true, tracingEnabled); + boolean isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get(); - freeMemory(); + if (isAsync) { + return new CometShuffleExternalSorterAsync( + allocator, + blockManager, + taskContext, + initialSize, + numPartitions, + conf, + writeMetrics, + schema); + } else { + return new CometShuffleExternalSorterSync( + allocator, + blockManager, + taskContext, + initialSize, + numPartitions, + conf, + writeMetrics, + schema); } - - return spills.toArray(new SpillInfo[spills.size()]); } } diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java new file mode 100644 index 0000000000..33bcbc9cac --- /dev/null +++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.shuffle.sort; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.*; + +import scala.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.SparkConf; +import org.apache.spark.TaskContext; +import org.apache.spark.memory.SparkOutOfMemoryError; +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; +import org.apache.spark.shuffle.comet.CometShuffleChecksumSupport; +import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocatorTrait; +import org.apache.spark.shuffle.comet.TooLargePageException; +import org.apache.spark.sql.comet.execution.shuffle.ShuffleThreadPool; +import org.apache.spark.sql.comet.execution.shuffle.SpillInfo; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.storage.BlockManager; +import org.apache.spark.storage.TempShuffleBlockId; +import org.apache.spark.unsafe.UnsafeAlignedOffset; +import org.apache.spark.unsafe.array.LongArray; +import org.apache.spark.util.Utils; + +import org.apache.comet.CometConf$; + +/** + * Asynchronous implementation of the external sorter for sort-based shuffle. + * + *

Incoming records are appended to data pages. When all records have been inserted (or when the + * current thread's shuffle memory limit is reached), the in-memory records are sorted according to + * their partition ids using native sorter. The sorted records are then written to a single output + * file (or multiple files, if we've spilled). + * + *

Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its + * spill files. Instead, this merging is performed in {@link + * org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter}, which uses a specialized + * merge procedure that avoids extra serialization/deserialization. + * + *

This sorter provides async spilling write mode. When spilling, it will submit a task to thread + * pool to write shuffle spilling file. After submitting the task, it will continue to buffer, sort + * incoming records and submit another spilling task once spilling threshold reached again or memory + * is not enough to buffer incoming records. Each spilling task will write a shuffle spilling file + * separately. After all records have been sorted and spilled, all spill files will be merged by + * {@link org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter}. + */ +public final class CometShuffleExternalSorterAsync + implements CometShuffleExternalSorter, CometShuffleChecksumSupport { + + private static final Logger logger = + LoggerFactory.getLogger(CometShuffleExternalSorterAsync.class); + + private final int numPartitions; + private final BlockManager blockManager; + private final TaskContext taskContext; + private final ShuffleWriteMetricsReporter writeMetrics; + + private final StructType schema; + + /** Force this sorter to spill when there are this many elements in memory. */ + private final int numElementsForSpillThreshold; + + // When this external sorter allocates memory of `sorterArray`, we need to keep its + // assigned initial size. After spilling, we will reset the array to its initial size. + // See `sorterArray` comment for more details. + private int initialSize; + + /** All sorters with memory pages used by the sorters. */ + private final ConcurrentLinkedQueue spillingSorters = new ConcurrentLinkedQueue<>(); + + private SpillSorter activeSpillSorter; + + private final LinkedList spills = new LinkedList<>(); + + /** Peak memory used by this sorter so far, in bytes. */ + private long peakMemoryUsedBytes; + + // Checksum calculator for each partition. Empty when shuffle checksum disabled. 
+ private final long[] partitionChecksums; + + private final String checksumAlgorithm; + private final String compressionCodec; + private final int compressionLevel; + + // The memory allocator for this sorter. It is used to allocate/free memory pages for this sorter. + // Because we need to allocate off-heap memory regardless of configured Spark memory mode + // (on-heap/off-heap), we need a separate memory allocator. + private final CometShuffleMemoryAllocatorTrait allocator; + + /** Thread pool shared for async spilling write */ + private final ExecutorService threadPool; + + private final int threadNum; + + private ConcurrentLinkedQueue> asyncSpillTasks = new ConcurrentLinkedQueue<>(); + + private boolean spilling = false; + + private final int uaoSize = UnsafeAlignedOffset.getUaoSize(); + private final double preferDictionaryRatio; + private final boolean tracingEnabled; + + public CometShuffleExternalSorterAsync( + CometShuffleMemoryAllocatorTrait allocator, + BlockManager blockManager, + TaskContext taskContext, + int initialSize, + int numPartitions, + SparkConf conf, + ShuffleWriteMetricsReporter writeMetrics, + StructType schema) { + this.allocator = allocator; + this.blockManager = blockManager; + this.taskContext = taskContext; + this.numPartitions = numPartitions; + this.schema = schema; + this.numElementsForSpillThreshold = + (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get(); + this.writeMetrics = writeMetrics; + + this.peakMemoryUsedBytes = getMemoryUsage(); + this.partitionChecksums = createPartitionChecksums(numPartitions, conf); + this.checksumAlgorithm = getChecksumAlgorithm(conf); + this.compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get(); + this.compressionLevel = + (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL().get(); + + this.initialSize = initialSize; + this.tracingEnabled = (boolean) CometConf$.MODULE$.COMET_TRACING_ENABLED().get(); + + this.threadNum = (int) 
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get(); + assert (this.threadNum > 0); + this.threadPool = ShuffleThreadPool.getThreadPool(); + + this.preferDictionaryRatio = + (double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get(); + + this.activeSpillSorter = createSpillSorter(); + } + + /** Creates a new SpillSorter with all required dependencies. */ + private SpillSorter createSpillSorter() { + return new SpillSorter( + allocator, + initialSize, + schema, + uaoSize, + preferDictionaryRatio, + compressionCodec, + compressionLevel, + checksumAlgorithm, + partitionChecksums, + writeMetrics, + taskContext, + spills, + this::spill); + } + + @Override + public long[] getChecksums() { + return partitionChecksums; + } + + /** Sort and spill the current records in response to memory pressure. */ + @Override + public void spill() throws IOException { + if (spilling || activeSpillSorter == null || activeSpillSorter.numRecords() == 0) { + return; + } + + // In async mode, if new in-memory sorter cannot allocate required array, it triggers spill + // here. This method will initiate new sorter following normal spill logic and cause stack + // overflow eventually. So we need to avoid triggering spilling again while spilling. But + // we cannot make this as "synchronized" because it will block the caller thread. + spilling = true; + + logger.info( + "Thread {} spilling sort data of {} to disk ({} {} so far)", + Thread.currentThread().getId(), + Utils.bytesToString(getMemoryUsage()), + spills.size(), + spills.size() > 1 ? 
" times" : " time"); + + final Tuple2 spilledFileInfo = + blockManager.diskBlockManager().createTempShuffleBlock(); + final File file = spilledFileInfo._2(); + final TempShuffleBlockId blockId = spilledFileInfo._1(); + final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); + + activeSpillSorter.setSpillInfo(spillInfo); + + SpillSorter spillingSorter = activeSpillSorter; + Callable task = + () -> { + spillingSorter.writeSortedFileNative(false, tracingEnabled); + final long spillSize = spillingSorter.freeMemory(); + spillingSorter.freeArray(); + spillingSorters.remove(spillingSorter); + + // Reset the in-memory sorter's pointer array only after freeing up the memory pages + // holding the records. Otherwise, if the task is over allocated memory, then without + // freeing the memory pages, we might not be able to get memory for the pointer array. + synchronized (CometShuffleExternalSorterAsync.this) { + taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); + } + + return null; + }; + + spillingSorters.add(spillingSorter); + asyncSpillTasks.add(threadPool.submit(task)); + + while (asyncSpillTasks.size() == threadNum) { + for (Future spillingTask : asyncSpillTasks) { + if (spillingTask.isDone()) { + asyncSpillTasks.remove(spillingTask); + break; + } + } + } + + activeSpillSorter = createSpillSorter(); + + spilling = false; + } + + private long getMemoryUsage() { + long totalPageSize = 0; + for (SpillSorter sorter : spillingSorters) { + totalPageSize += sorter.getMemoryUsage(); + } + if (activeSpillSorter != null) { + totalPageSize += activeSpillSorter.getMemoryUsage(); + } + return totalPageSize; + } + + private void updatePeakMemoryUsed() { + long mem = getMemoryUsage(); + if (mem > peakMemoryUsedBytes) { + peakMemoryUsedBytes = mem; + } + } + + /** Return the peak memory used so far, in bytes. 
*/ + @Override + public long getPeakMemoryUsedBytes() { + updatePeakMemoryUsed(); + return peakMemoryUsedBytes; + } + + private long freeMemory() { + updatePeakMemoryUsed(); + long memoryFreed = 0; + for (SpillSorter sorter : spillingSorters) { + memoryFreed += sorter.freeMemory(); + sorter.freeArray(); + } + memoryFreed += activeSpillSorter.freeMemory(); + activeSpillSorter.freeArray(); + + return memoryFreed; + } + + /** Force all memory and spill files to be deleted; called by shuffle error-handling code. */ + @Override + public void cleanupResources() { + freeMemory(); + + for (SpillInfo spill : spills) { + if (spill.file.exists() && !spill.file.delete()) { + logger.error("Unable to delete spill file {}", spill.file.getPath()); + } + } + } + + /** + * Checks whether there is enough space to insert an additional record in to the sort pointer + * array and grows the array if additional space is required. If the required space cannot be + * obtained, then the in-memory data will be spilled to disk. + */ + private void growPointerArrayIfNecessary() throws IOException { + assert (activeSpillSorter != null); + if (!activeSpillSorter.hasSpaceForAnotherRecord()) { + long used = activeSpillSorter.getMemoryUsage(); + LongArray array; + try { + // could trigger spilling + array = allocator.allocateArray(used / 8 * 2); + } catch (TooLargePageException e) { + // The pointer array is too big to fix in a single page, spill. + spill(); + return; + } catch (SparkOutOfMemoryError e) { + // Cannot allocate enough memory, spill and reset pointer array. + try { + spill(); + } catch (SparkOutOfMemoryError e2) { + // Cannot allocate memory even after spilling, throw the error. 
+ if (!activeSpillSorter.hasSpaceForAnotherRecord()) { + logger.error("Unable to grow the pointer array"); + throw e2; + } + } + return; + } + // check if spilling is triggered or not + if (activeSpillSorter.hasSpaceForAnotherRecord()) { + allocator.freeArray(array); + } else { + activeSpillSorter.expandPointerArray(array); + } + } + } + + /** + * Writes a record to the shuffle sorter. This copies the record data into this external sorter's + * managed memory, which may trigger spilling if the copy would exceed the memory limit. It + * inserts a pointer for the record and record's partition id into the in-memory sorter. + */ + @Override + public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) + throws IOException { + + assert (activeSpillSorter != null); + int threshold = numElementsForSpillThreshold; + if (activeSpillSorter.numRecords() >= threshold) { + logger.info( + "Spilling data because number of spilledRecords crossed the threshold " + threshold); + spill(); + } + + growPointerArrayIfNecessary(); + + // Need 4 or 8 bytes to store the record length. + final int required = length + uaoSize; + // Acquire enough memory to store the record. + // If we cannot acquire enough memory, we will spill current writers. + if (!activeSpillSorter.acquireNewPageIfNecessary(required)) { + // Spilling is happened, initiate new memory page for new writer. + activeSpillSorter.initialCurrentPage(required); + } + + activeSpillSorter.insertRecord(recordBase, recordOffset, length, partitionId); + } + + /** + * Close the sorter, causing any buffered data to be sorted and written out to disk. + * + * @return metadata for the spill files written by this sorter. If no records were ever inserted + * into this sorter, then this will return an empty array. + */ + @Override + public SpillInfo[] closeAndGetSpills() throws IOException { + if (activeSpillSorter != null) { + // Do not count the final file towards the spill count. 
+ final Tuple2 spilledFileInfo = + blockManager.diskBlockManager().createTempShuffleBlock(); + final File file = spilledFileInfo._2(); + final TempShuffleBlockId blockId = spilledFileInfo._1(); + final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); + + // Waits for all async tasks to finish. + for (Future task : asyncSpillTasks) { + try { + task.get(); + } catch (Exception e) { + throw new IOException(e); + } + } + + asyncSpillTasks.clear(); + + activeSpillSorter.setSpillInfo(spillInfo); + activeSpillSorter.writeSortedFileNative(true, tracingEnabled); + + freeMemory(); + } + + return spills.toArray(new SpillInfo[spills.size()]); + } +} diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterSync.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterSync.java new file mode 100644 index 0000000000..c3e402e88e --- /dev/null +++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterSync.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.shuffle.sort; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; + +import scala.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.SparkConf; +import org.apache.spark.TaskContext; +import org.apache.spark.memory.SparkOutOfMemoryError; +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; +import org.apache.spark.shuffle.comet.CometShuffleChecksumSupport; +import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocatorTrait; +import org.apache.spark.shuffle.comet.TooLargePageException; +import org.apache.spark.sql.comet.execution.shuffle.SpillInfo; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.storage.BlockManager; +import org.apache.spark.storage.TempShuffleBlockId; +import org.apache.spark.unsafe.UnsafeAlignedOffset; +import org.apache.spark.unsafe.array.LongArray; +import org.apache.spark.util.Utils; + +import org.apache.comet.CometConf$; + +/** + * Synchronous implementation of the external sorter for sort-based shuffle. + * + *

Incoming records are appended to data pages. When all records have been inserted (or when the + * current thread's shuffle memory limit is reached), the in-memory records are sorted according to + * their partition ids using the native sorter. The sorted records are then written to a single output + * file (or multiple files, if we've spilled). + * + *

Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its + * spill files. Instead, this merging is performed in {@link + * org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter}, which uses a specialized + * merge procedure that avoids extra serialization/deserialization. + */ +public final class CometShuffleExternalSorterSync + implements CometShuffleExternalSorter, CometShuffleChecksumSupport { + + private static final Logger logger = + LoggerFactory.getLogger(CometShuffleExternalSorterSync.class); + + private final int numPartitions; + private final BlockManager blockManager; + private final TaskContext taskContext; + private final ShuffleWriteMetricsReporter writeMetrics; + + private final StructType schema; + + /** Force this sorter to spill when there are this many elements in memory. */ + private final int numElementsForSpillThreshold; + + // When this external sorter allocates memory of `sorterArray`, we need to keep its + // assigned initial size. After spilling, we will reset the array to its initial size. + // See `sorterArray` comment for more details. + private int initialSize; + + private SpillSorter activeSpillSorter; + + private final LinkedList spills = new LinkedList<>(); + + /** Peak memory used by this sorter so far, in bytes. */ + private long peakMemoryUsedBytes; + + // Checksum calculator for each partition. Empty when shuffle checksum disabled. + private final long[] partitionChecksums; + + private final String checksumAlgorithm; + private final String compressionCodec; + private final int compressionLevel; + + // The memory allocator for this sorter. It is used to allocate/free memory pages for this sorter. + // Because we need to allocate off-heap memory regardless of configured Spark memory mode + // (on-heap/off-heap), we need a separate memory allocator. 
+ private final CometShuffleMemoryAllocatorTrait allocator; + + private boolean spilling = false; + + private final int uaoSize = UnsafeAlignedOffset.getUaoSize(); + private final double preferDictionaryRatio; + private final boolean tracingEnabled; + + public CometShuffleExternalSorterSync( + CometShuffleMemoryAllocatorTrait allocator, + BlockManager blockManager, + TaskContext taskContext, + int initialSize, + int numPartitions, + SparkConf conf, + ShuffleWriteMetricsReporter writeMetrics, + StructType schema) { + this.allocator = allocator; + this.blockManager = blockManager; + this.taskContext = taskContext; + this.numPartitions = numPartitions; + this.schema = schema; + this.numElementsForSpillThreshold = + (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get(); + this.writeMetrics = writeMetrics; + + this.peakMemoryUsedBytes = getMemoryUsage(); + this.partitionChecksums = createPartitionChecksums(numPartitions, conf); + this.checksumAlgorithm = getChecksumAlgorithm(conf); + this.compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get(); + this.compressionLevel = + (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL().get(); + + this.initialSize = initialSize; + this.tracingEnabled = (boolean) CometConf$.MODULE$.COMET_TRACING_ENABLED().get(); + + this.preferDictionaryRatio = + (double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get(); + + this.activeSpillSorter = createSpillSorter(); + } + + /** Creates a new SpillSorter with all required dependencies. 
*/ + private SpillSorter createSpillSorter() { + return new SpillSorter( + allocator, + initialSize, + schema, + uaoSize, + preferDictionaryRatio, + compressionCodec, + compressionLevel, + checksumAlgorithm, + partitionChecksums, + writeMetrics, + taskContext, + spills, + this::spill); + } + + @Override + public long[] getChecksums() { + return partitionChecksums; + } + + /** Sort and spill the current records in response to memory pressure. */ + @Override + public void spill() throws IOException { + if (spilling || activeSpillSorter == null || activeSpillSorter.numRecords() == 0) { + return; + } + + spilling = true; + + logger.info( + "Thread {} spilling sort data of {} to disk ({} {} so far)", + Thread.currentThread().getId(), + Utils.bytesToString(getMemoryUsage()), + spills.size(), + spills.size() > 1 ? " times" : " time"); + + final Tuple2 spilledFileInfo = + blockManager.diskBlockManager().createTempShuffleBlock(); + final File file = spilledFileInfo._2(); + final TempShuffleBlockId blockId = spilledFileInfo._1(); + final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); + + activeSpillSorter.setSpillInfo(spillInfo); + + activeSpillSorter.writeSortedFileNative(false, tracingEnabled); + final long spillSize = activeSpillSorter.freeMemory(); + activeSpillSorter.reset(); + + // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding + // the records. Otherwise, if the task is over allocated memory, then without freeing the + // memory pages, we might not be able to get memory for the pointer array. 
+ taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); + + spilling = false; + } + + private long getMemoryUsage() { + if (activeSpillSorter != null) { + return activeSpillSorter.getMemoryUsage(); + } + return 0; + } + + private void updatePeakMemoryUsed() { + long mem = getMemoryUsage(); + if (mem > peakMemoryUsedBytes) { + peakMemoryUsedBytes = mem; + } + } + + /** Return the peak memory used so far, in bytes. */ + @Override + public long getPeakMemoryUsedBytes() { + updatePeakMemoryUsed(); + return peakMemoryUsedBytes; + } + + private long freeMemory() { + updatePeakMemoryUsed(); + long memoryFreed = activeSpillSorter.freeMemory(); + activeSpillSorter.freeArray(); + return memoryFreed; + } + + /** Force all memory and spill files to be deleted; called by shuffle error-handling code. */ + @Override + public void cleanupResources() { + freeMemory(); + + for (SpillInfo spill : spills) { + if (spill.file.exists() && !spill.file.delete()) { + logger.error("Unable to delete spill file {}", spill.file.getPath()); + } + } + } + + /** + * Checks whether there is enough space to insert an additional record in to the sort pointer + * array and grows the array if additional space is required. If the required space cannot be + * obtained, then the in-memory data will be spilled to disk. + */ + private void growPointerArrayIfNecessary() throws IOException { + assert (activeSpillSorter != null); + if (!activeSpillSorter.hasSpaceForAnotherRecord()) { + long used = activeSpillSorter.getMemoryUsage(); + LongArray array; + try { + // could trigger spilling + array = allocator.allocateArray(used / 8 * 2); + } catch (TooLargePageException e) { + // The pointer array is too big to fix in a single page, spill. + spill(); + return; + } catch (SparkOutOfMemoryError e) { + // Cannot allocate enough memory, spill and reset pointer array. + try { + spill(); + } catch (SparkOutOfMemoryError e2) { + // Cannot allocate memory even after spilling, throw the error. 
+ if (!activeSpillSorter.hasSpaceForAnotherRecord()) { + logger.error("Unable to grow the pointer array"); + throw e2; + } + } + return; + } + // check if spilling is triggered or not + if (activeSpillSorter.hasSpaceForAnotherRecord()) { + allocator.freeArray(array); + } else { + activeSpillSorter.expandPointerArray(array); + } + } + } + + /** + * Writes a record to the shuffle sorter. This copies the record data into this external sorter's + * managed memory, which may trigger spilling if the copy would exceed the memory limit. It + * inserts a pointer for the record and record's partition id into the in-memory sorter. + */ + @Override + public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) + throws IOException { + + assert (activeSpillSorter != null); + int threshold = numElementsForSpillThreshold; + if (activeSpillSorter.numRecords() >= threshold) { + logger.info( + "Spilling data because number of spilledRecords crossed the threshold " + threshold); + spill(); + } + + growPointerArrayIfNecessary(); + + // Need 4 or 8 bytes to store the record length. + final int required = length + uaoSize; + // Acquire enough memory to store the record. + // If we cannot acquire enough memory, we will spill current writers. + if (!activeSpillSorter.acquireNewPageIfNecessary(required)) { + // Spilling is happened, initiate new memory page for new writer. + activeSpillSorter.initialCurrentPage(required); + } + + activeSpillSorter.insertRecord(recordBase, recordOffset, length, partitionId); + } + + /** + * Close the sorter, causing any buffered data to be sorted and written out to disk. + * + * @return metadata for the spill files written by this sorter. If no records were ever inserted + * into this sorter, then this will return an empty array. + */ + @Override + public SpillInfo[] closeAndGetSpills() throws IOException { + if (activeSpillSorter != null) { + // Do not count the final file towards the spill count. 
+ final Tuple2 spilledFileInfo = + blockManager.diskBlockManager().createTempShuffleBlock(); + final File file = spilledFileInfo._2(); + final TempShuffleBlockId blockId = spilledFileInfo._1(); + final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); + + activeSpillSorter.setSpillInfo(spillInfo); + activeSpillSorter.writeSortedFileNative(true, tracingEnabled); + + freeMemory(); + } + + return spills.toArray(new SpillInfo[spills.size()]); + } +} diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java index a845e743d4..736c42aafa 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java +++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java @@ -258,7 +258,7 @@ private void open() { Math.min( CometShuffleExternalSorter.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes())); sorter = - new CometShuffleExternalSorter( + CometShuffleExternalSorter.create( allocator, blockManager, taskContext, From c6f65cd92a70d65997a398ff46a52752f3621574 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Jan 2026 12:00:36 -0700 Subject: [PATCH 2/3] fix --- .../sort/CometShuffleExternalSorterAsync.java | 88 ++++++++++++++++--- 1 file changed, 74 insertions(+), 14 deletions(-) diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java index 33bcbc9cac..2b55570869 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java +++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java @@ -152,8 +152,16 @@ public CometShuffleExternalSorterAsync( this.tracingEnabled = (boolean) CometConf$.MODULE$.COMET_TRACING_ENABLED().get(); 
this.threadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get(); - assert (this.threadNum > 0); + if (this.threadNum <= 0) { + throw new IllegalArgumentException( + "spark.comet.columnar.shuffle.async.thread.num must be positive, got: " + this.threadNum); + } this.threadPool = ShuffleThreadPool.getThreadPool(); + if (this.threadPool == null) { + throw new IllegalStateException( + "Async shuffle thread pool is not initialized. " + + "Ensure spark.comet.columnar.shuffle.async.enabled is true."); + } this.preferDictionaryRatio = (double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get(); @@ -215,10 +223,21 @@ public void spill() throws IOException { SpillSorter spillingSorter = activeSpillSorter; Callable task = () -> { - spillingSorter.writeSortedFileNative(false, tracingEnabled); - final long spillSize = spillingSorter.freeMemory(); - spillingSorter.freeArray(); - spillingSorters.remove(spillingSorter); + long spillSize = 0; + try { + spillingSorter.writeSortedFileNative(false, tracingEnabled); + spillSize = spillingSorter.freeMemory(); + } finally { + // Ensure cleanup happens even if writeSortedFileNative() throws. + // freeMemory() may have already been called above, but it's safe to call again + // (returns 0 if already freed). freeArray() must be called to release the pointer + // array. + if (spillSize == 0) { + spillSize = spillingSorter.freeMemory(); + } + spillingSorter.freeArray(); + spillingSorters.remove(spillingSorter); + } // Reset the in-memory sorter's pointer array only after freeing up the memory pages // holding the records. 
Otherwise, if the task is over allocated memory, then without @@ -233,11 +252,20 @@ public void spill() throws IOException { spillingSorters.add(spillingSorter); asyncSpillTasks.add(threadPool.submit(task)); - while (asyncSpillTasks.size() == threadNum) { - for (Future spillingTask : asyncSpillTasks) { - if (spillingTask.isDone()) { - asyncSpillTasks.remove(spillingTask); - break; + // If we've reached the max concurrent spill tasks, block until one completes. + // This provides backpressure to avoid unbounded memory growth. + while (asyncSpillTasks.size() >= threadNum) { + Future oldestTask = asyncSpillTasks.peek(); + if (oldestTask != null) { + try { + oldestTask.get(); // Block until the oldest task completes + asyncSpillTasks.poll(); // Remove the completed task + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for spill task", e); + } catch (ExecutionException e) { + asyncSpillTasks.poll(); // Remove the failed task + throw new IOException("Async spill task failed", e.getCause()); } } } @@ -288,6 +316,23 @@ private long freeMemory() { /** Force all memory and spill files to be deleted; called by shuffle error-handling code. */ @Override public void cleanupResources() { + // Cancel any pending async spill tasks to stop background work. + // The tasks have try-finally blocks that will clean up their SpillSorter resources. + for (Future task : asyncSpillTasks) { + task.cancel(true); + } + + // Wait briefly for cancelled tasks to complete their cleanup. + // This ensures SpillSorters are removed from spillingSorters before we iterate it. 
+ for (Future task : asyncSpillTasks) { + try { + task.get(100, TimeUnit.MILLISECONDS); + } catch (Exception e) { + // Ignore - task was cancelled or failed, we're cleaning up anyway + } + } + asyncSpillTasks.clear(); + freeMemory(); for (SpillInfo spill : spills) { @@ -383,23 +428,38 @@ public SpillInfo[] closeAndGetSpills() throws IOException { final TempShuffleBlockId blockId = spilledFileInfo._1(); final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); - // Waits for all async tasks to finish. + // Waits for all async tasks to finish, collecting any exceptions. + // We wait for all tasks even if some fail to ensure proper cleanup. + IOException firstException = null; for (Future task : asyncSpillTasks) { try { task.get(); - } catch (Exception e) { - throw new IOException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (firstException == null) { + firstException = new IOException("Interrupted while waiting for spill tasks", e); + } + } catch (ExecutionException e) { + if (firstException == null) { + firstException = new IOException("Async spill task failed", e.getCause()); + } else { + firstException.addSuppressed(e.getCause()); + } } } asyncSpillTasks.clear(); + if (firstException != null) { + throw firstException; + } + activeSpillSorter.setSpillInfo(spillInfo); activeSpillSorter.writeSortedFileNative(true, tracingEnabled); freeMemory(); } - return spills.toArray(new SpillInfo[spills.size()]); + return spills.toArray(new SpillInfo[0]); } } From 9488c4bf7bc0346c79f005fe6bead4df8aa0c32c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Jan 2026 12:28:36 -0700 Subject: [PATCH 3/3] fix --- .../spark/shuffle/sort/CometShuffleExternalSorterAsync.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java index 
2b55570869..f5e3d9b686 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java +++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorterAsync.java @@ -96,7 +96,7 @@ public final class CometShuffleExternalSorterAsync private final LinkedList spills = new LinkedList<>(); /** Peak memory used by this sorter so far, in bytes. */ - private long peakMemoryUsedBytes; + private volatile long peakMemoryUsedBytes; // Checksum calculator for each partition. Empty when shuffle checksum disabled. private final long[] partitionChecksums;