Skip to content

Commit

Permalink
Updated RDS Doc Migration entrypoint to use local snapshot
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed Jul 1, 2024
1 parent 3ec71a7 commit 7700c65
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 12 deletions.
16 changes: 16 additions & 0 deletions DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,27 @@ test {
}

task slowTest(type: Test) {
doFirst {
println "Running slow tests..."
}

useJUnitPlatform()
dependsOn(':TrafficCapture:dockerSolution:buildDockerImage_elasticsearchTestConsole')
jacoco {
enabled = true
}
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
showExceptions true
showCauses true
showStackTraces true
// showStandardStreams = true
}
reports {
html.required = true
html.destination file("${buildDir}/reports/tests/slowTest")
}
}

jacocoTestReport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;

import java.io.IOException;
import java.net.URI;
Expand All @@ -22,6 +23,7 @@

import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.IndexMetadata;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
Expand Down Expand Up @@ -49,20 +51,33 @@ public static class Args {
description = "The name of the snapshot to migrate")
public String snapshotName;

@Parameter(names = {"--snapshot-local-dir"},
required = false,
description = ("The absolute path to the directory on local disk where the snapshot exists. Use this parameter"
+ " if have a copy of the snapshot disk. Mutually exclusive with --s3-local-dir, --s3-repo-uri, and --s3-region."
))
public String snapshotLocalDirPath = null;

@Parameter(names = {"--s3-local-dir"},
required = true,
description = "The absolute path to the directory on local disk to download S3 files to")
public String s3LocalDirPath;
required = false,
description = ("The absolute path to the directory on local disk to download S3 files to. If you supply this, you must"
+ " also supply --s3-repo-uri and --s3-region. Mutually exclusive with --snapshot-local-dir."
))
public String s3LocalDirPath = null;

@Parameter(names = {"--s3-repo-uri"},
required = true,
description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2")
public String s3RepoUri;
required = false,
description = ("The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2. If you supply this, you must"
+ " also supply --s3-local-dir and --s3-region. Mutually exclusive with --snapshot-local-dir."
))
public String s3RepoUri = null;

@Parameter(names = {"--s3-region"},
required = true,
description = "The AWS Region the S3 bucket is in, like: us-east-2")
public String s3Region;
required = false,
description = ("The AWS Region the S3 bucket is in, like: us-east-2. If you supply this, you must"
+ " also supply --s3-local-dir and --s3-repo-uri. Mutually exclusive with --snapshot-local-dir."
))
public String s3Region = null;

@Parameter(names = {"--lucene-dir"},
required = true,
Expand Down Expand Up @@ -97,14 +112,34 @@ public NoWorkLeftException(String message) {
}
}

public static void validateArgs(Args args) {
boolean isSnapshotLocalDirProvided = args.snapshotLocalDirPath != null;
boolean areAllS3ArgsProvided = args.s3LocalDirPath != null && args.s3RepoUri != null && args.s3Region != null;
boolean areAnyS3ArgsProvided = args.s3LocalDirPath != null || args.s3RepoUri != null || args.s3Region != null;

if (isSnapshotLocalDirProvided && areAnyS3ArgsProvided) {
throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region, but not both.");
}

if (areAnyS3ArgsProvided && !areAllS3ArgsProvided) {
throw new ParameterException("If provide the S3 Snapshot args, you must provide all of them (--s3-local-dir, --s3-repo-uri and --s3-region).");
}

if (!isSnapshotLocalDirProvided && !areAllS3ArgsProvided) {
throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region.");
}

}

