Skip to content

Commit

Permalink
[RFS] Updated RFS to perform coordinated index migration (opensearch-…
Browse files Browse the repository at this point in the history
…project#693)

* Coordinated index creation working; need to add whitelist and tests

Signed-off-by: Chris Helma <chelma+github@amazon.com>

* Fixed unit tests; addred refresh before RFS searches

Signed-off-by: Chris Helma <chelma+github@amazon.com>

* Unit tested coordinated RFS index creation

Signed-off-by: Chris Helma <chelma+github@amazon.com>

* Updated per PR discussion

Signed-off-by: Chris Helma <chelma+github@amazon.com>

---------

Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma authored Jun 6, 2024
1 parent 4c93fc8 commit 22d3f74
Show file tree
Hide file tree
Showing 25 changed files with 1,847 additions and 158 deletions.
4 changes: 2 additions & 2 deletions RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ public static void main(String[] args) {
Map<String, IndexMetadata.Data> indexMetadatas = new HashMap<>();
if (sourceVersion == ClusterVersion.ES_6_8) {
for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) {
IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, index.getName());
IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, index.getName());
indexMetadatas.put(index.getName(), indexMetadata);
}
} else {
for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) {
IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, index.getName());
IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, index.getName());
indexMetadatas.put(index.getName(), indexMetadata);
}
}
Expand Down
8 changes: 4 additions & 4 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ public static void main(String[] args) throws InterruptedException {
logger.info("Reading Index Metadata for index: " + index.getName());
IndexMetadata.Data indexMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
indexMetadata = new IndexMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, index.getName());
indexMetadata = new IndexMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, index.getName());
} else {
indexMetadata = new IndexMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, index.getName());
indexMetadata = new IndexMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, index.getName());
}
indexMetadatas.add(indexMetadata);
}
Expand All @@ -305,14 +305,14 @@ public static void main(String[] args) throws InterruptedException {
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to recreate the indices...");
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
for (IndexMetadata.Data indexMetadata : indexMetadatas) {
String reindexName = indexMetadata.getName() + indexSuffix;
logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target...");

ObjectNode root = indexMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformIndexMetadata(root);
IndexMetadataData_OS_2_11 indexMetadataOS211 = new IndexMetadataData_OS_2_11(transformedRoot, indexMetadata.getId(), reindexName);
IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetClient);
indexCreator.create(transformedRoot, reindexName, indexMetadata.getId());
}
}

