diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index ac8170c9d97a0..78f2b2b88308b 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.memory.MemoryConsumer; @@ -43,6 +44,7 @@ import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.Utils; + /** * External sorter based on {@link UnsafeInMemorySorter}. */ @@ -74,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { * Force this sorter to spill when there are this many elements in memory. */ private final int numElementsForSpillThreshold; + private final long maxMemoryBeforeSpill; /** * Memory pages that hold the records being sorted. The pages in this list are freed when @@ -174,6 +177,7 @@ private UnsafeExternalSorter( } this.peakMemoryUsedBytes = getMemoryUsage(); this.numElementsForSpillThreshold = numElementsForSpillThreshold; + this.maxMemoryBeforeSpill = SparkEnv.get().conf().getSizeAsMb("spark.lyft.spill.mb", "0m") * 1024 * 1024; // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at // the end of the task. This is necessary to avoid memory leaks in when the downstream operator @@ -393,6 +397,8 @@ private void growPointerArrayIfNecessary() throws IOException { // The pointer array is too big to fix in a single page, spill. spill(); } catch (SparkOutOfMemoryError e) { + logger.error("SparkOutOfMemoryException: memory used {}, records {}, threshold {}", + used, inMemSorter.numRecords(), numElementsForSpillThreshold); if (inMemSorter.numRecords() > 0) { logger.error("Unable to grow the pointer array"); throw e; @@ -479,6 +485,13 @@ public void insertRecord( spill(); } + if (maxMemoryBeforeSpill > 0 && inMemSorter.getMemoryUsage() >= maxMemoryBeforeSpill) + { + logger.info("Spilling data because the memory usage crossed the threshold " + + "{} used {} ", maxMemoryBeforeSpill, inMemSorter.getMemoryUsage()); + spill(); + } + final int uaoSize = UnsafeAlignedOffset.getUaoSize(); // Need 4 or 8 bytes to store the record length. final int required = length + uaoSize;