public static void main(String[] args) throws Exception {
// Grab out args
Args arguments = new Args();
JCommander.newBuilder()
.addObject(arguments)
.build()
.parse(args);

validateArgs(arguments);

var luceneDirPath = Paths.get(arguments.luceneDirPath);
try (var processManager = new LeaseExpireTrigger(workItemId->{
log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
Expand All @@ -120,8 +155,16 @@ public static void main(String[] args) throws Exception {
new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);

SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
new S3Uri(arguments.s3RepoUri), arguments.s3Region);
SourceRepo sourceRepo;
if (arguments.snapshotLocalDirPath == null) {
sourceRepo = S3Repo.create(
Paths.get(arguments.s3LocalDirPath),
new S3Uri(arguments.s3RepoUri),
arguments.s3Region
);
} else {
sourceRepo = new FileSystemRepo(Paths.get(arguments.snapshotLocalDirPath));
}
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
Expand Down
102 changes: 102 additions & 0 deletions DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
Expand All @@ -65,6 +66,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
Expand All @@ -79,6 +81,7 @@ public class FullTest {
final static long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600;
final static Pattern CAT_INDICES_INDEX_COUNT_PATTERN =
Pattern.compile("(?:\\S+\\s+){2}(\\S+)\\s+(?:\\S+\\s+){3}(\\S+)");
<<<<<<< Updated upstream
public static final String SOURCE_SERVER_ALIAS = "source";
public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024;

Expand Down Expand Up @@ -114,6 +117,25 @@ public void test(SearchClusterContainer.Version baseSourceImageVersion,

try (var esSourceContainer = new PreloadedSearchClusterContainer(baseSourceImageVersion,
SOURCE_SERVER_ALIAS, generatorImage, generatorArgs);
=======
final static List<String> SOURCE_IMAGE_NAMES = List.of("migrations/elasticsearch_rfs_source");
final static List<String> TARGET_IMAGE_NAMES = List.of("opensearchproject/opensearch:2.13.0", "opensearchproject/opensearch:1.3.0");

public static Stream<Arguments> makeDocumentMigrationArgs() {
var numWorkers = List.of(1, 3, 40);
return SOURCE_IMAGE_NAMES.stream()
.flatMap(a->
TARGET_IMAGE_NAMES.stream().flatMap(b->
numWorkers.stream().map(c->Arguments.of(a, b, c))));
}

@ParameterizedTest
@MethodSource("makeDocumentMigrationArgs")
public void testDocumentMigration(String sourceImageName, String targetImageName, int numWorkers) throws Exception {
try (ElasticsearchContainer esSourceContainer =
new ElasticsearchContainer(new ElasticsearchContainer.Version(sourceImageName,
"preloaded-ES_7_10"));
>>>>>>> Stashed changes
OpensearchContainer<?> osTargetContainer =
new OpensearchContainer<>(targetImageName)) {
esSourceContainer.start();
Expand Down Expand Up @@ -308,6 +330,86 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRep
}
}

public static Stream<Arguments> makeProcessExitArgs() {
return SOURCE_IMAGE_NAMES.stream()
.flatMap(a->
TARGET_IMAGE_NAMES.stream().map(b->Arguments.of(a, b)));
}

@ParameterizedTest
@MethodSource("makeProcessExitArgs")
public void testProcessExitsAsExpected(String sourceImageName, String targetImageName) throws Exception {

try (ElasticsearchContainer esSourceContainer =
new ElasticsearchContainer(new ElasticsearchContainer.Version(sourceImageName,
"preloaded-ES_7_10"));
OpensearchContainer<?> osTargetContainer =
new OpensearchContainer<>(targetImageName)) {
esSourceContainer.start();
osTargetContainer.start();

final var SNAPSHOT_NAME = "test_snapshot";
final List<String> INDEX_ALLOWLIST = List.of();
CreateSnapshot.run(
c -> new FileSystemSnapshotCreator(SNAPSHOT_NAME, c, ElasticsearchContainer.CLUSTER_SNAPSHOT_DIR),
new OpenSearchClient(esSourceContainer.getUrl(), null),
false);
var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
var tempDirLucene = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");

String[] args = {
"--snapshot-name", SNAPSHOT_NAME,
"--snapshot-local-dir", tempDirSnapshot.toString(),
"--lucene-dir", tempDirLucene.toString(),
"--target-host", osTargetContainer.getHttpHostAddress()
};

try {
esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

var targetClient = new OpenSearchClient(osTargetContainer.getHttpHostAddress(), null);
var sourceRepo = new FileSystemRepo(tempDirSnapshot);
migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, INDEX_ALLOWLIST);

String classpath = System.getProperty("java.class.path");
String javaHome = System.getProperty("java.home");
String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java";

// Kick off the doc migration process
log.atInfo().setMessage("Running RfsMigrateDocuments with args: " + Arrays.toString(args)).log();
ProcessBuilder processBuilder = new ProcessBuilder(
javaExecutable, "-cp", classpath, "com.rfs.RfsMigrateDocuments"
);
processBuilder.command().addAll(Arrays.asList(args));

Process process = processBuilder.start();
log.atInfo().setMessage("Process started with ID: " + Long.toString(process.toHandle().pid())).log();

// Kill the process and fail if we have to wait too long
boolean finished = process.waitFor(2, TimeUnit.MINUTES);
if (!finished) {
log.atError().setMessage("Process timed out, attempting to kill it...").log();
process.destroy(); // Try to be nice about things first...
if (!process.waitFor(10, TimeUnit.SECONDS)) {
log.atError().setMessage("Process still running, attempting to force kill it...").log();
process.destroyForcibly(); // ..then avada kedavra
}
Assertions.fail("The process did not finish within the timeout period.");
}

int exitCode = process.exitValue();

// Check if the exit code is as expected
int expectedExitCode = 0;
Assertions.assertEquals(expectedExitCode, exitCode, "The program did not exit with the expected status code.");

} finally {
deleteTree(tempDirSnapshot);
deleteTree(tempDirLucene);
}
}
}

private static void deleteTree(Path path) throws IOException {
try (var walk = Files.walk(path)) {
walk.sorted(Comparator.reverseOrder()).forEach(p -> {
Expand Down

0 comments on commit 7700c65

Please sign in to comment.