Skip to content

Commit

Permalink
Used Optional to clean up some RFS Worker library code
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed May 27, 2024
1 parent 127613d commit fe266eb
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 228 deletions.
36 changes: 20 additions & 16 deletions RFS/src/main/java/com/rfs/cms/CmsClient.java
Original file line number Diff line number Diff line change
@@ -1,41 +1,45 @@
package com.rfs.cms;

import java.util.Optional;

/*
* Client to connect to and work with the Coordinating Metadata Store. The CMS could be implemented by any reasonable
* data store option (Postgres, AWS DynamoDB, Elasticsearch/Opensearch, etc).
*/
public interface CmsClient {
/*
* Creates a new entry in the CMS for the Snapshot's progress. Returns true if we created the entry, and false if
* the entry already exists.
* Creates a new entry in the CMS for the Snapshot's progress. Returns an Optional; if the document was created, it
* will be the created object and empty otherwise.
*/
public CmsEntry.Snapshot createSnapshotEntry(String snapshotName);
public Optional<CmsEntry.Snapshot> createSnapshotEntry(String snapshotName);

/*
* Attempt to retrieve the Snapshot entry from the CMS, if it exists; null if it doesn't currently exist
* Attempt to retrieve the Snapshot entry from the CMS. Returns an Optional; if the document exists, it will be the
* retrieved entry and empty otherwise.
*/
public CmsEntry.Snapshot getSnapshotEntry(String snapshotName);
public Optional<CmsEntry.Snapshot> getSnapshotEntry(String snapshotName);

/*
* Updates the status of the Snapshot entry in the CMS. Returns true if the update was successful, and false if
* something else updated it before we could
* Updates the Snapshot entry in the CMS. Returns an Optional; if the document was updated, it will be
* the updated entry and empty otherwise.
*/
public CmsEntry.Snapshot updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status);
public Optional<CmsEntry.Snapshot> updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status);

/*
* Creates a new entry in the CMS for the Metadata Migration's progress. Returns the created entry if we created it,
* and null if the entry already exists.
* Creates a new entry in the CMS for the Metadata Migration's progress. Returns an Optional; if the document was
* created, it will be the created entry and empty otherwise.
*/
public CmsEntry.Metadata createMetadataEntry();
public Optional<CmsEntry.Metadata> createMetadataEntry();

/*
* Attempt to retrieve the Metadata Migration entry from the CMS, if it exists; null if it doesn't currently exist
* Attempt to retrieve the Metadata Migration entry from the CMS, if it exists. Returns an Optional; if the document
* exists, it will be the retrieved entry and empty otherwise.
*/
public CmsEntry.Metadata getMetadataEntry();
public Optional<CmsEntry.Metadata> getMetadataEntry();