Expand Down
15 changes: 14 additions & 1 deletion RFS/src/main/java/com/rfs/RunRfsWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
import com.rfs.common.GlobalMetadata;
import com.rfs.common.IndexMetadata;
import com.rfs.common.Logging;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Uri;
Expand All @@ -33,9 +34,12 @@
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
import com.rfs.worker.GlobalState;
import com.rfs.worker.IndexRunner;
import com.rfs.worker.MetadataRunner;
import com.rfs.worker.Runner;
import com.rfs.worker.SnapshotRunner;
Expand Down Expand Up @@ -75,7 +79,11 @@ public static class Args {
@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--index-template-whitelist"}, description = ("Optional. List of template names to migrate"
@Parameter(names = {"--index-whitelist"}, description = ("Optional. List of index names to migrate"
+ " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false)
public List<String> indexWhitelist = List.of();

@Parameter(names = {"--index-template-whitelist"}, description = ("Optional. List of index template names to migrate"
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false)
public List<String> indexTemplateWhitelist = List.of();

Expand Down Expand Up @@ -139,6 +147,11 @@ public static void main(String[] args) throws Exception {
Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality);
MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);
metadataWorker.run();

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer);
indexWorker.run();

} catch (Runner.PhaseFailed e) {
logPhaseFailureRecord(e.phase, e.nextStep, e.cmsEntry, e.e);
Expand Down
46 changes: 44 additions & 2 deletions RFS/src/main/java/com/rfs/cms/CmsClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.rfs.cms;

import java.util.List;
import java.util.Optional;

/*
Expand All @@ -23,7 +24,7 @@ public interface CmsClient {
* Updates the Snapshot entry in the CMS. Returns an Optional; if the document was updated, it will be
* the updated entry and empty otherwise.
*/
public Optional<CmsEntry.Snapshot> updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status);
public Optional<CmsEntry.Snapshot> updateSnapshotEntry(CmsEntry.Snapshot newEntry, CmsEntry.Snapshot lastEntry);

/*
* Creates a new entry in the CMS for the Metadata Migration's progress. Returns an Optional; if the document was
Expand All @@ -41,5 +42,46 @@ public interface CmsClient {
* Updates the Metadata Migration entry in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public Optional<CmsEntry.Metadata> updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts);
public Optional<CmsEntry.Metadata> updateMetadataEntry(CmsEntry.Metadata newEntry, CmsEntry.Metadata lastEntry);

/*
* Creates a new entry in the CMS for the Index Migration's progress. Returns an Optional; if the document was
* created, it will be the created entry and empty otherwise.
*/
public Optional<CmsEntry.Index> createIndexEntry();

/*
* Attempt to retrieve the Index Migration entry from the CMS, if it exists. Returns an Optional; if the document
* exists, it will be the retrieved entry and empty otherwise.
*/
public Optional<CmsEntry.Index> getIndexEntry();

/*
* Updates the Index Migration entry in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public Optional<CmsEntry.Index> updateIndexEntry(CmsEntry.Index newEntry, CmsEntry.Index lastEntry);

/*
* Creates a new entry in the CMS for an Index Work Item. Returns an Optional; if the document was
* created, it will be the created entry and empty otherwise.
*/
public Optional<CmsEntry.IndexWorkItem> createIndexWorkItem(String name, int numShards);

/*
* Updates the Index Work Item in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public Optional<CmsEntry.IndexWorkItem> updateIndexWorkItem(CmsEntry.IndexWorkItem newEntry, CmsEntry.IndexWorkItem lastEntry);

/*
* Forcefully updates the Index Work Item in the CMS. This method should be used when you don't care about collisions
* and just want to overwrite the existing entry no matter what. Returns the updated entry.
*/
public CmsEntry.IndexWorkItem updateIndexWorkItemForceful(CmsEntry.IndexWorkItem newEntry);

/*
* Retrieves a set of Index Work Items from the CMS that appear ready to be worked on, up to the specified limit.
*/
public List<CmsEntry.IndexWorkItem> getAvailableIndexWorkItems(int maxItems);
}
153 changes: 135 additions & 18 deletions RFS/src/main/java/com/rfs/cms/CmsEntry.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,64 @@
package com.rfs.cms;


import com.rfs.common.RfsException;

public class CmsEntry {
public static enum EntryType {
SNAPSHOT,
METADATA,
INDEX,
INDEX_WORK_ITEM,
}

public abstract static class Base {
protected Base() {}

@Override
public abstract String toString();

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || !(obj instanceof Base)) {
return false;
}
Base other = (Base) obj;
return this.toString().equals(other.toString());
}

@Override
public int hashCode() {
return this.toString().hashCode();
}
}

/*
* Provides a base class for leasable entry types. Doesn't allow for customization of lease mechanics, but it's
* unclear how to achieve that in Java given the constraints around static methods.
*/
public abstract static class Leasable extends Base {
public static final int LEASE_MS = 1 * 60 * 1000; // 1 minute, arbitrarily chosen
public static final int MAX_ATTEMPTS = 3; // arbitrarily chosen

protected Leasable() {}

public static int getLeaseDurationMs(int numAttempts) {
if (numAttempts > MAX_ATTEMPTS) {
throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is greater than MAX_ATTEMPTS=" + MAX_ATTEMPTS);
} else if (numAttempts < 1) {
throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is less than 1");
}
return LEASE_MS * numAttempts; // Arbitratily chosen algorithm
}

// TODO: We should be ideally setting the lease expiry using the server's clock, but it's unclear on the best
// way to do this. For now, we'll just use the client's clock.
public static String getLeaseExpiry(long currentTime, int numAttempts) {
return Long.toString(currentTime + getLeaseDurationMs(numAttempts));
}
}

