diff --git a/DocumentsFromSnapshotMigration/build.gradle b/DocumentsFromSnapshotMigration/build.gradle index 4f928b962..b0b636124 100644 --- a/DocumentsFromSnapshotMigration/build.gradle +++ b/DocumentsFromSnapshotMigration/build.gradle @@ -148,11 +148,27 @@ test { } task slowTest(type: Test) { + doFirst { + println "Running slow tests..." + } + useJUnitPlatform() dependsOn(':TrafficCapture:dockerSolution:buildDockerImage_elasticsearchTestConsole') jacoco { enabled = true } + testLogging { + events "passed", "skipped", "failed" + exceptionFormat "full" + showExceptions true + showCauses true + showStackTraces true + // showStandardStreams = true + } + reports { + html.required = true + html.destination file("${buildDir}/reports/tests/slowTest") + } } jacocoTestReport { diff --git a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java index e7a62551f..5fab68579 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java @@ -2,6 +2,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; import java.io.IOException; import java.net.URI; @@ -22,6 +23,7 @@ import com.rfs.common.DefaultSourceRepoAccessor; import com.rfs.common.DocumentReindexer; +import com.rfs.common.FileSystemRepo; import com.rfs.common.IndexMetadata; import com.rfs.common.LuceneDocumentsReader; import com.rfs.common.OpenSearchClient; @@ -49,20 +51,33 @@ public static class Args { description = "The name of the snapshot to migrate") public String snapshotName; + @Parameter(names = {"--snapshot-local-dir"}, + required = false, + description = ("The absolute path to the directory on local disk where the snapshot exists. Use this parameter" + + " if you have a copy of the snapshot on disk. 
Mutually exclusive with --s3-local-dir, --s3-repo-uri, and --s3-region." + )) + public String snapshotLocalDirPath = null; + @Parameter(names = {"--s3-local-dir"}, - required = true, - description = "The absolute path to the directory on local disk to download S3 files to") - public String s3LocalDirPath; + required = false, + description = ("The absolute path to the directory on local disk to download S3 files to. If you supply this, you must" + + " also supply --s3-repo-uri and --s3-region. Mutually exclusive with --snapshot-local-dir." + )) + public String s3LocalDirPath = null; @Parameter(names = {"--s3-repo-uri"}, - required = true, - description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2") - public String s3RepoUri; + required = false, + description = ("The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2. If you supply this, you must" + + " also supply --s3-local-dir and --s3-region. Mutually exclusive with --snapshot-local-dir." + )) + public String s3RepoUri = null; @Parameter(names = {"--s3-region"}, - required = true, - description = "The AWS Region the S3 bucket is in, like: us-east-2") - public String s3Region; + required = false, + description = ("The AWS Region the S3 bucket is in, like: us-east-2. If you supply this, you must" + + " also supply --s3-local-dir and --s3-repo-uri. Mutually exclusive with --snapshot-local-dir." 
+ )) + public String s3Region = null; @Parameter(names = {"--lucene-dir"}, required = true, @@ -97,14 +112,34 @@ public NoWorkLeftException(String message) { } } + public static void validateArgs(Args args) { + boolean isSnapshotLocalDirProvided = args.snapshotLocalDirPath != null; + boolean areAllS3ArgsProvided = args.s3LocalDirPath != null && args.s3RepoUri != null && args.s3Region != null; + boolean areAnyS3ArgsProvided = args.s3LocalDirPath != null || args.s3RepoUri != null || args.s3Region != null; + + if (isSnapshotLocalDirProvided && areAnyS3ArgsProvided) { + throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region, but not both."); + } + + if (areAnyS3ArgsProvided && !areAllS3ArgsProvided) { + throw new ParameterException("If you provide the S3 Snapshot args, you must provide all of them (--s3-local-dir, --s3-repo-uri and --s3-region)."); + } + + if (!isSnapshotLocalDirProvided && !areAllS3ArgsProvided) { + throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region."); + } + + } + public static void main(String[] args) throws Exception { - // Grab out args Args arguments = new Args(); JCommander.newBuilder() .addObject(arguments) .build() .parse(args); + validateArgs(arguments); + var luceneDirPath = Paths.get(arguments.luceneDirPath); try (var processManager = new LeaseExpireTrigger(workItemId->{ log.error("terminating RunRfsWorker because its lease has expired for " + workItemId); @@ -120,8 +155,16 @@ public static void main(String[] args) throws Exception { new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false); DocumentReindexer reindexer = new DocumentReindexer(targetClient); - SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath), - new S3Uri(arguments.s3RepoUri), arguments.s3Region); + SourceRepo sourceRepo; + if (arguments.snapshotLocalDirPath == null) { + 
sourceRepo = S3Repo.create( + Paths.get(arguments.s3LocalDirPath), + new S3Uri(arguments.s3RepoUri), + arguments.s3Region + ); + } else { + sourceRepo = new FileSystemRepo(Paths.get(arguments.snapshotLocalDirPath)); + } SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java index 5d65c46dd..11d2a0b92 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java @@ -47,6 +47,7 @@ import org.slf4j.event.Level; import reactor.core.publisher.Flux; +import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.file.Files; @@ -65,6 +66,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.UnaryOperator; @@ -79,6 +81,7 @@ public class FullTest { final static long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600; final static Pattern CAT_INDICES_INDEX_COUNT_PATTERN = Pattern.compile("(?:\\S+\\s+){2}(\\S+)\\s+(?:\\S+\\s+){3}(\\S+)"); +<<<<<<< Updated upstream public static final String SOURCE_SERVER_ALIAS = "source"; public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024; @@ -114,6 +117,25 @@ public void test(SearchClusterContainer.Version baseSourceImageVersion, try (var esSourceContainer = new PreloadedSearchClusterContainer(baseSourceImageVersion, SOURCE_SERVER_ALIAS, generatorImage, generatorArgs); +======= + final static List SOURCE_IMAGE_NAMES = List.of("migrations/elasticsearch_rfs_source"); + final static List TARGET_IMAGE_NAMES = 
List.of("opensearchproject/opensearch:2.13.0", "opensearchproject/opensearch:1.3.0"); + + public static Stream makeDocumentMigrationArgs() { + var numWorkers = List.of(1, 3, 40); + return SOURCE_IMAGE_NAMES.stream() + .flatMap(a-> + TARGET_IMAGE_NAMES.stream().flatMap(b-> + numWorkers.stream().map(c->Arguments.of(a, b, c)))); + } + + @ParameterizedTest + @MethodSource("makeDocumentMigrationArgs") + public void testDocumentMigration(String sourceImageName, String targetImageName, int numWorkers) throws Exception { + try (ElasticsearchContainer esSourceContainer = + new ElasticsearchContainer(new ElasticsearchContainer.Version(sourceImageName, + "preloaded-ES_7_10")); +>>>>>>> Stashed changes OpensearchContainer osTargetContainer = new OpensearchContainer<>(targetImageName)) { esSourceContainer.start(); @@ -308,6 +330,86 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRep } } + public static Stream makeProcessExitArgs() { + return SOURCE_IMAGE_NAMES.stream() + .flatMap(a-> + TARGET_IMAGE_NAMES.stream().map(b->Arguments.of(a, b))); + } + + @ParameterizedTest + @MethodSource("makeProcessExitArgs") + public void testProcessExitsAsExpected(String sourceImageName, String targetImageName) throws Exception { + + try (ElasticsearchContainer esSourceContainer = + new ElasticsearchContainer(new ElasticsearchContainer.Version(sourceImageName, + "preloaded-ES_7_10")); + OpensearchContainer osTargetContainer = + new OpensearchContainer<>(targetImageName)) { + esSourceContainer.start(); + osTargetContainer.start(); + + final var SNAPSHOT_NAME = "test_snapshot"; + final List INDEX_ALLOWLIST = List.of(); + CreateSnapshot.run( + c -> new FileSystemSnapshotCreator(SNAPSHOT_NAME, c, ElasticsearchContainer.CLUSTER_SNAPSHOT_DIR), + new OpenSearchClient(esSourceContainer.getUrl(), null), + false); + var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot"); + var tempDirLucene = 
Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene"); + + String[] args = { + "--snapshot-name", SNAPSHOT_NAME, + "--snapshot-local-dir", tempDirSnapshot.toString(), + "--lucene-dir", tempDirLucene.toString(), + "--target-host", osTargetContainer.getHttpHostAddress() + }; + + try { + esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); + + var targetClient = new OpenSearchClient(osTargetContainer.getHttpHostAddress(), null); + var sourceRepo = new FileSystemRepo(tempDirSnapshot); + migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, INDEX_ALLOWLIST); + + String classpath = System.getProperty("java.class.path"); + String javaHome = System.getProperty("java.home"); + String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java"; + + // Kick off the doc migration process + log.atInfo().setMessage("Running RfsMigrateDocuments with args: " + Arrays.toString(args)).log(); + ProcessBuilder processBuilder = new ProcessBuilder( + javaExecutable, "-cp", classpath, "com.rfs.RfsMigrateDocuments" + ); + processBuilder.command().addAll(Arrays.asList(args)); + + Process process = processBuilder.start(); + log.atInfo().setMessage("Process started with ID: " + Long.toString(process.toHandle().pid())).log(); + + // Kill the process and fail if we have to wait too long + boolean finished = process.waitFor(2, TimeUnit.MINUTES); + if (!finished) { + log.atError().setMessage("Process timed out, attempting to kill it...").log(); + process.destroy(); // Try to be nice about things first... 
+ if (!process.waitFor(10, TimeUnit.SECONDS)) { + log.atError().setMessage("Process still running, attempting to force kill it...").log(); + process.destroyForcibly(); // ..then avada kedavra + } + Assertions.fail("The process did not finish within the timeout period."); + } + + int exitCode = process.exitValue(); + + // Check if the exit code is as expected + int expectedExitCode = 0; + Assertions.assertEquals(expectedExitCode, exitCode, "The program did not exit with the expected status code."); + + } finally { + deleteTree(tempDirSnapshot); + deleteTree(tempDirLucene); + } + } + } + private static void deleteTree(Path path) throws IOException { try (var walk = Files.walk(path)) { walk.sorted(Comparator.reverseOrder()).forEach(p -> {