Skip to content

Commit

Permalink
More updates per PR discussion
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed May 28, 2024
1 parent 872db82 commit 8ab93f8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 45 deletions.
44 changes: 9 additions & 35 deletions RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,68 +20,42 @@ public OpenSearchCmsClient(OpenSearchClient client) {
public Optional<CmsEntry.Snapshot> createSnapshotEntry(String snapshotName) {
OpenSearchCmsEntry.Snapshot newEntry = OpenSearchCmsEntry.Snapshot.getInitial(snapshotName);
Optional<ObjectNode> createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, newEntry.toJson());

if (createdEntry.isPresent()) {
return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(createdEntry.get()));
} else {
return Optional.empty();
}
return createdEntry.map(OpenSearchCmsEntry.Snapshot::fromJson);
}

@Override
public Optional<CmsEntry.Snapshot> getSnapshotEntry(String snapshotName) {
Optional<ObjectNode> document = client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID);
if (document.isEmpty()) {
return Optional.empty();
}

ObjectNode sourceNode = (ObjectNode) document.get().get("_source");
return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(sourceNode));
return document.map(doc -> (ObjectNode) doc.get("_source"))
.map(OpenSearchCmsEntry.Snapshot::fromJson);
}

@Override
public Optional<CmsEntry.Snapshot> updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status) {
OpenSearchCmsEntry.Snapshot entry = new OpenSearchCmsEntry.Snapshot(snapshotName, status);
Optional<ObjectNode> updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, entry.toJson());
if (updatedEntry.isEmpty()) {
return Optional.empty();
}
return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(updatedEntry.get()));
return updatedEntry.map(OpenSearchCmsEntry.Snapshot::fromJson);
}

@Override
public Optional<CmsEntry.Metadata> createMetadataEntry() {
OpenSearchCmsEntry.Metadata entry = OpenSearchCmsEntry.Metadata.getInitial();
Optional<ObjectNode> createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, entry.toJson());

if (createdEntry.isEmpty()) {
return Optional.empty();
}
return Optional.of(OpenSearchCmsEntry.Metadata.fromJson(createdEntry.get()));
return createdEntry.map(OpenSearchCmsEntry.Metadata::fromJson);

}

@Override
public Optional<CmsEntry.Metadata> getMetadataEntry() {
Optional<ObjectNode> document = client.getDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID);

if (document.isEmpty()) {
return Optional.empty();
}

ObjectNode sourceNode = (ObjectNode) document.get().get("_source");
return Optional.of(OpenSearchCmsEntry.Metadata.fromJson(sourceNode));
return document.map(doc -> (ObjectNode) doc.get("_source"))
.map(OpenSearchCmsEntry.Metadata::fromJson);
}

@Override
public Optional<CmsEntry.Metadata> updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts) {
OpenSearchCmsEntry.Metadata metadata = new OpenSearchCmsEntry.Metadata(status, leaseExpiry, numAttempts);
Optional<ObjectNode> updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, metadata.toJson());

if (updatedEntry.isEmpty()) {
return Optional.empty();
}
return Optional.of(OpenSearchCmsEntry.Metadata.fromJson(updatedEntry.get()));
}

return updatedEntry.map(OpenSearchCmsEntry.Metadata::fromJson);
}
}
28 changes: 21 additions & 7 deletions RFS/src/main/java/com/rfs/common/OpenSearchClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ private Optional<ObjectNode> createObjectIdempotent(String objectPath, ObjectNod
}

/*
* Register a snapshot repository. Returns an Optional; if the repository was registered, it will be the settings
* and empty otherwise.
* Attempts to register a snapshot repository; no-op if the repo already exists. Returns an Optional; if the repo
* was created, it will be the settings used and empty if it already existed.
*/
public Optional<ObjectNode> registerSnapshotRepo(String repoName, ObjectNode settings){
String targetPath = "_snapshot/" + repoName;
client.putAsync(targetPath, settings.toString())
RestClient.Response response = client.putAsync(targetPath, settings.toString())
.flatMap(resp -> {
if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_CREATED) {
return Mono.just(resp);
Expand All @@ -109,15 +109,21 @@ public Optional<ObjectNode> registerSnapshotRepo(String repoName, ObjectNode set
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10)))
.block();

return Optional.of(settings);
if (response.code == HttpURLConnection.HTTP_CREATED) {
return Optional.of(settings);
} else {
logger.info("Snapshot repo already exists. Registration is a no-op.");
return Optional.empty();
}
}

/*
* Create a snapshot. Returns an Optional; if the snapshot was created, it will be the settings and empty otherwise.
* Attempts to create a snapshot; no-op if the snapshot already exists. Returns an Optional; if the snapshot
* was created, it will be the settings used and empty if it already existed.
*/
public Optional<ObjectNode> createSnapshot(String repoName, String snapshotName, ObjectNode settings){
String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
client.putAsync(targetPath, settings.toString())
RestClient.Response response = client.putAsync(targetPath, settings.toString())
.flatMap(resp -> {
if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_CREATED) {
return Mono.just(resp);
Expand All @@ -131,7 +137,13 @@ public Optional<ObjectNode> createSnapshot(String repoName, String snapshotName,
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10)))
.block();

return Optional.of(settings);

if (response.code == HttpURLConnection.HTTP_CREATED) {
return Optional.of(settings);
} else {
logger.info("Snapshot already exists. Creation is a no-op.");
return Optional.empty();
}
}

/*
Expand Down Expand Up @@ -191,6 +203,7 @@ public Optional<ObjectNode> createDocument(String indexName, String documentId,
return Optional.of(body);
} else {
// The only response code that can end up here is HTTP_CONFLICT, as everything is an error above
// This indicates that the document already exists
return Optional.empty();
}
}
Expand Down Expand Up @@ -271,6 +284,7 @@ public Optional<ObjectNode> updateDocument(String indexName, String documentId,
return Optional.of(body);
} else {
// The only response code that can end up here is HTTP_CONFLICT, as everything is an error above
// This indicates that we didn't acquire the optimistic lock
return Optional.empty();
}
}
Expand Down
3 changes: 0 additions & 3 deletions RFS/src/main/java/com/rfs/worker/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@

import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Optional;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.rfs.cms.CmsEntry;
import com.rfs.common.RfsException;

Expand Down

0 comments on commit 8ab93f8

Please sign in to comment.