[#2279] improvement(spark): Trigger the upstream rewrite when the read stage fails (#2281)

### What changes were proposed in this pull request?

Currently, when the reader fails to fetch shuffle data, the upstream stage is not triggered to rewrite its data, so a shuffle server failure does not lead to a stage retry. This change counts fetch failures per partition and, once the configured maximum is reached, clears the upstream shuffle's map output so that the write stage is resubmitted.
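
The condensed sketch below illustrates the intended behavior; the class and method names are illustrative stand-ins, not the project's API. Fetch failures are counted per partition, and when a partition reaches the configured maximum, the upstream shuffle's map output is unregistered at most once per stage attempt so that Spark recomputes the write stage. In the patch itself this logic lives in `reportShuffleFetchFailure` together with the `RssShuffleStatus` bookkeeping shown in the diff below.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative stand-in for the per-shuffle bookkeeping this patch extends;
// class and method names here are not the project's API.
class FetchFailureTracker {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final int[] partitionFailures;
  private final int maxFetchFailures;
  private boolean clearedMapOutputs = false; // cleared at most once per stage attempt

  FetchFailureTracker(int partitionNum, int maxFetchFailures) {
    this.partitionFailures = new int[partitionNum];
    this.maxFetchFailures = maxFetchFailures;
  }

  /**
   * Records one fetch failure for a partition and returns true when the whole
   * upstream stage should be resubmitted. The unregister hook (standing in for
   * unregisterAllMapOutput(shuffleId)) runs at most once per attempt.
   */
  boolean reportFetchFailure(int partition, Runnable unregisterMapOutputs) {
    lock.writeLock().lock();
    try {
      partitionFailures[partition]++;
      if (partitionFailures[partition] < maxFetchFailures) {
        return false; // below the threshold: keep the stage running
      }
      if (!clearedMapOutputs) {
        unregisterMapOutputs.run(); // lets Spark recompute the upstream write stage
        clearedMapOutputs = true;
      }
      return true; // caller reports the failure so Spark resubmits the stage
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```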

### Why are the changes needed?

Fix: #2279

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT.
yl09099 authored Dec 20, 2024
1 parent 0481f21 commit e7e191b
Showing 2 changed files with 61 additions and 18 deletions.
@@ -122,13 +122,13 @@ public void reportShuffleWriteFailure(
shuffleManager.getShuffleWriteClient().unregisterShuffle(appId, shuffleId);
shuffleServerWriterFailureRecord.setClearedMapTrackerBlock(true);
LOG.info(
"Clear shuffle result in shuffleId:{}, stageId:{}, stageAttemptNumber:{}.",
"Clear shuffle result in shuffleId:{}, stageId:{}, stageAttemptNumber:{} in the write failure phase.",
shuffleId,
stageAttemptId,
stageAttemptNumber);
} catch (SparkException e) {
LOG.error(
"Clear MapoutTracker Meta failed in shuffleId:{}, stageAttemptId:{}, stageAttemptNumber:{}.",
"Clear MapoutTracker Meta failed in shuffleId:{}, stageAttemptId:{}, stageAttemptNumber:{} in the write failure phase.",
shuffleId,
stageAttemptId,
stageAttemptNumber);
@@ -158,6 +158,7 @@ public void reportShuffleFetchFailure(
RssProtos.ReportShuffleFetchFailureRequest request,
StreamObserver<RssProtos.ReportShuffleFetchFailureResponse> responseObserver) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
int stageAttempt = request.getStageAttemptId();
int partitionId = request.getPartitionId();
RssProtos.StatusCode code;
@@ -189,18 +190,37 @@ public void reportShuffleFetchFailure(
code = RssProtos.StatusCode.INVALID_REQUEST;
reSubmitWholeStage = false;
} else { // update the stage partition fetch failure count
code = RssProtos.StatusCode.SUCCESS;
status.incPartitionFetchFailure(stageAttempt, partitionId);
int fetchFailureNum = status.getPartitionFetchFailureNum(stageAttempt, partitionId);
if (fetchFailureNum >= shuffleManager.getMaxFetchFailures()) {
reSubmitWholeStage = true;
msg =
String.format(
"report shuffle fetch failure as maximum number(%d) of shuffle fetch is occurred",
shuffleManager.getMaxFetchFailures());
} else {
reSubmitWholeStage = false;
msg = "don't report shuffle fetch failure";
synchronized (status) {
code = RssProtos.StatusCode.SUCCESS;
status.incPartitionFetchFailure(stageAttempt, partitionId);
if (status.currentPartitionIsFetchFailed(stageAttempt, partitionId, shuffleManager)) {
reSubmitWholeStage = true;
if (!status.hasClearedMapTrackerBlock()) {
try {
// Clear the metadata of the completed tasks; once the upstream shuffle's map
// output is cleared, the write stage can be triggered again.
shuffleManager.unregisterAllMapOutput(shuffleId);
status.clearedMapTrackerBlock();
LOG.info(
"Clear shuffle result in shuffleId:{}, stageId:{} in the write failure phase.",
shuffleId,
stageAttempt);
} catch (SparkException e) {
LOG.error(
"Clear MapoutTracker Meta failed in shuffleId:{}, stageAttemptId:{} in the write failure phase.",
shuffleId,
stageAttempt);
throw new RssException("Clear MapoutTracker Meta failed!", e);
}
}
msg =
String.format(
"report shuffle fetch failure as the maximum number (%d) of fetch failures has been reached",
shuffleManager.getMaxFetchFailures());
} else {
reSubmitWholeStage = false;
msg = "don't report shuffle fetch failure";
}
}
}
}
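
Since the handler now also reads the shuffle id from the request (the new `getShuffleId()` call above), a reader that reports a fetch failure has to populate that field as well. The sketch below shows how such a request could be built with the protobuf-generated builder; the setter names follow standard protobuf-java codegen for the getters used in the handler, and the helper method itself is illustrative, not part of the patch.

```java
// Sketch only: builds the request consumed by reportShuffleFetchFailure above.
// Assumes the same RssProtos generated class that the service uses in the diff.
static RssProtos.ReportShuffleFetchFailureRequest buildFetchFailureRequest(
    String appId, int shuffleId, int stageAttemptId, int partitionId) {
  return RssProtos.ReportShuffleFetchFailureRequest.newBuilder()
      .setAppId(appId)
      .setShuffleId(shuffleId) // now read by the handler to unregister the upstream shuffle
      .setStageAttemptId(stageAttemptId)
      .setPartitionId(partitionId)
      .build();
}
```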
@@ -489,10 +509,13 @@ private static class RssShuffleStatus {
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final int[] partitions;
private int stageAttempt;
// Whether the shuffle result has been cleared for the current stage attempt.
private boolean hasClearedMapTrackerBlock;

private RssShuffleStatus(int partitionNum, int stageAttempt) {
this.stageAttempt = stageAttempt;
this.partitions = new int[partitionNum];
this.hasClearedMapTrackerBlock = false;
}

private <T> T withReadLock(Supplier<T> fn) {
@@ -553,16 +576,36 @@ public void incPartitionFetchFailure(int stageAttempt, int partition) {
});
}

public int getPartitionFetchFailureNum(int stageAttempt, int partition) {
public boolean currentPartitionIsFetchFailed(
int stageAttempt, int partition, RssShuffleManagerInterface shuffleManager) {
return withReadLock(
() -> {
if (this.stageAttempt != stageAttempt) {
return 0;
return false;
} else {
return this.partitions[partition];
return this.partitions[partition] >= shuffleManager.getMaxFetchFailures();
}
});
}

public void clearedMapTrackerBlock() {
withWriteLock(
() -> {
this.hasClearedMapTrackerBlock = true;
return null;
});
}

public boolean hasClearedMapTrackerBlock() {
return withReadLock(
() -> {
return hasClearedMapTrackerBlock;
});
}
}

@Override
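
The new `clearedMapTrackerBlock()` / `hasClearedMapTrackerBlock()` accessors reuse the class's `withReadLock` / `withWriteLock` helpers, whose bodies are collapsed in the diff above. Below is a minimal standalone sketch of what such Supplier-based lock helpers typically look like, assuming they simply wrap a `ReentrantReadWriteLock`; this is not the project's exact code.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Minimal illustration of the Supplier-based lock helpers used by
// RssShuffleStatus in the diff above; the real implementation may differ.
class LockHelpers {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
  private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

  <T> T withReadLock(Supplier<T> fn) {
    readLock.lock();
    try {
      return fn.get();
    } finally {
      readLock.unlock();
    }
  }

  <T> T withWriteLock(Supplier<T> fn) {
    writeLock.lock();
    try {
      return fn.get();
    } finally {
      writeLock.unlock();
    }
  }
}
```

With helpers like these, the write path (`clearedMapTrackerBlock`) takes the exclusive lock and returns null from the supplier, while the read path (`hasClearedMapTrackerBlock`) only needs the shared lock, which matches how the new methods are written.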
@@ -79,7 +79,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) {
super.updateSparkConfCustomer(sparkConf);
sparkConf.set(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED,
+ RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED,
"true");
}