public static enum SnapshotStatus {
Expand All @@ -15,7 +68,11 @@ public static enum SnapshotStatus {
FAILED,
}

/*
* Used to track the progress of taking a snapshot of the source cluster
*/
public static class Snapshot extends Base {
public final EntryType type = EntryType.SNAPSHOT;
public final String name;
public final SnapshotStatus status;

Expand All @@ -28,8 +85,9 @@ public Snapshot(String name, SnapshotStatus status) {
@Override
public String toString() {
return "Snapshot("
+ "type='" + type.toString() + ","
+ "name='" + name + ","
+ "status=" + status +
+ "status=" + status.toString() +
")";
}
}
Expand All @@ -40,30 +98,50 @@ public static enum MetadataStatus {
FAILED,
}

public static class Metadata extends Base {
public static final int METADATA_LEASE_MS = 1 * 60 * 1000; // 1 minute, arbitrarily chosen
public static final int MAX_ATTEMPTS = 3; // arbitrarily chosen
/*
* Used to track the progress of migrating all the templates from the source cluster
*/
public static class Metadata extends Leasable {
public final EntryType type = EntryType.METADATA;
public final MetadataStatus status;
public final String leaseExpiry;
public final Integer numAttempts;

public static int getLeaseDurationMs(int numAttempts) {
if (numAttempts > MAX_ATTEMPTS) {
throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is greater than MAX_ATTEMPTS=" + MAX_ATTEMPTS);
} else if (numAttempts < 1) {
throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is less than 1");
}
return METADATA_LEASE_MS * numAttempts; // Arbitratily chosen algorithm
public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) {
super();
this.status = status;
this.leaseExpiry = leaseExpiry;
this.numAttempts = numAttempts;
}

// TODO: We should be ideally setting the lease expiry using the server's clock, but it's unclear on the best
// way to do this. For now, we'll just use the client's clock.
public static String getLeaseExpiry(long currentTime, int numAttempts) {
return Long.toString(currentTime + getLeaseDurationMs(numAttempts));
@Override
public String toString() {
return "Metadata("
+ "type='" + type.toString() + ","
+ "status=" + status.toString() + ","
+ "leaseExpiry=" + leaseExpiry + ","
+ "numAttempts=" + numAttempts.toString() +
")";
}
}

public final MetadataStatus status;
public static enum IndexStatus {
SETUP,
IN_PROGRESS,
COMPLETED,
FAILED,
}

/*
* Used to track the progress of migrating all the indices from the soruce cluster
*/
public static class Index extends Leasable {
public final EntryType type = EntryType.INDEX;
public final IndexStatus status;
public final String leaseExpiry;
public final Integer numAttempts;

public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) {
public Index(IndexStatus status, String leaseExpiry, int numAttempts) {
super();
this.status = status;
this.leaseExpiry = leaseExpiry;
Expand All @@ -72,14 +150,53 @@ public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) {

@Override
public String toString() {
return "Metadata("
return "Index("
+ "type='" + type.toString() + ","
+ "status=" + status.toString() + ","
+ "leaseExpiry=" + leaseExpiry + ","
+ "numAttempts=" + numAttempts.toString() +
")";
}
}

public static enum IndexWorkItemStatus {
NOT_STARTED,
COMPLETED,
FAILED,
}

/*
* Used to track the migration of a particular index from the source cluster
*/
public static class IndexWorkItem extends Base {
public final EntryType type = EntryType.INDEX_WORK_ITEM;
public static final int ATTEMPTS_SOFT_LIMIT = 3; // will make at least this many attempts; arbitrarily chosen

public final String name;
public final IndexWorkItemStatus status;
public final Integer numAttempts;
public final Integer numShards;

public IndexWorkItem(String name, IndexWorkItemStatus status, int numAttempts, int numShards) {
super();
this.name = name;
this.status = status;
this.numAttempts = numAttempts;
this.numShards = numShards;
}

@Override
public String toString() {
return "IndexWorkItem("
+ "type='" + type.toString() + ","
+ "name=" + name.toString() + ","
+ "status=" + status.toString() + ","
+ "numAttempts=" + numAttempts.toString() + ","
+ "numShards=" + numShards.toString() +
")";
}
}

public static class CouldNotFindNextLeaseDuration extends RfsException {
public CouldNotFindNextLeaseDuration(String message) {
super("Could not find next lease duration. Reason: " + message);
Expand Down
Loading

0 comments on commit 22d3f74

Please sign in to comment.