Skip to content

Commit

Permalink
Cleaned up RFS Worker error handling a bit
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed May 22, 2024
1 parent 7a0fb02 commit 127613d
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 65 deletions.
10 changes: 8 additions & 2 deletions RFS/src/main/java/com/rfs/cms/CmsEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
import com.rfs.common.RfsException;

public class CmsEntry {
public abstract static class Base {
protected Base() {}
}

public static enum SnapshotStatus {
NOT_STARTED,
IN_PROGRESS,
COMPLETED,
FAILED,
}

public static class Snapshot {
public static class Snapshot extends Base {
public final String name;
public final SnapshotStatus status;

public Snapshot(String name, SnapshotStatus status) {
super();
this.name = name;
this.status = status;
}
Expand All @@ -26,7 +31,7 @@ public static enum MetadataStatus {
FAILED,
}

public static class Metadata {
public static class Metadata extends Base {
public static final int METADATA_LEASE_MS = 1 * 60 * 1000; // 1 minute, arbitrarily chosen
public static final int MAX_ATTEMPTS = 3; // arbitrarily chosen

Expand All @@ -50,6 +55,7 @@ public static String getLeaseExpiry(long currentTime, int numAttempts) {
public final Integer numAttempts;

public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) {
super();
this.status = status;
this.leaseExpiry = leaseExpiry;
this.numAttempts = numAttempts;
Expand Down
10 changes: 10 additions & 0 deletions RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public ObjectNode toJson() {
node.put(FIELD_STATUS, status.toString());
return node;
}

@Override
public String toString() {
return this.toJson().toString();
}
}

public static class Metadata extends CmsEntry.Metadata {
Expand Down Expand Up @@ -84,6 +89,11 @@ public ObjectNode toJson() {
node.put(FIELD_NUM_ATTEMPTS, numAttempts);
return node;
}

@Override
public String toString() {
return this.toJson().toString();
}
}

public static class CantParseCmsEntryFromJson extends RfsException {
Expand Down
2 changes: 1 addition & 1 deletion RFS/src/main/java/com/rfs/common/OpenSearchClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public RestClient.Response getSnapshotStatus(String repoName, String snapshotNam
String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
return client.getAsync(targetPath)
.flatMap(resp -> {
if (resp.code == HttpURLConnection.HTTP_OK) {
if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_NOT_FOUND) {
return Mono.just(resp);
} else {
String errorMessage = "Could get status of snapshot: " + targetPath + ". Response Code: " + resp.code + ", Response Body: " + resp.body;
Expand Down
31 changes: 22 additions & 9 deletions RFS/src/main/java/com/rfs/common/SnapshotCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public void registerRepo() {
ObjectNode settings = getRequestBodyForRegisterRepo();

// Register the repo; it's fine if it already exists
RestClient.Response response = client.registerSnapshotRepo(getRepoName(), settings);
if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) {
try {
client.registerSnapshotRepo(getRepoName(), settings);
logger.info("Snapshot repo registration successful");
} else {
logger.error("Snapshot repo registration failed");
} catch (Exception e) {
logger.error("Snapshot repo registration failed", e);
throw new RepoRegistrationFailed(getRepoName());
}
}
Expand All @@ -51,17 +51,24 @@ public void createSnapshot() {
body.put("include_global_state", true);

// Create the snapshot; idempotent operation
RestClient.Response response = client.createSnapshot(getRepoName(), snapshotName, body);
if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) {
try {
client.createSnapshot(getRepoName(), snapshotName, body);
logger.info("Snapshot " + snapshotName + " creation initiated");
} else {
logger.error("Snapshot " + snapshotName + " creation failed");
} catch (Exception e) {
logger.error("Snapshot " + snapshotName + " creation failed", e);
throw new SnapshotCreationFailed(snapshotName);
}
}

public boolean isSnapshotFinished() {
RestClient.Response response = client.getSnapshotStatus(getRepoName(), snapshotName);
RestClient.Response response;
try {
response = client.getSnapshotStatus(getRepoName(), snapshotName);
} catch (Exception e) {
logger.error("Failed to get snapshot status", e);
throw new SnapshotStatusCheckFailed(snapshotName);
}

if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
logger.error("Snapshot " + snapshotName + " does not exist");
throw new SnapshotDoesNotExist(snapshotName);
Expand Down Expand Up @@ -106,6 +113,12 @@ public SnapshotDoesNotExist(String snapshotName) {
}
}

public static class SnapshotStatusCheckFailed extends RfsException {
public SnapshotStatusCheckFailed(String snapshotName) {
super("We were unable to retrieve the status of Snapshot " + snapshotName);
}
}

public static class SnapshotStatusUnparsable extends RfsException {
public SnapshotStatusUnparsable(String snapshotName) {
super("Status of Snapshot " + snapshotName + " is not parsable");
Expand Down
64 changes: 32 additions & 32 deletions RFS/src/main/java/com/rfs/worker/MetadataRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,46 @@
import com.rfs.transformers.Transformer;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;

public class MetadataRunner {
private static final Logger logger = LogManager.getLogger(SnapshotRunner.class);
private final CmsClient cmsClient;
private final GlobalState globalState;
private final String snapshotName;
private final GlobalMetadata.Factory metadataFactory;
private final GlobalMetadataCreator_OS_2_11 metadataCreator;
private final Transformer transformer;
public class MetadataRunner implements Runner {
private static final Logger logger = LogManager.getLogger(MetadataRunner.class);
private final MetadataStep.SharedMembers members;

public MetadataRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, GlobalMetadata.Factory metadataFactory,
GlobalMetadataCreator_OS_2_11 metadataCreator, Transformer transformer) {
this.globalState = globalState;
this.cmsClient = cmsClient;
this.snapshotName = snapshotName;
this.metadataFactory = metadataFactory;
this.metadataCreator = metadataCreator;
this.transformer = transformer;
this.members = new MetadataStep.SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);
}

public void run() throws Exception {
logger.info("Checking if work remains in the Metadata Phase...");
Metadata metadataEntry = cmsClient.getMetadataEntry();

if (metadataEntry == null || metadataEntry.status != MetadataStatus.COMPLETED) {
MetadataStep.SharedMembers members = new MetadataStep.SharedMembers(
globalState,
cmsClient,
snapshotName,
metadataFactory,
metadataCreator,
transformer
);
WorkerStep nextState = new MetadataStep.EnterPhase(members);
@Override
public void run() {
WorkerStep nextStep = null;
try {
logger.info("Checking if work remains in the Metadata Phase...");
Metadata metadataEntry = members.cmsClient.getMetadataEntry();

if (metadataEntry == null || metadataEntry.status != MetadataStatus.COMPLETED) {
nextStep = new MetadataStep.EnterPhase(members);

while (nextState != null) {
nextState.run();
nextState = nextState.nextStep();
while (nextStep != null) {
nextStep.run();
nextStep = nextStep.nextStep();
}
}

logger.info("Metadata Phase is complete");
} catch (Exception e) {
logger.error("Metadata Migration Phase failed w/ an exception");
logger.error(
getPhaseFailureRecord(
members.globalState.getPhase(),
nextStep,
members.cmsEntry,
e
).toString()
);

throw e;
}

logger.info("Metadata Phase is complete");

}
}
2 changes: 2 additions & 0 deletions RFS/src/main/java/com/rfs/worker/MetadataStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void run() {

@Override
public WorkerStep nextStep() {
// Migrate the templates if we successfully created the CMS entry; otherwise, circle back to the beginning
if (members.cmsEntry != null) {
return new MigrateTemplates(members);
} else {
Expand Down Expand Up @@ -189,6 +190,7 @@ public void run() {

@Override
public WorkerStep nextStep() {
// Migrate the templates if we acquired the lease; otherwise, circle back to the beginning after a backoff
if (members.cmsEntry != null) {
return new MigrateTemplates(members);
} else {
Expand Down
27 changes: 27 additions & 0 deletions RFS/src/main/java/com/rfs/worker/Runner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.rfs.worker;

import java.util.Arrays;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.rfs.cms.CmsEntry;

public abstract interface Runner {
public abstract void run();

default ObjectNode getPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nextStep, CmsEntry.Base cmsEntry, Exception e) {
ObjectNode errorBlob = new ObjectMapper().createObjectNode();
errorBlob.put("exceptionMessage", e.getMessage());
errorBlob.put("exceptionClass", e.getClass().getSimpleName());
errorBlob.put("exceptionTrace", Arrays.toString(e.getStackTrace()));

errorBlob.put("phase", phase.toString());

String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null";
errorBlob.put("currentStep", currentStep);

String currentEntry = (cmsEntry != null) ? cmsEntry.toString() : "null";
errorBlob.put("cmsEntry", currentEntry);
return errorBlob;
}
}
54 changes: 33 additions & 21 deletions RFS/src/main/java/com/rfs/worker/SnapshotRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,45 @@
import com.rfs.cms.CmsEntry.SnapshotStatus;
import com.rfs.common.SnapshotCreator;

public class SnapshotRunner {
public class SnapshotRunner implements Runner {
private static final Logger logger = LogManager.getLogger(SnapshotRunner.class);
private final CmsClient cmsClient;
private final GlobalState globalState;
private final SnapshotCreator snapshotCreator;
private final SnapshotStep.SharedMembers members;

public SnapshotRunner(GlobalState globalState, CmsClient cmsClient, SnapshotCreator snapshotCreator) {
this.globalState = globalState;
this.cmsClient = cmsClient;
this.snapshotCreator = snapshotCreator;
this.members = new SnapshotStep.SharedMembers(globalState, cmsClient, snapshotCreator);
}

public void run() throws Exception {
logger.info("Checking if work remains in the Snapshot Phase...");
Snapshot snapshotEntry = cmsClient.getSnapshotEntry(snapshotCreator.getSnapshotName());

if (snapshotEntry == null || snapshotEntry.status != SnapshotStatus.COMPLETED) {
SnapshotStep.SharedMembers sharedMembers = new SnapshotStep.SharedMembers(globalState, cmsClient, snapshotCreator);
WorkerStep nextState = new SnapshotStep.EnterPhase(sharedMembers, snapshotEntry);
@Override
public void run() {
WorkerStep nextStep = null;

try {
logger.info("Checking if work remains in the Snapshot Phase...");
Snapshot snapshotEntry = members.cmsClient.getSnapshotEntry(members.snapshotCreator.getSnapshotName());

if (snapshotEntry == null || snapshotEntry.status != SnapshotStatus.COMPLETED) {
nextStep = new SnapshotStep.EnterPhase(members, snapshotEntry);

while (nextState != null) {
nextState.run();
nextState = nextState.nextStep();
while (nextStep != null) {
nextStep.run();
nextStep = nextStep.nextStep();
}
}
}

logger.info("Snapshot Phase is complete");
}

logger.info("Snapshot Phase is complete");
} catch (Exception e) {
logger.error("Snapshot Phase failed w/ an exception");
logger.error(
getPhaseFailureRecord(
members.globalState.getPhase(),
nextStep,
members.cmsEntry,
e
).toString()
);

throw e;
}

}
}

0 comments on commit 127613d

Please sign in to comment.