Skip to content

Commit

Permalink
Unit tested coordinated RFS index creation
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed Jun 2, 2024
1 parent e51089a commit ea34e04
Show file tree
Hide file tree
Showing 5 changed files with 598 additions and 36 deletions.
46 changes: 14 additions & 32 deletions RFS/src/main/java/com/rfs/worker/IndexStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.rfs.cms.CmsClient;
import com.rfs.cms.CmsEntry;
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.cms.OpenSearchCmsEntry;
import com.rfs.common.IndexMetadata;
import com.rfs.common.RfsException;
import com.rfs.common.SnapshotRepo;
Expand Down Expand Up @@ -123,8 +122,7 @@ public WorkerStep nextStep() {
return new RandomWait(members);

case IN_PROGRESS:
return new ExitPhaseSuccess(members);
// return new GetIndicesToMigrate(members);
return new GetIndicesToMigrate(members);
case COMPLETED:
return new ExitPhaseSuccess(members);
case FAILED:
Expand Down Expand Up @@ -202,7 +200,6 @@ public WorkerStep nextStep() {
}
}
}


public static class SetupIndexWorkEntries extends Base {

Expand All @@ -229,8 +226,8 @@ public void run() {
logger.info("Finished setting up the Index Work Items.");

logger.info("Updating the Index Migration entry to indicate setup has been completed...");
OpenSearchCmsEntry.Index updatedEntry = new OpenSearchCmsEntry.Index(
CmsEntry.IndexStatus.IN_PROGRESS,
CmsEntry.Index updatedEntry = new CmsEntry.Index(
CmsEntry.IndexStatus.COMPLETED,
lastCmsEntry.leaseExpiry,
lastCmsEntry.numAttempts
);
Expand Down Expand Up @@ -338,20 +335,19 @@ public void run() {

IndexMetadataData_OS_2_11 indexMetadataOS211 = new IndexMetadataData_OS_2_11(transformedRoot, indexMetadata.getId(), workItem.name);
members.indexCreator.create(workItem.name, indexMetadataOS211).ifPresentOrElse(
value -> {
logger.info("Index " + workItem.name + " created successfully");
logger.info("Forcefully updating the Index Work Item to indicate it has been completed...");
CmsEntry.IndexWorkItem updatedEntry = new CmsEntry.IndexWorkItem(
workItem.name,
CmsEntry.IndexWorkItemStatus.COMPLETED,
workItem.numAttempts,
workItem.numShards
);
members.cmsClient.updateIndexWorkItemForceful(updatedEntry);
logger.info("Index Work Item updated");
},
value -> logger.info("Index " + workItem.name + " created successfully"),
() -> logger.info("Index " + workItem.name + " already existed; no work required")
);

logger.info("Forcefully updating the Index Work Item to indicate it has been completed...");
CmsEntry.IndexWorkItem updatedEntry = new CmsEntry.IndexWorkItem(
workItem.name,
CmsEntry.IndexWorkItemStatus.COMPLETED,
workItem.numAttempts,
workItem.numShards
);
members.cmsClient.updateIndexWorkItemForceful(updatedEntry);
logger.info("Index Work Item updated");
} catch (Exception e) {
logger.info("Failed to migrate index: " + workItem.name, e);
logger.info("Updating the Index Work Item with incremented attempt count...");
Expand All @@ -376,20 +372,6 @@ public WorkerStep nextStep() {
}
}















public static class RandomWait extends Base {
private final static int WAIT_TIME_MS = 5 * 1000; // arbitrarily chosen

Expand Down
49 changes: 49 additions & 0 deletions RFS/src/test/java/com/rfs/worker/IndexRunnerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.rfs.worker;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.*;

import java.util.Optional;

import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import com.rfs.cms.CmsClient;
import com.rfs.common.IndexMetadata;
import com.rfs.common.RfsException;
import com.rfs.transformers.Transformer;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;

public class IndexRunnerTest {

@Test
void run_encountersAnException_asExpected() {
// Setup
GlobalState globalState = Mockito.mock(GlobalState.class);
CmsClient cmsClient = Mockito.mock(CmsClient.class);
String snapshotName = "testSnapshot";
IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class);
IndexCreator_OS_2_11 creator = Mockito.mock(IndexCreator_OS_2_11.class);
Transformer transformer = Mockito.mock(Transformer.class);
RfsException testException = new RfsException("Unit test");

doThrow(testException).when(cmsClient).getIndexEntry();
when(globalState.getPhase()).thenReturn(GlobalState.Phase.INDEX_IN_PROGRESS);

// Run the test
try {
IndexRunner testRunner = new IndexRunner(globalState, cmsClient, snapshotName, metadataFactory, creator, transformer);
testRunner.run();
} catch (IndexRunner.IndexMigrationPhaseFailed e) {
assertEquals(GlobalState.Phase.INDEX_IN_PROGRESS, e.phase);
assertEquals(IndexStep.GetEntry.class, e.nextStep.getClass());
assertEquals(Optional.empty(), e.cmsEntry);
assertEquals(testException, e.e);

} catch (Exception e) {
fail("Unexpected exception thrown: " + e.getClass().getName());
}
}

}
Loading

0 comments on commit ea34e04

Please sign in to comment.