Skip to content

Commit

Permalink
Stashing changes on branch for Mikayla
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed Jan 24, 2025
1 parent c6a98d4 commit f492422
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,28 @@ private void checkDocsWithRouting(
Assertions.assertEquals("1", routing);
}
}


private static Stream<Arguments> scenarios_experimental() {
var scenarios = Stream.<Arguments>builder();

for (var sourceCluster : SupportedClusters.sources()) {
scenarios.add(Arguments.of(sourceCluster, SearchClusterContainer.ES_V6_8_23));
}

return scenarios.build();
}

// @ParameterizedTest(name = "Source {0} to Target {1}")
// @MethodSource(value = "scenarios_experimental")
// public void migrationDocumentsExperimental(
// final SearchClusterContainer.ContainerVersion sourceVersion,
// final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
// try (
// final var sourceCluster = new SearchClusterContainer(sourceVersion);
// final var targetCluster = new SearchClusterContainer(targetVersion)
// ) {
// migrationDocumentsWithClusters(sourceCluster, targetCluster);
// }
// }

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public void multiTypeTransformationTest_union() {
indexCreatedOperations.createDocument(originalIndexName, "2", "{\"field1\":\"string\", \"field2\":123}", null, "type2");
indexCreatedOperations.createDocument(originalIndexName, "3", "{\"field3\":1.1}", null, "type3");

indexCreatedOperations.createSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, es5Repo);
indexCreatedOperations.takeSnapshot(es5Repo, snapshotName, originalIndexName);
indexCreatedOperations.registerSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, es5Repo);
indexCreatedOperations.createSnapshot(es5Repo, snapshotName, originalIndexName);
indexCreatedCluster.copySnapshotData(localDirectory.toString());
}

Expand All @@ -66,7 +66,7 @@ public void multiTypeTransformationTest_union() {
var upgradedSourceOperations = new ClusterOperations(upgradedSourceCluster.getUrl());

// Register snapshot repository and restore snapshot in ES 6 cluster
upgradedSourceOperations.createSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, es5Repo);
upgradedSourceOperations.registerSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, es5Repo);
upgradedSourceOperations.restoreSnapshot(es5Repo, snapshotName);

