Skip to content

Commit

Permalink
allow more retries when requesting more memory in sortbasedpusher
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Apr 16, 2024
1 parent 3121828 commit 1e8d8c2
Showing 1 changed file with 44 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public boolean insertRecord(
return true;
}

private void growPointerArrayIfNecessary() throws IOException {
private void growPointerArrayIfNecessary(long required) throws IOException {
assert (inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
if (inMemSorter.numRecords() <= 0) {
Expand All @@ -339,24 +339,49 @@ private void growPointerArrayIfNecessary() throws IOException {
}

long used = inMemSorter.getMemoryUsage();
long requestedBytes = used / 8 * 2;
int allocateMemoryRetryCount = 0;
int maxMemoryAllocationRetry = 3;
LongArray array = null;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (TooLargePageException e) {
// The pointer array is too big to fix in a single page, spill.
logger.info(
"Pushdata in growPointerArrayIfNecessary, memory used {}",
Utils.bytesToString(getUsed()));
pushData(true);
} catch (SparkOutOfMemoryError rethrow) {
// should have trigger spilling
if (inMemSorter.numRecords() > 0) {
logger.error("OOM, unable to grow the pointer array");
throw rethrow;
boolean cont = true;
while (allocateMemoryRetryCount < maxMemoryAllocationRetry && cont) {
try {
// could trigger spilling
logger.info("asking for " + requestedBytes + " more bytes to accommodate more records");
array = allocateArray(used / 8 * 2);
cont = false;
} catch (TooLargePageException e) {
// The pointer array is too big to fix in a single page, spill.
logger.info(
"Pushdata in growPointerArrayIfNecessary, memory used {}",
Utils.bytesToString(getUsed()));
pushData(true);
cont = false;
} catch (SparkOutOfMemoryError rethrow) {
// should have trigger spilling
allocateMemoryRetryCount += 1;
if (inMemSorter.numRecords() > 0) {
if (allocateMemoryRetryCount == maxMemoryAllocationRetry) {
logger.error("OOM, unable to grow the pointer array");
throw rethrow;
} else {
long oldReq = requestedBytes;
requestedBytes = Math.max((long) (requestedBytes * 0.5), required);
logger.warn(
"cannot allocate "
+ oldReq
+ " bytes, cut the request to "
+ requestedBytes
+ " bytes and retry",
rethrow);
pushData(true);
}
} else {
// The new array could not be allocated, but that is not an issue as it is longer needed,
// as all records were spilled.
cont = false;
}
}
// The new array could not be allocated, but that is not an issue as it is longer needed,
// as all records were spilled.
}

if (inMemSorter.numRecords() <= 0) {
Expand Down Expand Up @@ -401,7 +426,7 @@ private void acquireNewPageIfNecessary(int required) {
private void allocateMemoryForRecordIfNecessary(int required) throws IOException {
// Step 1:
// Ensure that the pointer array has space for another record. This may cause a spill.
growPointerArrayIfNecessary();
growPointerArrayIfNecessary(required);
// Step 2:
// Ensure that the last page has space for another record. This may cause a spill.
acquireNewPageIfNecessary(required);
Expand All @@ -419,7 +444,7 @@ private void allocateMemoryForRecordIfNecessary(int required) throws IOException
// no-op that does not allocate any memory, and therefore can't cause a spill event.
//
// Thus there is no need to call `acquireNewPageIfNecessary` again after this step.
growPointerArrayIfNecessary();
growPointerArrayIfNecessary(required);
}

@Override
Expand Down

0 comments on commit 1e8d8c2

Please sign in to comment.