diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java index bb2a3be42..dc03639c3 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java @@ -180,5 +180,28 @@ private void checkDocsWithRouting( Assertions.assertEquals("1", routing); } } - + + private static Stream scenarios_experimental() { + var scenarios = Stream.builder(); + + for (var sourceCluster : SupportedClusters.sources()) { + scenarios.add(Arguments.of(sourceCluster, SearchClusterContainer.ES_V6_8_23)); + } + + return scenarios.build(); + } + + // @ParameterizedTest(name = "Source {0} to Target {1}") + // @MethodSource(value = "scenarios_experimental") + // public void migrationDocumentsExperimental( + // final SearchClusterContainer.ContainerVersion sourceVersion, + // final SearchClusterContainer.ContainerVersion targetVersion) throws Exception { + // try ( + // final var sourceCluster = new SearchClusterContainer(sourceVersion); + // final var targetCluster = new SearchClusterContainer(targetVersion) + // ) { + // migrationDocumentsWithClusters(sourceCluster, targetCluster); + // } + // } + } diff --git a/MetadataMigration/src/test/java/org/opensearch/migrations/MultiTypeMappingTransformationTest.java b/MetadataMigration/src/test/java/org/opensearch/migrations/MultiTypeMappingTransformationTest.java index a1a50cdbe..2e2afdc1a 100644 --- a/MetadataMigration/src/test/java/org/opensearch/migrations/MultiTypeMappingTransformationTest.java +++ b/MetadataMigration/src/test/java/org/opensearch/migrations/MultiTypeMappingTransformationTest.java @@ -47,8 +47,8 @@ public void multiTypeTransformationTest_union() { indexCreatedOperations.createDocument(originalIndexName, "2", "{\"field1\":\"string\", \"field2\":123}", null, "type2"); indexCreatedOperations.createDocument(originalIndexName, "3", "{\"field3\":1.1}", null, "type3"); - indexCreatedOperations.createSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, es5Repo); - indexCreatedOperations.takeSnapshot(es5Repo, snapshotName, originalIndexName); + indexCreatedOperations.registerSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, es5Repo); + indexCreatedOperations.createSnapshot(es5Repo, snapshotName, originalIndexName); indexCreatedCluster.copySnapshotData(localDirectory.toString()); } @@ -66,7 +66,7 @@ public void multiTypeTransformationTest_union() { var upgradedSourceOperations = new ClusterOperations(upgradedSourceCluster.getUrl()); // Register snapshot repository and restore snapshot in ES 6 cluster - upgradedSourceOperations.createSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, es5Repo); + upgradedSourceOperations.registerSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, es5Repo); upgradedSourceOperations.restoreSnapshot(es5Repo, snapshotName); // Verify index exists on upgraded cluster diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java index aee2eecbc..68226cdaa 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java @@ -247,9 +247,10 @@ public void createSnapshot( String repoName, String snapshotName, ObjectNode settings, + boolean waitForCompletion, IRfsContexts.ICreateSnapshotContext context ) { - String targetPath = SNAPSHOT_PREFIX_STR + repoName + "/" + snapshotName; + String targetPath = SNAPSHOT_PREFIX_STR + repoName + "/" + snapshotName + "?wait_for_completion=" + String.valueOf(waitForCompletion).toLowerCase(); client.putAsync(targetPath, settings.toString(), context.createSnapshotContext()).flatMap(resp -> { if (resp.statusCode == HttpURLConnection.HTTP_OK) { return Mono.just(resp); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java index 79b549827..40df30001 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java @@ -69,7 +69,7 @@ public void createSnapshot() { // Create the snapshot; idempotent operation try { - client.createSnapshot(getRepoName(), snapshotName, body, context); + client.createSnapshot(getRepoName(), snapshotName, body, false, context); log.atInfo().setMessage("Snapshot {} creation initiated").addArgument(snapshotName).log(); } catch (Exception e) { log.atError().setCause(e).setMessage("Snapshot {} creation failed").addArgument(snapshotName).log(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchWorkCoordinator_ES_6_8.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchWorkCoordinator_ES_6_8.java new file mode 100644 index 000000000..faa4bcab0 --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchWorkCoordinator_ES_6_8.java @@ -0,0 +1,92 @@ +package org.opensearch.migrations.bulkload.version_es_6_8; + +import java.time.Clock; +import java.util.function.Consumer; + +import org.opensearch.migrations.bulkload.workcoordination.AbstractedHttpClient; +import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator; + +import com.fasterxml.jackson.databind.JsonNode; + +public class OpenSearchWorkCoordinator_ES_6_8 extends OpenSearchWorkCoordinator { + public OpenSearchWorkCoordinator_ES_6_8( + AbstractedHttpClient httpClient, + long tolerableClientServerClockDifferenceSeconds, + String workerId + ) { + super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId); + } + + public OpenSearchWorkCoordinator_ES_6_8( + AbstractedHttpClient httpClient, + long tolerableClientServerClockDifferenceSeconds, + String workerId, + Clock clock + ) { + super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock); + } + + + public OpenSearchWorkCoordinator_ES_6_8( + AbstractedHttpClient httpClient, + long tolerableClientServerClockDifferenceSeconds, + String workerId, + Clock clock, + Consumer workItemConsumer + ) { + super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer); + } + + protected String getCoordinationIndexSettingsBody(){ + return "{\n" + + " \"settings\": {\n" + + " \"index\": {" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 1\n" + + " }\n" + + " },\n" + + " \"mappings\": {\n" + + " \"doc\": {\n" + + " \"properties\": {\n" + + " \"" + EXPIRATION_FIELD_NAME + "\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"" + COMPLETED_AT_FIELD_NAME + "\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"leaseHolderId\": {\n" + + " \"type\": \"keyword\",\n" + + " \"norms\": false\n" + + " },\n" + + " \"status\": {\n" + + " \"type\": \"keyword\",\n" + + " \"norms\": false\n" + + " },\n" + + " \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": {\n" + + " \"type\": \"keyword\",\n" + + " \"norms\": false\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n"; + } + + protected String getPathForUpdates(String workItemId) { + return INDEX_NAME + "/doc/" + workItemId + "/_update"; + } + + protected String getPathForGets(String workItemId) { + return INDEX_NAME + "/doc/" + workItemId; + } + + protected String getPathForSearches() { + return INDEX_NAME + "/doc/_search"; + } + + protected int getTotalHitsFromSearchResponse(JsonNode searchResponse) { + return searchResponse.path("hits").path("total").intValue(); + } + + +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorFactory.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorFactory.java index 178ee4bea..05f8ee817 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorFactory.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorFactory.java @@ -5,6 +5,7 @@ import org.opensearch.migrations.Version; import org.opensearch.migrations.VersionMatchers; +import org.opensearch.migrations.bulkload.version_es_6_8.OpenSearchWorkCoordinator_ES_6_8; import org.opensearch.migrations.bulkload.version_os_2_11.OpenSearchWorkCoordinator_OS_2_11; import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator.WorkItemAndDuration; @@ -23,6 +24,8 @@ public OpenSearchWorkCoordinator get( ) { if (VersionMatchers.isOS_1_X.test(version) || VersionMatchers.isOS_2_X.test(version)) { return new OpenSearchWorkCoordinator_OS_2_11(httpClient, tolerableClientServerClockDifferenceSeconds, workerId); + } else if (VersionMatchers.isES_6_X.test(version)) { + return new OpenSearchWorkCoordinator_ES_6_8(httpClient, tolerableClientServerClockDifferenceSeconds, workerId); } else { throw new IllegalArgumentException("Unsupported version: " + version); } @@ -36,6 +39,8 @@ public OpenSearchWorkCoordinator get( ) { if (VersionMatchers.isOS_1_X.test(version) || VersionMatchers.isOS_2_X.test(version)) { return new OpenSearchWorkCoordinator_OS_2_11(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock); + } else if (VersionMatchers.isES_6_X.test(version)) { + return new OpenSearchWorkCoordinator_ES_6_8(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock); } else { throw new IllegalArgumentException("Unsupported version: " + version); } @@ -50,6 +55,8 @@ public OpenSearchWorkCoordinator get( ) { if (VersionMatchers.isOS_1_X.test(version) || VersionMatchers.isOS_2_X.test(version)) { return new OpenSearchWorkCoordinator_OS_2_11(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer); + } else if (VersionMatchers.isES_6_X.test(version)) { + return new OpenSearchWorkCoordinator_ES_6_8(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer); } else { throw new IllegalArgumentException("Unsupported version: " + version); } diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/integration/SnapshotStateTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/integration/SnapshotStateTest.java index d9721a880..ccc3287c4 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/integration/SnapshotStateTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/integration/SnapshotStateTest.java @@ -53,7 +53,7 @@ public void setUp() throws Exception { // Configure operations and rfs implementation operations = new ClusterOperations(cluster.getUrl()); - operations.createSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, "test-repo"); + operations.registerSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, "test-repo"); srfs = new SimpleRestoreFromSnapshot_ES_7_10(); } @@ -73,7 +73,7 @@ public void SingleSnapshot_SingleDocument() throws Exception { final var snapshotName = "snapshot-1"; final var repoName = "test-repo"; - operations.takeSnapshot(repoName, snapshotName, indexName); + operations.createSnapshot(repoName, snapshotName, indexName); final File snapshotCopy = new File(localDirectory + "/snapshotCopy"); cluster.copySnapshotData(snapshotCopy.getAbsolutePath()); @@ -112,7 +112,7 @@ public void SingleSnapshot_SingleDocument_Then_DeletedDocument() throws Exceptio operations.deleteDocument(indexName, document1Id); final var snapshotName = "snapshot-delete-item"; var repoName = "test-repo"; - operations.takeSnapshot(repoName, snapshotName, indexName); + operations.createSnapshot(repoName, snapshotName, indexName); final File snapshotCopy = new File(localDirectory + "/snapshotCopy"); cluster.copySnapshotData(snapshotCopy.getAbsolutePath()); @@ -148,7 +148,7 @@ public void SingleSnapshot_SingleDocument_Then_UpdateDocument() throws Exception final var snapshotName = "snapshot-delete-item"; final var repoName = "test-repo"; - operations.takeSnapshot(repoName, snapshotName, indexName); + operations.createSnapshot(repoName, snapshotName, indexName); final File snapshotCopy = new File(localDirectory + "/snapshotCopy"); cluster.copySnapshotData(snapshotCopy.getAbsolutePath()); diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoodinatorTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoodinatorTest.java index b3133354d..18d1d4302 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoodinatorTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoodinatorTest.java @@ -29,7 +29,10 @@ class OpenSearchWorkCoodinatorTest { public static final String THROTTLE_RESULT_VALUE = "slow your roll, dude"; - public static final List testedVersions = SupportedClusters.targets().stream().map(ContainerVersion::getVersion).collect(Collectors.toList()); + public static final List testedVersions = Stream.concat( + SupportedClusters.targets().stream().map(ContainerVersion::getVersion), // All the officially supported versions + Stream.of(Version.fromString("ES 6.8")) // The "experimental" versions + ).collect(Collectors.toUnmodifiableList()); static Stream provideTestedVersions() { return testedVersions.stream().map(Arguments::of); diff --git a/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java b/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java index ca87c7ab7..782c26414 100644 --- a/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java +++ b/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java @@ -5,6 +5,13 @@ import java.util.Map; import java.util.Optional; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; +import org.opensearch.migrations.bulkload.common.http.ConnectionContext; +import org.opensearch.migrations.bulkload.tracing.IRfsContexts; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.SneakyThrows; import org.apache.hc.client5.http.classic.methods.HttpDelete; import org.apache.hc.client5.http.classic.methods.HttpGet; @@ -15,6 +22,10 @@ import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.io.entity.StringEntity; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Assertions; + import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -24,17 +35,27 @@ */ public class ClusterOperations { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final String clusterUrl; private final CloseableHttpClient httpClient; + private final OpenSearchClient client; + public ClusterOperations(final String clusterUrl) { this.clusterUrl = clusterUrl; httpClient = HttpClients.createDefault(); + + + // Create an instance of TargetArgs with no auth + ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs(); + targetArgs.host = clusterUrl; + ConnectionContext connectionContext = targetArgs.toConnectionContext(); + client = new OpenSearchClientFactory(connectionContext).determineVersionAndCreate(); } - public void createSnapshotRepository(final String repoPath, final String repoName) throws IOException { - // Create snapshot repository - final var repositoryJson = "{\n" + public void registerSnapshotRepository(final String repoPath, final String repoName) throws IOException { + final var settingsJson = "{\n" + " \"type\": \"fs\",\n" + " \"settings\": {\n" + " \"location\": \"" @@ -44,12 +65,12 @@ public void createSnapshotRepository(final String repoPath, final String repoNam + " }\n" + "}"; - final var createRepoRequest = new HttpPut(clusterUrl + "/_snapshot/" + repoName); - createRepoRequest.setEntity(new StringEntity(repositoryJson)); - createRepoRequest.setHeader("Content-Type", "application/json"); + ObjectNode settings = (ObjectNode) objectMapper.readTree(settingsJson); - try (var response = httpClient.execute(createRepoRequest)) { - assertThat(response.getCode(), equalTo(200)); + try { + client.registerSnapshotRepo(repoName, settings, mock(IRfsContexts.ICreateSnapshotContext.class)); + } catch (Exception e) { + Assertions.fail("Snapshot Registration failed - " + e.getClass().getName() + ": " + e.getMessage()); } } @@ -147,23 +168,20 @@ public Map.Entry get(final String path) { } } - public void takeSnapshot(final String repoName, final String snapshotName, final String indexPattern) throws IOException { - final var snapshotJson = "{\n" + public void createSnapshot(final String repoName, final String snapshotName, final String indexPattern) throws IOException { + final var settingsJson = "{\n" + " \"indices\": \"" + indexPattern + "\",\n" + " \"ignore_unavailable\": true,\n" + " \"include_global_state\": true\n" + "}"; + ObjectNode settings = (ObjectNode) objectMapper.readTree(settingsJson); - final var createSnapshotRequest = new HttpPut( - clusterUrl + "/_snapshot/" + repoName + "/" + snapshotName + "?wait_for_completion=true" - ); - createSnapshotRequest.setEntity(new StringEntity(snapshotJson)); - createSnapshotRequest.setHeader("Content-Type", "application/json"); - - try (var response = httpClient.execute(createSnapshotRequest)) { - assertThat(response.getCode(), equalTo(200)); + try { + client.createSnapshot(repoName, snapshotName, settings, true, mock(IRfsContexts.ICreateSnapshotContext.class)); + } catch (Exception e) { + Assertions.fail("Snapshot Registration failed - " + e.getClass().getName() + ": " + e.getMessage()); } }