// Verify index exists on upgraded cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,10 @@ public void createSnapshot(
String repoName,
String snapshotName,
ObjectNode settings,
boolean waitForCompletion,
IRfsContexts.ICreateSnapshotContext context
) {
String targetPath = SNAPSHOT_PREFIX_STR + repoName + "/" + snapshotName;
String targetPath = SNAPSHOT_PREFIX_STR + repoName + "/" + snapshotName + "?wait_for_completion=" + String.valueOf(waitForCompletion).toLowerCase();
client.putAsync(targetPath, settings.toString(), context.createSnapshotContext()).flatMap(resp -> {
if (resp.statusCode == HttpURLConnection.HTTP_OK) {
return Mono.just(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void createSnapshot() {

// Create the snapshot; idempotent operation
try {
client.createSnapshot(getRepoName(), snapshotName, body, context);
client.createSnapshot(getRepoName(), snapshotName, body, false, context);
log.atInfo().setMessage("Snapshot {} creation initiated").addArgument(snapshotName).log();
} catch (Exception e) {
log.atError().setCause(e).setMessage("Snapshot {} creation failed").addArgument(snapshotName).log();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.opensearch.migrations.bulkload.version_es_6_8;

import java.time.Clock;
import java.util.function.Consumer;

import org.opensearch.migrations.bulkload.workcoordination.AbstractedHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;

import com.fasterxml.jackson.databind.JsonNode;

public class OpenSearchWorkCoordinator_ES_6_8 extends OpenSearchWorkCoordinator {
public OpenSearchWorkCoordinator_ES_6_8(
AbstractedHttpClient httpClient,
long tolerableClientServerClockDifferenceSeconds,
String workerId
) {
super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
}

public OpenSearchWorkCoordinator_ES_6_8(
AbstractedHttpClient httpClient,
long tolerableClientServerClockDifferenceSeconds,
String workerId,
Clock clock
) {
super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock);
}


public OpenSearchWorkCoordinator_ES_6_8(
AbstractedHttpClient httpClient,
long tolerableClientServerClockDifferenceSeconds,
String workerId,
Clock clock,
Consumer<WorkItemAndDuration> workItemConsumer
) {
super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer);
}

protected String getCoordinationIndexSettingsBody(){
return "{\n"
+ " \"settings\": {\n"
+ " \"index\": {"
+ " \"number_of_shards\": 1,\n"
+ " \"number_of_replicas\": 1\n"
+ " }\n"
+ " },\n"
+ " \"mappings\": {\n"
+ " \"doc\": {\n"
+ " \"properties\": {\n"
+ " \"" + EXPIRATION_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"" + COMPLETED_AT_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"leaseHolderId\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"status\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}\n";
}

protected String getPathForUpdates(String workItemId) {
return INDEX_NAME + "/doc/" + workItemId + "/_update";
}

protected String getPathForGets(String workItemId) {
return INDEX_NAME + "/doc/" + workItemId;
}

protected String getPathForSearches() {
return INDEX_NAME + "/doc/_search";
}

protected int getTotalHitsFromSearchResponse(JsonNode searchResponse) {
return searchResponse.path("hits").path("total").intValue();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import org.opensearch.migrations.Version;
import org.opensearch.migrations.VersionMatchers;
import org.opensearch.migrations.bulkload.version_es_6_8.OpenSearchWorkCoordinator_ES_6_8;
import org.opensearch.migrations.bulkload.version_os_2_11.OpenSearchWorkCoordinator_OS_2_11;
import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator.WorkItemAndDuration;

Expand All @@ -23,6 +24,8 @@ public OpenSearchWorkCoordinator get(
) {
if (VersionMatchers.isOS_1_X.test(version) || VersionMatchers.isOS_2_X.test(version)) {
return new OpenSearchWorkCoordinator_OS_2_11(httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
} else if (VersionMatchers.isES_6_X.test(version)) {
return new OpenSearchWorkCoordinator_ES_6_8(httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
} else {
throw new IllegalArgumentException("Unsupported version: " + version);
}
Expand All @@ -36,6 +39,8 @@ public OpenSearchWorkCoordinator get(
) {
if (VersionMatchers.isOS_1_X.test(version) || VersionMatchers.isOS_2_X.test(version)) {
return new OpenSearchWorkCoordinator_OS_2_11(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock);
} else if (VersionMatchers.isES_6_X.test(version)) {
return new OpenSearchWorkCoordinator_ES_6_8(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock);
} else {
throw new IllegalArgumentException("Unsupported version: " + version);
}
Expand All @@ -50,6 +55,8 @@ public OpenSearchWorkCoordinator get(
) {
if (VersionMatchers.isOS_1_X.test(version) || VersionMatchers.isOS_2_X.test(version)) {
return new OpenSearchWorkCoordinator_OS_2_11(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer);
} else if (VersionMatchers.isES_6_X.test(version)) {
return new OpenSearchWorkCoordinator_ES_6_8(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer);
} else {
throw new IllegalArgumentException("Unsupported version: " + version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void setUp() throws Exception {

// Configure operations and rfs implementation
operations = new ClusterOperations(cluster.getUrl());
operations.createSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, "test-repo");
operations.registerSnapshotRepository(SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, "test-repo");
srfs = new SimpleRestoreFromSnapshot_ES_7_10();
}

Expand All @@ -73,7 +73,7 @@ public void SingleSnapshot_SingleDocument() throws Exception {

final var snapshotName = "snapshot-1";
final var repoName = "test-repo";
operations.takeSnapshot(repoName, snapshotName, indexName);
operations.createSnapshot(repoName, snapshotName, indexName);

final File snapshotCopy = new File(localDirectory + "/snapshotCopy");
cluster.copySnapshotData(snapshotCopy.getAbsolutePath());
Expand Down Expand Up @@ -112,7 +112,7 @@ public void SingleSnapshot_SingleDocument_Then_DeletedDocument() throws Exceptio
operations.deleteDocument(indexName, document1Id);
final var snapshotName = "snapshot-delete-item";
var repoName = "test-repo";
operations.takeSnapshot(repoName, snapshotName, indexName);
operations.createSnapshot(repoName, snapshotName, indexName);

final File snapshotCopy = new File(localDirectory + "/snapshotCopy");
cluster.copySnapshotData(snapshotCopy.getAbsolutePath());
Expand Down Expand Up @@ -148,7 +148,7 @@ public void SingleSnapshot_SingleDocument_Then_UpdateDocument() throws Exception

final var snapshotName = "snapshot-delete-item";
final var repoName = "test-repo";
operations.takeSnapshot(repoName, snapshotName, indexName);
operations.createSnapshot(repoName, snapshotName, indexName);

final File snapshotCopy = new File(localDirectory + "/snapshotCopy");
cluster.copySnapshotData(snapshotCopy.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
class OpenSearchWorkCoodinatorTest {

public static final String THROTTLE_RESULT_VALUE = "slow your roll, dude";
public static final List<Version> testedVersions = SupportedClusters.targets().stream().map(ContainerVersion::getVersion).collect(Collectors.toList());
public static final List<Version> testedVersions = Stream.concat(
SupportedClusters.targets().stream().map(ContainerVersion::getVersion), // All the officially supported versions
Stream.of(Version.fromString("ES 6.8")) // The "experimental" versions
).collect(Collectors.toUnmodifiableList());

static Stream<Arguments> provideTestedVersions() {
return testedVersions.stream().map(Arguments::of);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
import java.util.Map;
import java.util.Optional;

import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.tracing.IRfsContexts;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.SneakyThrows;
import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
Expand All @@ -15,6 +22,10 @@
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;

import static org.mockito.Mockito.mock;

import org.junit.jupiter.api.Assertions;

import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -24,17 +35,27 @@
*/
public class ClusterOperations {

private final ObjectMapper objectMapper = new ObjectMapper();

private final String clusterUrl;
private final CloseableHttpClient httpClient;
private final OpenSearchClient client;


public ClusterOperations(final String clusterUrl) {
this.clusterUrl = clusterUrl;
httpClient = HttpClients.createDefault();


// Create an instance of TargetArgs with no auth
ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs();
targetArgs.host = clusterUrl;
ConnectionContext connectionContext = targetArgs.toConnectionContext();
client = new OpenSearchClientFactory(connectionContext).determineVersionAndCreate();
}

public void createSnapshotRepository(final String repoPath, final String repoName) throws IOException {
// Create snapshot repository
final var repositoryJson = "{\n"
public void registerSnapshotRepository(final String repoPath, final String repoName) throws IOException {
final var settingsJson = "{\n"
+ " \"type\": \"fs\",\n"
+ " \"settings\": {\n"
+ " \"location\": \""
Expand All @@ -44,12 +65,12 @@ public void createSnapshotRepository(final String repoPath, final String repoNam
+ " }\n"
+ "}";

final var createRepoRequest = new HttpPut(clusterUrl + "/_snapshot/" + repoName);
createRepoRequest.setEntity(new StringEntity(repositoryJson));
createRepoRequest.setHeader("Content-Type", "application/json");
ObjectNode settings = (ObjectNode) objectMapper.readTree(settingsJson);

try (var response = httpClient.execute(createRepoRequest)) {
assertThat(response.getCode(), equalTo(200));
try {
client.registerSnapshotRepo(repoName, settings, mock(IRfsContexts.ICreateSnapshotContext.class));
} catch (Exception e) {
Assertions.fail("Snapshot Registration failed - " + e.getClass().getName() + ": " + e.getMessage());
}
}

Expand Down Expand Up @@ -147,23 +168,20 @@ public Map.Entry<Integer, String> get(final String path) {
}
}

public void takeSnapshot(final String repoName, final String snapshotName, final String indexPattern) throws IOException {
final var snapshotJson = "{\n"
public void createSnapshot(final String repoName, final String snapshotName, final String indexPattern) throws IOException {
final var settingsJson = "{\n"
+ " \"indices\": \""
+ indexPattern
+ "\",\n"
+ " \"ignore_unavailable\": true,\n"
+ " \"include_global_state\": true\n"
+ "}";
ObjectNode settings = (ObjectNode) objectMapper.readTree(settingsJson);

final var createSnapshotRequest = new HttpPut(
clusterUrl + "/_snapshot/" + repoName + "/" + snapshotName + "?wait_for_completion=true"
);
createSnapshotRequest.setEntity(new StringEntity(snapshotJson));
createSnapshotRequest.setHeader("Content-Type", "application/json");

try (var response = httpClient.execute(createSnapshotRequest)) {
assertThat(response.getCode(), equalTo(200));
try {
client.createSnapshot(repoName, snapshotName, settings, true, mock(IRfsContexts.ICreateSnapshotContext.class));
} catch (Exception e) {
Assertions.fail("Snapshot Registration failed - " + e.getClass().getName() + ": " + e.getMessage());
}
}

Expand Down

0 comments on commit f492422

Please sign in to comment.