Skip to content

Commit

Permalink
Refactored RFS Runners and added unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed May 28, 2024
1 parent 5105030 commit 872db82
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 45 deletions.
30 changes: 29 additions & 1 deletion RFS/src/main/java/com/rfs/RunRfsWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;


import com.rfs.cms.CmsClient;
import com.rfs.cms.CmsEntry;
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
Expand All @@ -32,7 +37,9 @@
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.worker.GlobalState;
import com.rfs.worker.MetadataRunner;
import com.rfs.worker.Runner;
import com.rfs.worker.SnapshotRunner;
import com.rfs.worker.WorkerStep;

public class RunRfsWorker {
private static final Logger logger = LogManager.getLogger(RunRfsWorker.class);
Expand Down Expand Up @@ -133,9 +140,30 @@ public static void main(String[] args) throws Exception {
MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);
metadataWorker.run();

} catch (Runner.PhaseFailed e) {
logPhaseFailureRecord(e.phase, e.nextStep, e.cmsEntry, e.e);
throw e;
} catch (Exception e) {
logger.error("Error running RfsWorker", e);
logger.error("Unexpected error running RfsWorker", e);
throw e;
}
}

public static void logPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nextStep, Optional<CmsEntry.Base> cmsEntry, Exception e) {
ObjectNode errorBlob = new ObjectMapper().createObjectNode();
errorBlob.put("exceptionMessage", e.getMessage());
errorBlob.put("exceptionClass", e.getClass().getSimpleName());
errorBlob.put("exceptionTrace", Arrays.toString(e.getStackTrace()));

errorBlob.put("phase", phase.toString());

String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null";
errorBlob.put("currentStep", currentStep);

String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.toString() : "null";
errorBlob.put("cmsEntry", currentEntry);


logger.error(errorBlob.toString());
}
}
2 changes: 1 addition & 1 deletion RFS/src/main/java/com/rfs/worker/GlobalState.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public class GlobalState {
private static final AtomicReference<GlobalState> instance = new AtomicReference<>();

enum Phase {
public enum Phase {
UNSET,
SNAPSHOT_IN_PROGRESS,
SNAPSHOT_COMPLETED,
Expand Down
38 changes: 22 additions & 16 deletions RFS/src/main/java/com/rfs/worker/MetadataRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ public MetadataRunner(GlobalState globalState, CmsClient cmsClient, String snaps
}

@Override
public void run() {
public void runInternal() {
WorkerStep nextStep = null;
try {
logger.info("Checking if work remains in the Metadata Phase...");
Optional<Metadata> metadataEntry = members.cmsClient.getMetadataEntry();

if (metadataEntry.isEmpty() || metadataEntry.get().status != MetadataStatus.COMPLETED) {
Expand All @@ -38,22 +37,29 @@ public void run() {
nextStep = nextStep.nextStep();
}
}

logger.info("Metadata Phase is complete");
} catch (Exception e) {
logger.error("Metadata Migration Phase failed w/ an exception");
logger.error(
getPhaseFailureRecord(
members.globalState.getPhase(),
nextStep,
members.cmsEntry.map(bar -> (CmsEntry.Base) bar),
e
).toString()
throw new MetadataMigrationPhaseFailed(
members.globalState.getPhase(),
nextStep,
members.cmsEntry.map(bar -> (CmsEntry.Base) bar),
e
);
}
}

throw e;
}
@Override
public String getPhaseName() {
return "Metadata Migration";
}


}
@Override
public Logger getLogger() {
return logger;
}

public static class MetadataMigrationPhaseFailed extends Runner.PhaseFailed {
public MetadataMigrationPhaseFailed(GlobalState.Phase phase, WorkerStep nextStep, Optional<CmsEntry.Base> cmsEntry, Exception e) {
super("Metadata Migration Phase failed", phase, nextStep, cmsEntry, e);
}
}
}
40 changes: 28 additions & 12 deletions RFS/src/main/java/com/rfs/worker/Runner.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,44 @@
package com.rfs.worker;

import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Optional;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.rfs.cms.CmsEntry;
import com.rfs.common.RfsException;

public abstract interface Runner {
public abstract void run();
abstract void runInternal();
abstract String getPhaseName();
abstract Logger getLogger();

default ObjectNode getPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nextStep, Optional<CmsEntry.Base> cmsEntry, Exception e) {
ObjectNode errorBlob = new ObjectMapper().createObjectNode();
errorBlob.put("exceptionMessage", e.getMessage());
errorBlob.put("exceptionClass", e.getClass().getSimpleName());
errorBlob.put("exceptionTrace", Arrays.toString(e.getStackTrace()));
default void run() {
try {
getLogger().info("Checking if work remains in the " + getPhaseName() +" Phase...");
runInternal();
getLogger().info(getPhaseName() + " Phase is complete");
} catch (Exception e) {
getLogger().error(getPhaseName() + " Phase failed w/ an exception");

errorBlob.put("phase", phase.toString());
throw e;
}
}

String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null";
errorBlob.put("currentStep", currentStep);
public static class PhaseFailed extends RfsException {
public final GlobalState.Phase phase;
public final WorkerStep nextStep;
public final Optional<CmsEntry.Base> cmsEntry;
public final Exception e;

String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.toString() : "null";
errorBlob.put("cmsEntry", currentEntry);
return errorBlob;
public PhaseFailed(String message, GlobalState.Phase phase, WorkerStep nextStep, Optional<CmsEntry.Base> cmsEntry, Exception e) {
super(message);
this.phase = phase;
this.nextStep = nextStep;
this.cmsEntry = cmsEntry;
this.e = e;
}
}
}
37 changes: 22 additions & 15 deletions RFS/src/main/java/com/rfs/worker/SnapshotRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ public SnapshotRunner(GlobalState globalState, CmsClient cmsClient, SnapshotCrea
}

@Override
public void run() {
public void runInternal() {
WorkerStep nextStep = null;

try {
logger.info("Checking if work remains in the Snapshot Phase...");
Optional<Snapshot> snapshotEntry = members.cmsClient.getSnapshotEntry(members.snapshotCreator.getSnapshotName());

if (snapshotEntry.isEmpty() || snapshotEntry.get().status != SnapshotStatus.COMPLETED) {
Expand All @@ -36,21 +35,29 @@ public void run() {
nextStep = nextStep.nextStep();
}
}

logger.info("Snapshot Phase is complete");
} catch (Exception e) {
logger.error("Snapshot Phase failed w/ an exception");
logger.error(
getPhaseFailureRecord(
members.globalState.getPhase(),
nextStep,
members.cmsEntry.map(bar -> (CmsEntry.Base) bar),
e
).toString()
throw new SnapshotPhaseFailed(
members.globalState.getPhase(),
nextStep,
members.cmsEntry.map(bar -> (CmsEntry.Base) bar),
e
);
}
}

@Override
public String getPhaseName() {
return "Snapshot";
}

throw e;
@Override
public Logger getLogger() {
return logger;
}

public static class SnapshotPhaseFailed extends Runner.PhaseFailed {
public SnapshotPhaseFailed(GlobalState.Phase phase, WorkerStep nextStep, Optional<CmsEntry.Base> cmsEntry, Exception e) {
super("Snapshot Phase failed", phase, nextStep, cmsEntry, e);
}

}
}
}
52 changes: 52 additions & 0 deletions RFS/src/test/java/com/rfs/worker/MetadataRunnerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.rfs.worker;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.*;

import java.util.Optional;

import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import com.rfs.cms.CmsClient;
import com.rfs.common.GlobalMetadata;
import com.rfs.common.RfsException;
import com.rfs.common.SnapshotCreator;
import com.rfs.transformers.Transformer;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;

class MetadataRunnerTest {

@Test
void run_encountersAnException_asExpected() {
// Setup
GlobalState globalState = Mockito.mock(GlobalState.class);
CmsClient cmsClient = Mockito.mock(CmsClient.class);
String snapshotName = "testSnapshot";
GlobalMetadata.Factory metadataFactory = Mockito.mock(GlobalMetadata.Factory.class);
GlobalMetadataCreator_OS_2_11 metadataCreator = Mockito.mock(GlobalMetadataCreator_OS_2_11.class);
Transformer transformer = Mockito.mock(Transformer.class);
RfsException testException = new RfsException("Unit test");

doThrow(testException).when(cmsClient).getMetadataEntry();
when(globalState.getPhase()).thenReturn(GlobalState.Phase.METADATA_IN_PROGRESS);


MetadataRunner testRunner = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);

// Run the test
try {
testRunner.run();
} catch (MetadataRunner.MetadataMigrationPhaseFailed e) {
assertEquals(GlobalState.Phase.METADATA_IN_PROGRESS, e.phase);
assertEquals(null, e.nextStep);
assertEquals(Optional.empty(), e.cmsEntry);
assertEquals(testException, e.e);

} catch (Exception e) {
fail("Unexpected exception thrown: " + e.getClass().getName());
}
}

}
45 changes: 45 additions & 0 deletions RFS/src/test/java/com/rfs/worker/SnapshotRunnerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.rfs.worker;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.*;

import java.util.Optional;

import org.junit.jupiter.api.Test;

import com.rfs.cms.CmsClient;
import com.rfs.common.RfsException;
import com.rfs.common.SnapshotCreator;

class SnapshotRunnerTest {

@Test
void run_encountersAnException_asExpected() {
// Setup
String snapshotName = "snapshotName";
GlobalState globalState = mock(GlobalState.class);
CmsClient cmsClient = mock(CmsClient.class);
SnapshotCreator snapshotCreator = mock(SnapshotCreator.class);
SnapshotRunner testRunner = new SnapshotRunner(globalState, cmsClient, snapshotCreator);
RfsException testException = new RfsException("Unit test");

doThrow(testException).when(cmsClient).getSnapshotEntry(snapshotName);
when(globalState.getPhase()).thenReturn(GlobalState.Phase.SNAPSHOT_IN_PROGRESS);
when(snapshotCreator.getSnapshotName()).thenReturn(snapshotName);

// Run the test
try {
testRunner.run();
} catch (SnapshotRunner.SnapshotPhaseFailed e) {
assertEquals(GlobalState.Phase.SNAPSHOT_IN_PROGRESS, e.phase);
assertEquals(null, e.nextStep);
assertEquals(Optional.empty(), e.cmsEntry);
assertEquals(testException, e.e);

} catch (Exception e) {
fail("Unexpected exception thrown: " + e.getClass().getName());
}
}

}

0 comments on commit 872db82

Please sign in to comment.