/*
* Updates all fields of the Metadata Migration entry in the CMS. Returns the updated entry if successful, and
* null if something else updated it before we could
* Updates the Metadata Migration entry in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public CmsEntry.Metadata updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts);
public Optional<CmsEntry.Metadata> updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts);
}
76 changes: 37 additions & 39 deletions RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.rfs.cms;

import java.net.HttpURLConnection;
import java.util.Optional;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.RestClient;

public class OpenSearchCmsClient implements CmsClient {
public static final String CMS_INDEX_NAME = "cms-reindex-from-snapshot";
Expand All @@ -17,73 +17,71 @@ public OpenSearchCmsClient(OpenSearchClient client) {
}

@Override
public CmsEntry.Snapshot createSnapshotEntry(String snapshotName) {
public Optional<CmsEntry.Snapshot> createSnapshotEntry(String snapshotName) {
OpenSearchCmsEntry.Snapshot newEntry = OpenSearchCmsEntry.Snapshot.getInitial(snapshotName);
boolean createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, newEntry.toJson());
if (createdEntry) {
return newEntry;
Optional<ObjectNode> createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, newEntry.toJson());

if (createdEntry.isPresent()) {
return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(createdEntry.get()));
} else {
return null;
return Optional.empty();
}
}

@Override
public CmsEntry.Snapshot getSnapshotEntry(String snapshotName) {
RestClient.Response response = client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID);

if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
return null;
public Optional<CmsEntry.Snapshot> getSnapshotEntry(String snapshotName) {
Optional<ObjectNode> document = client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID);
if (document.isEmpty()) {
return Optional.empty();
}
return OpenSearchCmsEntry.Snapshot.fromJsonString(response.body);

ObjectNode sourceNode = (ObjectNode) document.get().get("_source");
return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(sourceNode));
}

@Override
public CmsEntry.Snapshot updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status) {
public Optional<CmsEntry.Snapshot> updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status) {
OpenSearchCmsEntry.Snapshot entry = new OpenSearchCmsEntry.Snapshot(snapshotName, status);
boolean updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, entry.toJson());
if (updatedEntry) {
return entry;
} else {
return null;
Optional<ObjectNode> updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, entry.toJson());
if (updatedEntry.isEmpty()) {
return Optional.empty();
}
return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(updatedEntry.get()));
}

@Override
public OpenSearchCmsEntry.Metadata createMetadataEntry() {
public Optional<CmsEntry.Metadata> createMetadataEntry() {
OpenSearchCmsEntry.Metadata entry = OpenSearchCmsEntry.Metadata.getInitial();
Optional<ObjectNode> createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, entry.toJson());

boolean docCreated = client.createDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, entry.toJson());

if (docCreated) {
return entry;
} else {
return null;
if (createdEntry.isEmpty()) {
return Optional.empty();
}
return Optional.of(OpenSearchCmsEntry.Metadata.fromJson(createdEntry.get()));

}

@Override
public CmsEntry.Metadata getMetadataEntry() {
RestClient.Response response = client.getDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID);
public Optional<CmsEntry.Metadata> getMetadataEntry() {
Optional<ObjectNode> document = client.getDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID);

if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
return null;
if (document.isEmpty()) {
return Optional.empty();
}
return OpenSearchCmsEntry.Metadata.fromJsonString(response.body);

ObjectNode sourceNode = (ObjectNode) document.get().get("_source");
return Optional.of(OpenSearchCmsEntry.Metadata.fromJson(sourceNode));
}

@Override
public CmsEntry.Metadata updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts) {
public Optional<CmsEntry.Metadata> updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts) {
OpenSearchCmsEntry.Metadata metadata = new OpenSearchCmsEntry.Metadata(status, leaseExpiry, numAttempts);
Optional<ObjectNode> updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, metadata.toJson());

boolean updatedDoc = client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, metadata.toJson());

if (updatedDoc) {
return metadata;
} else {
return null;
if (updatedEntry.isEmpty()) {
return Optional.empty();
}

return Optional.of(OpenSearchCmsEntry.Metadata.fromJson(updatedEntry.get()));
}

}
24 changes: 9 additions & 15 deletions RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@ public static Snapshot getInitial(String name) {
return new Snapshot(name, CmsEntry.SnapshotStatus.NOT_STARTED);
}

public static Snapshot fromJsonString(String json) {
public static Snapshot fromJson(ObjectNode node) {
try {
ObjectNode node = objectMapper.readValue(json, ObjectNode.class);
ObjectNode sourceNode = (ObjectNode) node.get("_source");

return new Snapshot(
sourceNode.get(FIELD_STATUS).asText(),
CmsEntry.SnapshotStatus.valueOf(sourceNode.get(FIELD_STATUS).asText())
node.get(FIELD_STATUS).asText(),
CmsEntry.SnapshotStatus.valueOf(node.get(FIELD_STATUS).asText())
);
} catch (Exception e) {
throw new CantParseCmsEntryFromJson(Snapshot.class, json, e);
throw new CantParseCmsEntryFromJson(Snapshot.class, node.toString(), e);
}
}

Expand Down Expand Up @@ -63,18 +60,15 @@ public static Metadata getInitial() {
);
}

public static Metadata fromJsonString(String json) {
public static Metadata fromJson(ObjectNode node) {
try {
ObjectNode node = objectMapper.readValue(json, ObjectNode.class);
ObjectNode sourceNode = (ObjectNode) node.get("_source");

return new Metadata(
CmsEntry.MetadataStatus.valueOf(sourceNode.get(FIELD_STATUS).asText()),
sourceNode.get(FIELD_LEASE_EXPIRY).asText(),
sourceNode.get(FIELD_NUM_ATTEMPTS).asInt()
CmsEntry.MetadataStatus.valueOf(node.get(FIELD_STATUS).asText()),
node.get(FIELD_LEASE_EXPIRY).asText(),
node.get(FIELD_NUM_ATTEMPTS).asInt()
);
} catch (Exception e) {
throw new CantParseCmsEntryFromJson(Metadata.class, json, e);
throw new CantParseCmsEntryFromJson(Metadata.class, node.toString(), e);
}
}

Expand Down
Loading

0 comments on commit fe266eb

Please sign in to comment.