diff --git a/RFS/src/main/java/com/rfs/cms/CmsClient.java b/RFS/src/main/java/com/rfs/cms/CmsClient.java index 8da2a5d21..86b53c2c0 100644 --- a/RFS/src/main/java/com/rfs/cms/CmsClient.java +++ b/RFS/src/main/java/com/rfs/cms/CmsClient.java @@ -1,41 +1,45 @@ package com.rfs.cms; +import java.util.Optional; + /* * Client to connect to and work with the Coordinating Metadata Store. The CMS could be implemented by any reasonable * data store option (Postgres, AWS DynamoDB, Elasticsearch/Opensearch, etc). */ public interface CmsClient { /* - * Creates a new entry in the CMS for the Snapshot's progress. Returns true if we created the entry, and false if - * the entry already exists. + * Creates a new entry in the CMS for the Snapshot's progress. Returns an Optional; if the document was created, it + * will be the created object and empty otherwise. */ - public CmsEntry.Snapshot createSnapshotEntry(String snapshotName); + public Optional createSnapshotEntry(String snapshotName); /* - * Attempt to retrieve the Snapshot entry from the CMS, if it exists; null if it doesn't currently exist + * Attempt to retrieve the Snapshot entry from the CMS. Returns an Optional; if the document exists, it will be the + * retrieved entry and empty otherwise. */ - public CmsEntry.Snapshot getSnapshotEntry(String snapshotName); + public Optional getSnapshotEntry(String snapshotName); /* - * Updates the status of the Snapshot entry in the CMS. Returns true if the update was successful, and false if - * something else updated it before we could + * Updates the Snapshot entry in the CMS. Returns an Optional; if the document was updated, it will be + * the updated entry and empty otherwise. */ - public CmsEntry.Snapshot updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status); + public Optional updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status); /* - * Creates a new entry in the CMS for the Metadata Migration's progress. Returns the created entry if we created it, - * and null if the entry already exists. + * Creates a new entry in the CMS for the Metadata Migration's progress. Returns an Optional; if the document was + * created, it will be the created entry and empty otherwise. */ - public CmsEntry.Metadata createMetadataEntry(); + public Optional createMetadataEntry(); /* - * Attempt to retrieve the Metadata Migration entry from the CMS, if it exists; null if it doesn't currently exist + * Attempt to retrieve the Metadata Migration entry from the CMS, if it exists. Returns an Optional; if the document + * exists, it will be the retrieved entry and empty otherwise. */ - public CmsEntry.Metadata getMetadataEntry(); + public Optional getMetadataEntry(); /* - * Updates all fields of the Metadata Migration entry in the CMS. Returns the updated entry if successful, and - * null if something else updated it before we could + * Updates the Metadata Migration entry in the CMS. Returns an Optional; if the document was updated, + * it will be the updated entry and empty otherwise. */ - public CmsEntry.Metadata updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts); + public Optional updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts); } diff --git a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java index 25fd02ad7..50b06b0fa 100644 --- a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java +++ b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java @@ -1,9 +1,9 @@ package com.rfs.cms; -import java.net.HttpURLConnection; +import java.util.Optional; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.rfs.common.OpenSearchClient; -import com.rfs.common.RestClient; public class OpenSearchCmsClient implements CmsClient { public static final String CMS_INDEX_NAME = "cms-reindex-from-snapshot"; @@ -17,73 +17,71 @@ public OpenSearchCmsClient(OpenSearchClient client) { } @Override - public CmsEntry.Snapshot createSnapshotEntry(String snapshotName) { + public Optional createSnapshotEntry(String snapshotName) { OpenSearchCmsEntry.Snapshot newEntry = OpenSearchCmsEntry.Snapshot.getInitial(snapshotName); - boolean createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, newEntry.toJson()); - if (createdEntry) { - return newEntry; + Optional createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, newEntry.toJson()); + + if (createdEntry.isPresent()) { + return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(createdEntry.get())); } else { - return null; + return Optional.empty(); } } @Override - public CmsEntry.Snapshot getSnapshotEntry(String snapshotName) { - RestClient.Response response = client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID); - - if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { - return null; + public Optional getSnapshotEntry(String snapshotName) { + Optional document = client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID); + if (document.isEmpty()) { + return Optional.empty(); } - return OpenSearchCmsEntry.Snapshot.fromJsonString(response.body); + + ObjectNode sourceNode = (ObjectNode) document.get().get("_source"); + return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(sourceNode)); } @Override - public CmsEntry.Snapshot updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status) { + public Optional updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status) { OpenSearchCmsEntry.Snapshot entry = new OpenSearchCmsEntry.Snapshot(snapshotName, status); - boolean updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, entry.toJson()); - if (updatedEntry) { - return entry; - } else { - return null; + Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, entry.toJson()); + if (updatedEntry.isEmpty()) { + return Optional.empty(); } + return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(updatedEntry.get())); } @Override - public OpenSearchCmsEntry.Metadata createMetadataEntry() { + public Optional createMetadataEntry() { OpenSearchCmsEntry.Metadata entry = OpenSearchCmsEntry.Metadata.getInitial(); + Optional createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, entry.toJson()); - boolean docCreated = client.createDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, entry.toJson()); - - if (docCreated) { - return entry; - } else { - return null; + if (createdEntry.isEmpty()) { + return Optional.empty(); } + return Optional.of(OpenSearchCmsEntry.Metadata.fromJson(createdEntry.get())); } @Override - public CmsEntry.Metadata getMetadataEntry() { - RestClient.Response response = client.getDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID); + public Optional getMetadataEntry() { + Optional document = client.getDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID); - if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { - return null; + if (document.isEmpty()) { + return Optional.empty(); } - return OpenSearchCmsEntry.Metadata.fromJsonString(response.body); + + ObjectNode sourceNode = (ObjectNode) document.get().get("_source"); + return Optional.of(OpenSearchCmsEntry.Metadata.fromJson(sourceNode)); } @Override - public CmsEntry.Metadata updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts) { + public Optional updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts) { OpenSearchCmsEntry.Metadata metadata = new OpenSearchCmsEntry.Metadata(status, leaseExpiry, numAttempts); + Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, metadata.toJson()); - boolean updatedDoc = client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, metadata.toJson()); - - if (updatedDoc) { - return metadata; - } else { - return null; + if (updatedEntry.isEmpty()) { + return Optional.empty(); } - + return Optional.of(OpenSearchCmsEntry.Metadata.fromJson(updatedEntry.get())); } } diff --git a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java index 06e59fec9..a083c2494 100644 --- a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java +++ b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java @@ -17,17 +17,14 @@ public static Snapshot getInitial(String name) { return new Snapshot(name, CmsEntry.SnapshotStatus.NOT_STARTED); } - public static Snapshot fromJsonString(String json) { + public static Snapshot fromJson(ObjectNode node) { try { - ObjectNode node = objectMapper.readValue(json, ObjectNode.class); - ObjectNode sourceNode = (ObjectNode) node.get("_source"); - return new Snapshot( - sourceNode.get(FIELD_STATUS).asText(), - CmsEntry.SnapshotStatus.valueOf(sourceNode.get(FIELD_STATUS).asText()) + node.get(FIELD_STATUS).asText(), + CmsEntry.SnapshotStatus.valueOf(node.get(FIELD_STATUS).asText()) ); } catch (Exception e) { - throw new CantParseCmsEntryFromJson(Snapshot.class, json, e); + throw new CantParseCmsEntryFromJson(Snapshot.class, node.toString(), e); } } @@ -63,18 +60,15 @@ public static Metadata getInitial() { ); } - public static Metadata fromJsonString(String json) { + public static Metadata fromJson(ObjectNode node) { try { - ObjectNode node = objectMapper.readValue(json, ObjectNode.class); - ObjectNode sourceNode = (ObjectNode) node.get("_source"); - return new Metadata( - CmsEntry.MetadataStatus.valueOf(sourceNode.get(FIELD_STATUS).asText()), - sourceNode.get(FIELD_LEASE_EXPIRY).asText(), - sourceNode.get(FIELD_NUM_ATTEMPTS).asInt() + CmsEntry.MetadataStatus.valueOf(node.get(FIELD_STATUS).asText()), + node.get(FIELD_LEASE_EXPIRY).asText(), + node.get(FIELD_NUM_ATTEMPTS).asInt() ); } catch (Exception e) { - throw new CantParseCmsEntryFromJson(Metadata.class, json, e); + throw new CantParseCmsEntryFromJson(Metadata.class, node.toString(), e); } } diff --git a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java index 43dd83c8e..f645d1ed5 100644 --- a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java +++ b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java @@ -2,6 +2,7 @@ import java.net.HttpURLConnection; import java.time.Duration; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -27,38 +28,42 @@ public OpenSearchClient(ConnectionDetails connectionDetails) { } /* - * Idempotently create a legacy template if it does not already exist; return true if created, false otherwise. + * Create a legacy template if it does not already exist. Returns an Optional; if the template was created, it + * will be the created object and empty otherwise. */ - public boolean createLegacyTemplate(String templateName, ObjectNode settings){ + public Optional createLegacyTemplate(String templateName, ObjectNode settings){ String targetPath = "_template/" + templateName; return createObjectIdempotent(targetPath, settings); } /* - * Idempotently create a component template if it does not already exist; return true if created, false otherwise. + * Create a component template if it does not already exist. Returns an Optional; if the template was created, it + * will be the created object and empty otherwise. */ - public boolean createComponentTemplate(String templateName, ObjectNode settings){ + public Optional createComponentTemplate(String templateName, ObjectNode settings){ String targetPath = "_component_template/" + templateName; return createObjectIdempotent(targetPath, settings); } /* - * Idempotently create an index template if it does not already exist; return true if created, false otherwise. + * Create an index template if it does not already exist. Returns an Optional; if the template was created, it + * will be the created object and empty otherwise. */ - public boolean createIndexTemplate(String templateName, ObjectNode settings){ + public Optional createIndexTemplate(String templateName, ObjectNode settings){ String targetPath = "_index_template/" + templateName; return createObjectIdempotent(targetPath, settings); } /* - * Idempotently create an index if it does not already exist; return true if created, false otherwise. + * Create an index if it does not already exist. Returns an Optional; if the index was created, it + * will be the created object and empty otherwise. */ - public boolean createIndex(String indexName, ObjectNode settings){ + public Optional createIndex(String indexName, ObjectNode settings){ String targetPath = indexName; return createObjectIdempotent(targetPath, settings); } - private boolean createObjectIdempotent(String objectPath, ObjectNode settings){ + private Optional createObjectIdempotent(String objectPath, ObjectNode settings){ RestClient.Response response = client.getAsync(objectPath) .flatMap(resp -> { if (resp.code == HttpURLConnection.HTTP_NOT_FOUND || resp.code == HttpURLConnection.HTTP_OK) { @@ -75,18 +80,22 @@ private boolean createObjectIdempotent(String objectPath, ObjectNode settings){ if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { client.put(objectPath, settings.toString()); - return true; + return Optional.of(settings); } else if (response.code == HttpURLConnection.HTTP_OK) { logger.info(objectPath + " already exists. Skipping creation."); } else { logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation."); } - return false; + return Optional.empty(); } - public RestClient.Response registerSnapshotRepo(String repoName, ObjectNode settings){ + /* + * Register a snapshot repository. Returns an Optional; if the repository was registered, it will be the settings + * and empty otherwise. + */ + public Optional registerSnapshotRepo(String repoName, ObjectNode settings){ String targetPath = "_snapshot/" + repoName; - return client.putAsync(targetPath, settings.toString()) + client.putAsync(targetPath, settings.toString()) .flatMap(resp -> { if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_CREATED) { return Mono.just(resp); @@ -99,11 +108,16 @@ public RestClient.Response registerSnapshotRepo(String repoName, ObjectNode sett .doOnError(e -> logger.error(e.getMessage())) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) .block(); + + return Optional.of(settings); } - public RestClient.Response createSnapshot(String repoName, String snapshotName, ObjectNode settings){ + /* + * Create a snapshot. Returns an Optional; if the snapshot was created, it will be the settings and empty otherwise. + */ + public Optional createSnapshot(String repoName, String snapshotName, ObjectNode settings){ String targetPath = "_snapshot/" + repoName + "/" + snapshotName; - return client.putAsync(targetPath, settings.toString()) + client.putAsync(targetPath, settings.toString()) .flatMap(resp -> { if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_CREATED) { return Mono.just(resp); @@ -116,11 +130,17 @@ public RestClient.Response createSnapshot(String repoName, String snapshotName, .doOnError(e -> logger.error(e.getMessage())) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) .block(); + + return Optional.of(settings); } - public RestClient.Response getSnapshotStatus(String repoName, String snapshotName){ + /* + * Get the status of a snapshot. Returns an Optional; if the snapshot was found, it will be the snapshot status + * and empty otherwise. + */ + public Optional getSnapshotStatus(String repoName, String snapshotName){ String targetPath = "_snapshot/" + repoName + "/" + snapshotName; - return client.getAsync(targetPath) + RestClient.Response response = client.getAsync(targetPath) .flatMap(resp -> { if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_NOT_FOUND) { return Mono.just(resp); @@ -132,12 +152,27 @@ public RestClient.Response getSnapshotStatus(String repoName, String snapshotNam .doOnError(e -> logger.error(e.getMessage())) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) .block(); + + if (response.code == HttpURLConnection.HTTP_OK) { + try { + return Optional.of(objectMapper.readValue(response.body, ObjectNode.class)); + } catch (Exception e) { + String errorMessage = "Could not parse response for: _snapshot/" + repoName + "/" + snapshotName; + throw new OperationFailed(errorMessage, response); + } + } else if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { + return Optional.empty(); + } else { + String errorMessage = "Should not have gotten here while parsing response for: _snapshot/" + repoName + "/" + snapshotName; + throw new OperationFailed(errorMessage, response); + } } /* - * Idempotently create a document if it does not already exist; return true if created, false otherwise. + * Create a document if it does not already exist. Returns an Optional; if the document was created, it + * will be the created object and empty otherwise. */ - public boolean createDocument(String indexName, String documentId, ObjectNode body) { + public Optional createDocument(String indexName, String documentId, ObjectNode body) { String targetPath = indexName + "/_doc/" + documentId + "?op_type=create"; RestClient.Response response = client.putAsync(targetPath, body.toString()) .flatMap(resp -> { @@ -153,18 +188,21 @@ public boolean createDocument(String indexName, String documentId, ObjectNode bo .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) .block(); if (response.code == HttpURLConnection.HTTP_CREATED) { - return true; + return Optional.of(body); } else { // The only response code that can end up here is HTTP_CONFLICT, as everything is an error above - return false; + return Optional.empty(); } } - public RestClient.Response getDocument(String indexName, String documentId) { + /* + * Retrieve a document. Returns an Optional; if the document was found, it will be the document and empty otherwise. + */ + public Optional getDocument(String indexName, String documentId) { String targetPath = indexName + "/_doc/" + documentId; - return client.getAsync(targetPath) + RestClient.Response response = client.getAsync(targetPath) .flatMap(resp -> { - if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_NOT_FOUND) { + if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_NOT_FOUND) { return Mono.just(resp); } else { String errorMessage = ("Could not retrieve document: " + indexName + "/" + documentId + ". Response Code: " + resp.code @@ -175,23 +213,40 @@ public RestClient.Response getDocument(String indexName, String documentId) { .doOnError(e -> logger.error(e.getMessage())) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) .block(); + + if (response.code == HttpURLConnection.HTTP_OK) { + try { + return Optional.of(objectMapper.readValue(response.body, ObjectNode.class)); + } catch (Exception e) { + String errorMessage = "Could not parse response for: " + indexName + "/" + documentId; + throw new OperationFailed(errorMessage, response); + } + } else if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { + return Optional.empty(); + } else { + String errorMessage = "Should not have gotten here while parsing response for: " + indexName + "/" + documentId; + throw new OperationFailed(errorMessage, response); + } } /* - * Update a document using optimistic locking; return true if updated, false otherwise. + * Update a document using optimistic locking. Returns an Optional; if the document was updated, it + * will be the new value and empty otherwise. */ - public boolean updateDocument(String indexName, String documentId, ObjectNode body) { - RestClient.Response getResponse = getDocument(indexName, documentId); + public Optional updateDocument(String indexName, String documentId, ObjectNode body) { + Optional document = getDocument(indexName, documentId); + if (document.isEmpty()) { + throw new UpdateFailed("Document not found: " + indexName + "/" + documentId); + } String currentSeqNum; String currentPrimaryTerm; try { - ObjectNode document = (ObjectNode) objectMapper.readTree(getResponse.body); - currentSeqNum = document.get("_seq_no").asText(); - currentPrimaryTerm = document.get("_primary_term").asText(); + currentSeqNum = document.get().get("_seq_no").asText(); + currentPrimaryTerm = document.get().get("_primary_term").asText(); } catch (Exception e) { - String errorMessage = "Could not update document: " + indexName + "/" + documentId + ". Response Code: " + getResponse.code; - throw new OperationFailed(errorMessage, getResponse); + String errorMessage = "Could not update document: " + indexName + "/" + documentId; + throw new RfsException(errorMessage, e); } ObjectNode upsertBody = new ObjectMapper().createObjectNode(); @@ -213,10 +268,10 @@ public boolean updateDocument(String indexName, String documentId, ObjectNode bo .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) .block(); if (response.code == HttpURLConnection.HTTP_OK) { - return true; + return Optional.of(body); } else { // The only response code that can end up here is HTTP_CONFLICT, as everything is an error above - return false; + return Optional.empty(); } } @@ -273,6 +328,12 @@ public String getFailureMessage() { } } + public static class UpdateFailed extends RfsException { + public UpdateFailed(String message) { + super(message); + } + } + public static class OperationFailed extends RfsException { public final RestClient.Response response; diff --git a/RFS/src/main/java/com/rfs/common/SnapshotCreator.java b/RFS/src/main/java/com/rfs/common/SnapshotCreator.java index 219637e0b..8512c2a59 100644 --- a/RFS/src/main/java/com/rfs/common/SnapshotCreator.java +++ b/RFS/src/main/java/com/rfs/common/SnapshotCreator.java @@ -6,7 +6,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.net.HttpURLConnection; +import java.util.Optional; public abstract class SnapshotCreator { private static final Logger logger = LogManager.getLogger(SnapshotCreator.class); @@ -61,7 +61,7 @@ public void createSnapshot() { } public boolean isSnapshotFinished() { - RestClient.Response response; + Optional response; try { response = client.getSnapshotStatus(getRepoName(), snapshotName); } catch (Exception e) { @@ -69,18 +69,12 @@ public boolean isSnapshotFinished() { throw new SnapshotStatusCheckFailed(snapshotName); } - if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { + if (response.isEmpty()) { logger.error("Snapshot " + snapshotName + " does not exist"); throw new SnapshotDoesNotExist(snapshotName); } - JsonNode responseJson; - try { - responseJson = mapper.readTree(response.body); - } catch (Exception e) { - logger.error("Failed to parse snapshot status response", e); - throw new SnapshotStatusUnparsable(snapshotName); - } + JsonNode responseJson = response.get(); JsonNode firstSnapshot = responseJson.path("snapshots").get(0); JsonNode stateNode = firstSnapshot.path("state"); String state = stateNode.asText(); @@ -118,10 +112,4 @@ public SnapshotStatusCheckFailed(String snapshotName) { super("We were unable to retrieve the status of Snapshot " + snapshotName); } } - - public static class SnapshotStatusUnparsable extends RfsException { - public SnapshotStatusUnparsable(String snapshotName) { - super("Status of Snapshot " + snapshotName + " is not parsable"); - } - } } diff --git a/RFS/src/main/java/com/rfs/worker/MetadataRunner.java b/RFS/src/main/java/com/rfs/worker/MetadataRunner.java index 5f836d048..bbfe78b03 100644 --- a/RFS/src/main/java/com/rfs/worker/MetadataRunner.java +++ b/RFS/src/main/java/com/rfs/worker/MetadataRunner.java @@ -1,9 +1,13 @@ package com.rfs.worker; import org.apache.logging.log4j.Logger; + +import java.util.Optional; + import org.apache.logging.log4j.LogManager; import com.rfs.cms.CmsClient; +import com.rfs.cms.CmsEntry; import com.rfs.cms.CmsEntry.Metadata; import com.rfs.cms.CmsEntry.MetadataStatus; import com.rfs.common.GlobalMetadata; @@ -24,9 +28,9 @@ public void run() { WorkerStep nextStep = null; try { logger.info("Checking if work remains in the Metadata Phase..."); - Metadata metadataEntry = members.cmsClient.getMetadataEntry(); + Optional metadataEntry = members.cmsClient.getMetadataEntry(); - if (metadataEntry == null || metadataEntry.status != MetadataStatus.COMPLETED) { + if (metadataEntry.isEmpty() || metadataEntry.get().status != MetadataStatus.COMPLETED) { nextStep = new MetadataStep.EnterPhase(members); while (nextStep != null) { @@ -42,7 +46,7 @@ public void run() { getPhaseFailureRecord( members.globalState.getPhase(), nextStep, - members.cmsEntry, + members.cmsEntry.map(bar -> (CmsEntry.Base) bar), e ).toString() ); diff --git a/RFS/src/main/java/com/rfs/worker/MetadataStep.java b/RFS/src/main/java/com/rfs/worker/MetadataStep.java index e76651575..0770363ec 100644 --- a/RFS/src/main/java/com/rfs/worker/MetadataStep.java +++ b/RFS/src/main/java/com/rfs/worker/MetadataStep.java @@ -1,6 +1,7 @@ package com.rfs.worker; import java.time.Instant; +import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -23,7 +24,7 @@ public static class SharedMembers { protected final GlobalMetadata.Factory metadataFactory; protected final GlobalMetadataCreator_OS_2_11 metadataCreator; protected final Transformer transformer; - protected CmsEntry.Metadata cmsEntry; + protected Optional cmsEntry; public SharedMembers(GlobalState globalState, CmsClient cmsClient, String snapshotName, GlobalMetadata.Factory metadataFactory, GlobalMetadataCreator_OS_2_11 metadataCreator, Transformer transformer) { @@ -33,16 +34,16 @@ public SharedMembers(GlobalState globalState, CmsClient cmsClient, String snapsh this.metadataFactory = metadataFactory; this.metadataCreator = metadataCreator; this.transformer = transformer; - this.cmsEntry = null; + this.cmsEntry = Optional.empty(); } - // A convient way to check if the CMS entry is null before retrieving it. In some places, it's fine/expected - // for the CMS entry to be null, but in others, it's a problem. - public CmsEntry.Metadata getCmsEntryNotNull() { - if (cmsEntry == null) { + // A convient way to check if the CMS entry is present before retrieving it. In some places, it's fine/expected + // for the CMS entry to be missing, but in others, it's a problem. + public CmsEntry.Metadata getCmsEntryNotMissing() { + if (cmsEntry.isEmpty()) { throw new MissingMigrationEntry("The Metadata Migration CMS entry we expected to be stored in local memory was null"); } - return cmsEntry; + return cmsEntry.get(); } } @@ -98,36 +99,37 @@ public void run() { @Override public WorkerStep nextStep() { - if (members.cmsEntry == null) { + if (members.cmsEntry.isEmpty()) { return new CreateEntry(members); - } else { - switch (members.cmsEntry.status) { - case IN_PROGRESS: - // TODO: This uses the client-side clock to evaluate the lease expiration, when we should - // ideally be using the server-side clock. Consider this a temporary solution until we find - // out how to use the server-side clock. - long leaseExpiryMillis = Long.parseLong(members.cmsEntry.leaseExpiry); - Instant leaseExpiryInstant = Instant.ofEpochMilli(leaseExpiryMillis); - boolean leaseExpired = leaseExpiryInstant.isBefore(Instant.now()); - - // Don't try to acquire the lease if we're already at the max number of attempts - if (members.cmsEntry.numAttempts >= CmsEntry.Metadata.MAX_ATTEMPTS && leaseExpired) { - return new ExitPhaseFailed(members, new MaxAttemptsExceeded()); - } - - if (leaseExpired) { - return new AcquireLease(members); - } - - logger.info("Metadata Migration entry found, but there's already a valid work lease on it"); - return new RandomWait(members); - case COMPLETED: - return new ExitPhaseSuccess(members); - case FAILED: - return new ExitPhaseFailed(members, new FoundFailedMetadataMigration()); - default: - throw new IllegalStateException("Unexpected metadata migration status: " + members.cmsEntry.status); - } + } + + CmsEntry.Metadata currentEntry = members.cmsEntry.get(); + switch (currentEntry.status) { + case IN_PROGRESS: + // TODO: This uses the client-side clock to evaluate the lease expiration, when we should + // ideally be using the server-side clock. Consider this a temporary solution until we find + // out how to use the server-side clock. + long leaseExpiryMillis = Long.parseLong(currentEntry.leaseExpiry); + Instant leaseExpiryInstant = Instant.ofEpochMilli(leaseExpiryMillis); + boolean leaseExpired = leaseExpiryInstant.isBefore(Instant.now()); + + // Don't try to acquire the lease if we're already at the max number of attempts + if (currentEntry.numAttempts >= CmsEntry.Metadata.MAX_ATTEMPTS && leaseExpired) { + return new ExitPhaseFailed(members, new MaxAttemptsExceeded()); + } + + if (leaseExpired) { + return new AcquireLease(members); + } + + logger.info("Metadata Migration entry found, but there's already a valid work lease on it"); + return new RandomWait(members); + case COMPLETED: + return new ExitPhaseSuccess(members); + case FAILED: + return new ExitPhaseFailed(members, new FoundFailedMetadataMigration()); + default: + throw new IllegalStateException("Unexpected metadata migration status: " + currentEntry.status); } } } @@ -148,7 +150,7 @@ public void run() { @Override public WorkerStep nextStep() { // Migrate the templates if we successfully created the CMS entry; otherwise, circle back to the beginning - if (members.cmsEntry != null) { + if (members.cmsEntry.isPresent()) { return new MigrateTemplates(members); } else { return new GetEntry(members); @@ -169,7 +171,7 @@ protected long getNowMs() { @Override public void run() { // We only get here if we know we want to acquire the lock, so we know the CMS entry should not be null - CmsEntry.Metadata currentCmsEntry = members.getCmsEntryNotNull(); + CmsEntry.Metadata currentCmsEntry = members.getCmsEntryNotMissing(); logger.info("Current Metadata Migration work lease appears to have expired; attempting to acquire it..."); @@ -181,7 +183,7 @@ public void run() { currentCmsEntry.numAttempts + 1 ); - if (members.cmsEntry != null) { + if (members.cmsEntry.isPresent()) { logger.info("Lease acquired"); } else { logger.info("Failed to acquire lease"); @@ -191,7 +193,7 @@ public void run() { @Override public WorkerStep nextStep() { // Migrate the templates if we acquired the lease; otherwise, circle back to the beginning after a backoff - if (members.cmsEntry != null) { + if (members.cmsEntry.isPresent()) { return new MigrateTemplates(members); } else { return new RandomWait(members); @@ -208,7 +210,7 @@ public MigrateTemplates(SharedMembers members) { @Override public void run() { // We only get here if we acquired the lock, so we know the CMS entry should not be null - CmsEntry.Metadata currentCmsEntry = members.getCmsEntryNotNull(); + CmsEntry.Metadata currentCmsEntry = members.getCmsEntryNotMissing(); logger.info("Setting the worker's current work item to be the Metadata Migration..."); members.globalState.updateWorkItem(new OpenSearchWorkItem(OpenSearchCmsClient.CMS_INDEX_NAME, OpenSearchCmsClient.CMS_METADATA_DOC_ID)); @@ -236,7 +238,7 @@ public void run() { @Override public WorkerStep nextStep() { - if (members.cmsEntry == null) { + if (members.cmsEntry.isEmpty()) { // In this scenario, we've done all the work, but failed to update the CMS entry so that we know we've // done the work. We circle back around to try again, which is made more reasonable by the fact we // don't re-migrate templates that already exist on the target cluster. If we didn't circle back @@ -307,7 +309,7 @@ public ExitPhaseFailed(SharedMembers members, MetadataMigrationFailed e) { public void run() { // We either failed the Metadata Migration or found it had already been failed; either way this // should not be null - CmsEntry.Metadata currentCmsEntry = members.getCmsEntryNotNull(); + CmsEntry.Metadata currentCmsEntry = members.getCmsEntryNotMissing(); logger.error("Metadata Migration failed"); members.cmsClient.updateMetadataEntry( diff --git a/RFS/src/main/java/com/rfs/worker/Runner.java b/RFS/src/main/java/com/rfs/worker/Runner.java index c1b34605e..1cb6c3c98 100644 --- a/RFS/src/main/java/com/rfs/worker/Runner.java +++ b/RFS/src/main/java/com/rfs/worker/Runner.java @@ -1,6 +1,7 @@ package com.rfs.worker; import java.util.Arrays; +import java.util.Optional; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -9,7 +10,7 @@ public abstract interface Runner { public abstract void run(); - default ObjectNode getPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nextStep, CmsEntry.Base cmsEntry, Exception e) { + default ObjectNode getPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nextStep, Optional cmsEntry, Exception e) { ObjectNode errorBlob = new ObjectMapper().createObjectNode(); errorBlob.put("exceptionMessage", e.getMessage()); errorBlob.put("exceptionClass", e.getClass().getSimpleName()); @@ -20,7 +21,7 @@ default ObjectNode getPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nex String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null"; errorBlob.put("currentStep", currentStep); - String currentEntry = (cmsEntry != null) ? cmsEntry.toString() : "null"; + String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.toString() : "null"; errorBlob.put("cmsEntry", currentEntry); return errorBlob; } diff --git a/RFS/src/main/java/com/rfs/worker/SnapshotRunner.java b/RFS/src/main/java/com/rfs/worker/SnapshotRunner.java index 73233e08e..c3fd20f34 100644 --- a/RFS/src/main/java/com/rfs/worker/SnapshotRunner.java +++ b/RFS/src/main/java/com/rfs/worker/SnapshotRunner.java @@ -1,9 +1,13 @@ package com.rfs.worker; import org.apache.logging.log4j.Logger; + +import java.util.Optional; + import org.apache.logging.log4j.LogManager; import com.rfs.cms.CmsClient; +import com.rfs.cms.CmsEntry; import com.rfs.cms.CmsEntry.Snapshot; import com.rfs.cms.CmsEntry.SnapshotStatus; import com.rfs.common.SnapshotCreator; @@ -22,9 +26,9 @@ public void run() { try { logger.info("Checking if work remains in the Snapshot Phase..."); - Snapshot snapshotEntry = members.cmsClient.getSnapshotEntry(members.snapshotCreator.getSnapshotName()); + Optional snapshotEntry = members.cmsClient.getSnapshotEntry(members.snapshotCreator.getSnapshotName()); - if (snapshotEntry == null || snapshotEntry.status != SnapshotStatus.COMPLETED) { + if (snapshotEntry.isEmpty() || snapshotEntry.get().status != SnapshotStatus.COMPLETED) { nextStep = new SnapshotStep.EnterPhase(members, snapshotEntry); while (nextStep != null) { @@ -40,7 +44,7 @@ public void run() { getPhaseFailureRecord( members.globalState.getPhase(), nextStep, - members.cmsEntry, + members.cmsEntry.map(bar -> (CmsEntry.Base) bar), e ).toString() ); diff --git a/RFS/src/main/java/com/rfs/worker/SnapshotStep.java b/RFS/src/main/java/com/rfs/worker/SnapshotStep.java index 394905ec0..798bc6139 100644 --- a/RFS/src/main/java/com/rfs/worker/SnapshotStep.java +++ b/RFS/src/main/java/com/rfs/worker/SnapshotStep.java @@ -1,5 +1,7 @@ package com.rfs.worker; +import java.util.Optional; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -15,13 +17,13 @@ public static class SharedMembers { protected final CmsClient cmsClient; protected final GlobalState globalState; protected final SnapshotCreator snapshotCreator; - protected CmsEntry.Snapshot cmsEntry; + protected Optional cmsEntry; public SharedMembers(GlobalState globalState, CmsClient cmsClient, SnapshotCreator snapshotCreator) { this.globalState = globalState; this.cmsClient = cmsClient; this.snapshotCreator = snapshotCreator; - this.cmsEntry = null; + this.cmsEntry = Optional.empty(); } } @@ -45,7 +47,7 @@ public Base(SharedMembers members) { */ public static class EnterPhase extends Base { - public EnterPhase(SharedMembers members, CmsEntry.Snapshot currentEntry) { + public EnterPhase(SharedMembers members, Optional currentEntry) { super(members); this.members.cmsEntry = currentEntry; } @@ -58,17 +60,18 @@ public void run() { @Override public WorkerStep nextStep() { - if (members.cmsEntry == null) { + if (members.cmsEntry.isEmpty()) { return new CreateEntry(members); - } else { - switch (members.cmsEntry.status) { - case NOT_STARTED: - return new InitiateSnapshot(members); - case IN_PROGRESS: - return new WaitForSnapshot(members); - default: - throw new IllegalStateException("Unexpected snapshot status: " + members.cmsEntry.status); - } + } + + CmsEntry.Snapshot currentEntry = members.cmsEntry.get(); + switch (currentEntry.status) { + case NOT_STARTED: + return new InitiateSnapshot(members); + case IN_PROGRESS: + return new WaitForSnapshot(members); + default: + throw new IllegalStateException("Unexpected snapshot status: " + currentEntry.status); } } } diff --git a/RFS/src/test/java/com/rfs/worker/MetadataStepTest.java b/RFS/src/test/java/com/rfs/worker/MetadataStepTest.java index eefd75664..2a7bf66a9 100644 --- a/RFS/src/test/java/com/rfs/worker/MetadataStepTest.java +++ b/RFS/src/test/java/com/rfs/worker/MetadataStepTest.java @@ -2,6 +2,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.Optional; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -65,77 +66,77 @@ static Stream provideGetEntryArgs() { return Stream.of( // There is no CMS entry, so we need to create one Arguments.of( - null, + Optional.empty(), MetadataStep.CreateEntry.class ), // The CMS entry has an expired lease and is under the retry limit, so we try to acquire the lease Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, String.valueOf(Instant.now().minus(Duration.ofDays(1)).toEpochMilli()), CmsEntry.Metadata.MAX_ATTEMPTS - 1 - ), + )), MetadataStep.AcquireLease.class ), // The CMS entry has an expired lease and is at the retry limit, so we exit as failed Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, String.valueOf(Instant.now().minus(Duration.ofDays(1)).toEpochMilli()), CmsEntry.Metadata.MAX_ATTEMPTS - ), + )), MetadataStep.ExitPhaseFailed.class ), // The CMS entry has an expired lease and is over the retry limit, so we exit as failed Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, String.valueOf(Instant.now().minus(Duration.ofDays(1)).toEpochMilli()), CmsEntry.Metadata.MAX_ATTEMPTS + 1 - ), + )), MetadataStep.ExitPhaseFailed.class ), // The CMS entry has valid lease and is under the retry limit, so we back off a bit Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), CmsEntry.Metadata.MAX_ATTEMPTS - 1 - ), + )), MetadataStep.RandomWait.class ), // The CMS entry has valid lease and is at the retry limit, so we back off a bit Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), CmsEntry.Metadata.MAX_ATTEMPTS - ), + )), MetadataStep.RandomWait.class ), // The CMS entry is marked as completed, so we exit as success Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.COMPLETED, String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), CmsEntry.Metadata.MAX_ATTEMPTS - 1 - ), + )), MetadataStep.ExitPhaseSuccess.class ), // The CMS entry is marked as completed, so we exit as success Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.FAILED, String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), CmsEntry.Metadata.MAX_ATTEMPTS - 1 - ), + )), MetadataStep.ExitPhaseFailed.class ) ); @@ -143,7 +144,7 @@ static Stream provideGetEntryArgs() { @ParameterizedTest @MethodSource("provideGetEntryArgs") - void GetEntry_AsExpected(CmsEntry.Metadata metadata, Class nextStepClass) { + void GetEntry_AsExpected(Optional metadata, Class nextStepClass) { // Set up the test Mockito.when(testMembers.cmsClient.getMetadataEntry()).thenReturn(metadata); @@ -161,22 +162,22 @@ static Stream provideCreateEntryArgs() { return Stream.of( // We were able to create the CMS entry ourselves, so we have the work lease Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), 1 - ), + )), MetadataStep.MigrateTemplates.class ), // We were unable to create the CMS entry ourselves, so we do not have the work lease - Arguments.of(null, MetadataStep.GetEntry.class) + Arguments.of(Optional.empty(), MetadataStep.GetEntry.class) ); } @ParameterizedTest @MethodSource("provideCreateEntryArgs") - void CreateEntry_AsExpected(CmsEntry.Metadata createdEntry, Class nextStepClass) { + void CreateEntry_AsExpected(Optional createdEntry, Class nextStepClass) { // Set up the test Mockito.when(testMembers.cmsClient.createMetadataEntry()).thenReturn(createdEntry); @@ -207,28 +208,28 @@ static Stream provideAcquireLeaseArgs() { return Stream.of( // We were able to acquire the lease Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), 1 - ), + )), MetadataStep.MigrateTemplates.class ), // We were unable to acquire the lease - Arguments.of(null, MetadataStep.RandomWait.class) + Arguments.of(Optional.empty(), MetadataStep.RandomWait.class) ); } @ParameterizedTest @MethodSource("provideAcquireLeaseArgs") - void AcquireLease_AsExpected(CmsEntry.Metadata updatedEntry, Class nextStepClass) { + void AcquireLease_AsExpected(Optional updatedEntry, Class nextStepClass) { // Set up the test - CmsEntry.Metadata existingEntry = new CmsEntry.Metadata( + var existingEntry = Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, CmsEntry.Metadata.getLeaseExpiry(0L, CmsEntry.Metadata.MAX_ATTEMPTS - 1), CmsEntry.Metadata.MAX_ATTEMPTS - 1 - ); + )); testMembers.cmsEntry = existingEntry; Mockito.when(testMembers.cmsClient.updateMetadataEntry( @@ -253,28 +254,28 @@ static Stream provideMigrateTemplatesArgs() { return Stream.of( // We were able to acquire the lease Arguments.of( - new CmsEntry.Metadata( + Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.COMPLETED, String.valueOf(42), 1 - ), + )), MetadataStep.ExitPhaseSuccess.class ), // We were unable to acquire the lease - Arguments.of(null, MetadataStep.GetEntry.class) + Arguments.of(Optional.empty(), MetadataStep.GetEntry.class) ); } @ParameterizedTest @MethodSource("provideMigrateTemplatesArgs") - void MigrateTemplates_AsExpected(CmsEntry.Metadata updatedEntry, Class nextStepClass) { + void MigrateTemplates_AsExpected(Optional updatedEntry, Class nextStepClass) { // Set up the test - CmsEntry.Metadata existingEntry = new CmsEntry.Metadata( + var existingEntry = Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, String.valueOf(42), 1 - ); + )); testMembers.cmsEntry = existingEntry; GlobalMetadata.Data testGlobalMetadata = Mockito.mock(GlobalMetadata.Data.class); @@ -285,8 +286,8 @@ void MigrateTemplates_AsExpected(CmsEntry.Metadata updatedEntry, Class nextSt Mockito.when(testMembers.transformer.transformGlobalMetadata(testNode)).thenReturn(testTransformedNode); Mockito.when(testMembers.cmsClient.updateMetadataEntry( CmsEntry.MetadataStatus.COMPLETED, - existingEntry.leaseExpiry, - existingEntry.numAttempts + existingEntry.get().leaseExpiry, + existingEntry.get().numAttempts )).thenReturn(updatedEntry); @@ -317,8 +318,8 @@ void MigrateTemplates_AsExpected(CmsEntry.Metadata updatedEntry, Class nextSt ); Mockito.verify(testMembers.cmsClient, times(1)).updateMetadataEntry( CmsEntry.MetadataStatus.COMPLETED, - existingEntry.leaseExpiry, - existingEntry.numAttempts + existingEntry.get().leaseExpiry, + existingEntry.get().numAttempts ); Mockito.verify(testMembers.globalState, times(1)).updateWorkItem( null @@ -368,11 +369,11 @@ void ExitPhaseFailed_AsExpected() { // Set up the test MaxAttemptsExceeded e = new MaxAttemptsExceeded(); - CmsEntry.Metadata existingEntry = new CmsEntry.Metadata( + var existingEntry = Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, String.valueOf(42), 1 - ); + )); testMembers.cmsEntry = existingEntry; // Run the test @@ -385,8 +386,8 @@ void ExitPhaseFailed_AsExpected() { // Check the results Mockito.verify(testMembers.cmsClient, times(1)).updateMetadataEntry( CmsEntry.MetadataStatus.FAILED, - existingEntry.leaseExpiry, - existingEntry.numAttempts + existingEntry.get().leaseExpiry, + existingEntry.get().numAttempts ); Mockito.verify(testMembers.globalState, times(1)).updatePhase( GlobalState.Phase.METADATA_FAILED diff --git a/RFS/src/test/java/com/rfs/worker/SnapshotStepTest.java b/RFS/src/test/java/com/rfs/worker/SnapshotStepTest.java index 87697a2d4..d944443c4 100644 --- a/RFS/src/test/java/com/rfs/worker/SnapshotStepTest.java +++ b/RFS/src/test/java/com/rfs/worker/SnapshotStepTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.*; +import java.util.Optional; import java.util.stream.Stream; @@ -42,15 +43,15 @@ void setUp() { static Stream provideEnterPhaseArgs() { return Stream.of( - Arguments.of(null, SnapshotStep.CreateEntry.class), - Arguments.of(new Snapshot("test", SnapshotStatus.NOT_STARTED), SnapshotStep.InitiateSnapshot.class), - Arguments.of(new Snapshot("test", SnapshotStatus.IN_PROGRESS), SnapshotStep.WaitForSnapshot.class) + Arguments.of(Optional.empty(), SnapshotStep.CreateEntry.class), + Arguments.of(Optional.of(new Snapshot("test", SnapshotStatus.NOT_STARTED)), SnapshotStep.InitiateSnapshot.class), + Arguments.of(Optional.of(new Snapshot("test", SnapshotStatus.IN_PROGRESS)), SnapshotStep.WaitForSnapshot.class) ); } @ParameterizedTest @MethodSource("provideEnterPhaseArgs") - void EnterPhase_AsExpected(Snapshot snapshotEntry, Class expected) { + void EnterPhase_AsExpected(Optional snapshotEntry, Class expected) { // Run the test SnapshotStep.EnterPhase testStep = new SnapshotStep.EnterPhase(testMembers, snapshotEntry); testStep.run();