diff --git a/RFS/build-preloaded-source-image.gradle b/RFS/build-preloaded-source-image.gradle
index c1dca9b17..b0b25b3fa 100644
--- a/RFS/build-preloaded-source-image.gradle
+++ b/RFS/build-preloaded-source-image.gradle
@@ -23,8 +23,8 @@ def createNetworkTask = task createNetwork(type: Exec) {
     }
 }
 task createInitialElasticsearchContainer(type: DockerCreateContainer) {
-    dependsOn createNetwork, buildDockerImage_emptyElasticsearchSource
-    targetImageId 'migrations/empty_elasticsearch_source:latest'
+    dependsOn createNetwork, buildDockerImage_emptyElasticsearchSource_7_17
+    targetImageId 'migrations/empty_elasticsearch_source_7_17:latest'
     containerName = "elasticsearch-${uniqueId}"
     hostConfig.network = myNetworkName
     hostConfig.dns = ['elasticsearch']
diff --git a/RFS/build.gradle b/RFS/build.gradle
index de25b72c2..e727e6270 100644
--- a/RFS/build.gradle
+++ b/RFS/build.gradle
@@ -108,11 +108,6 @@ jacocoTestReport {
     }
 }
 
-task demoPrintOutSnapshot (type: JavaExec) {
-    classpath = sourceSets.main.runtimeClasspath
-    mainClass = 'com.rfs.DemoPrintOutSnapshot'
-}
-
 task copyDockerRuntimeJars (type: Sync) {
     duplicatesStrategy = DuplicatesStrategy.EXCLUDE
     description = 'Copy runtime JARs and app jar to docker build directory'
@@ -132,9 +127,12 @@ DockerServiceProps[] dockerServices = [
                 dockerImageName:"reindex_from_snapshot",
                 inputDir:"./docker",
                 taskDependencies:["copyDockerRuntimeJars"]]),
-        new DockerServiceProps([projectName:"emptyElasticsearchSource",
-                dockerImageName:"empty_elasticsearch_source",
+        new DockerServiceProps([projectName:"emptyElasticsearchSource_7_10",
+                dockerImageName:"empty_elasticsearch_source_7_10",
                 inputDir:"./docker/TestSource_ES_7_10"]),
+        new DockerServiceProps([projectName:"emptyElasticsearchSource_7_17",
+                dockerImageName:"empty_elasticsearch_source_7_17",
+                inputDir:"./docker/TestSource_ES_7_17"]),
         new DockerServiceProps([projectName:"trafficGenerator",
                 dockerImageName:"osb_traffic_generator",
                 inputDir:"./docker/TrafficGenerator",
diff --git a/RFS/docker/TestSource_ES_7_17/Dockerfile b/RFS/docker/TestSource_ES_7_17/Dockerfile
new file mode 100644
index 000000000..49a1abfca
--- /dev/null
+++ b/RFS/docker/TestSource_ES_7_17/Dockerfile
@@ -0,0 +1,22 @@
+FROM docker.elastic.co/elasticsearch/elasticsearch:7.17.21 AS base
+
+# Configure Elastic
+ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
+# Prevents ES from complaining about nodes count
+RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
+ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/
+
+# Install the S3 Repo Plugin
+RUN echo y | /usr/share/elasticsearch/bin/elasticsearch-plugin install repository-s3
+
+# Install the AWS CLI for testing purposes
+RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
+    unzip awscliv2.zip && \
+    ./aws/install
+
+RUN mkdir /snapshots && chown elasticsearch /snapshots
+
+# Install our custom entrypoint script
+COPY ./container-start.sh /usr/share/elasticsearch/container-start.sh
+
+CMD /usr/share/elasticsearch/container-start.sh
diff --git a/RFS/docker/TestSource_ES_7_17/container-start.sh b/RFS/docker/TestSource_ES_7_17/container-start.sh
new file mode 100755
index 000000000..b2507c11c
--- /dev/null
+++ b/RFS/docker/TestSource_ES_7_17/container-start.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+echo "Setting AWS Creds from ENV Variables"
+bin/elasticsearch-keystore create
+echo $AWS_ACCESS_KEY_ID | bin/elasticsearch-keystore add s3.client.default.access_key --stdin
+echo $AWS_SECRET_ACCESS_KEY |
bin/elasticsearch-keystore add s3.client.default.secret_key --stdin + +if [ -n "$AWS_SESSION_TOKEN" ]; then + echo $AWS_SESSION_TOKEN | bin/elasticsearch-keystore add s3.client.default.session_token --stdin +fi + +echo "Starting Elasticsearch" +/usr/local/bin/docker-entrypoint.sh eswrapper \ No newline at end of file diff --git a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java deleted file mode 100644 index 6f8b4a3c7..000000000 --- a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java +++ /dev/null @@ -1,270 +0,0 @@ -package com.rfs; - -import java.nio.file.Paths; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; - -import org.apache.lucene.document.Document; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexableField; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.util.BytesRef; - -import com.rfs.common.Uid; -import com.rfs.version_es_6_8.*; -import com.rfs.common.GlobalMetadata; -import com.rfs.common.IndexMetadata; -import com.rfs.common.SourceRepo; -import com.rfs.common.ShardMetadata; -import com.rfs.common.SnapshotMetadata; -import com.rfs.common.SnapshotRepo; -import com.rfs.common.SnapshotShardUnpacker; -import com.rfs.common.ClusterVersion; -import com.rfs.common.FileSystemRepo; -import com.rfs.version_es_7_10.*; - -public class DemoPrintOutSnapshot { - - public static class Args { - @Parameter(names = {"-n", "--snapshot-name"}, description = "The name of the snapshot to read", required = true) - public String snapshotName; - - @Parameter(names = {"-d", "--snapshot-dir"}, description = "The absolute path to the snapshot directory", required = true) - public String snapshotDirPath; - - @Parameter(names = {"-l", "--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true) - public String luceneBasePathString; - - @Parameter(names = {"-v", "--source-version"}, description = "Source version", required = true, converter = ClusterVersion.ArgsConverter.class) - public ClusterVersion sourceVersion; - } - - public static void main(String[] args) { - Args arguments = new Args(); - JCommander.newBuilder() - .addObject(arguments) - .build() - .parse(args); - - String snapshotName = arguments.snapshotName; - String snapshotDirPath = arguments.snapshotDirPath; - String luceneBasePathString = arguments.luceneBasePathString; - ClusterVersion sourceVersion = arguments.sourceVersion; - - if (!((sourceVersion == ClusterVersion.ES_6_8) || (sourceVersion == ClusterVersion.ES_7_10))) { - throw new IllegalArgumentException("Unsupported source version: " + sourceVersion); - } - - SourceRepo repo = new FileSystemRepo(Path.of(snapshotDirPath)); - - try { - // ========================================================================================================== - // Read the Repo data file - // ========================================================================================================== - System.out.println("=================================================================="); - System.out.println("Attempting to read Repo data file..."); - - SnapshotRepo.Provider repoDataProvider; - if (sourceVersion == ClusterVersion.ES_6_8) { - repoDataProvider = new SnapshotRepoProvider_ES_6_8(repo); - } else { - 
repoDataProvider = new SnapshotRepoProvider_ES_7_10(repo); - } - - System.out.println("--- Snapshots ---"); - repoDataProvider.getSnapshots().forEach(snapshot -> System.out.println(snapshot.getName() + " - " + snapshot.getId())); - - for (SnapshotRepo.Snapshot snapshot : repoDataProvider.getSnapshots()) { - System.out.println("--- Indices in " + snapshot.getName() + " ---"); - for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshot.getName())) { - System.out.println(index.getName() + " - " + index.getId()); - } - } - System.out.println("Repo data read successfully"); - - // ========================================================================================================== - // Read the Snapshot details - // ========================================================================================================== - System.out.println("=================================================================="); - System.out.println("Attempting to read Snapshot details..."); - String snapshotIdString = repoDataProvider.getSnapshotId(snapshotName); - - if (snapshotIdString == null) { - System.out.println("Snapshot not found"); - return; - } - - SnapshotMetadata.Data snapshotMetadata; - if (sourceVersion == ClusterVersion.ES_6_8) { - snapshotMetadata = new SnapshotMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName); - } else { - snapshotMetadata = new SnapshotMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName); - } - - System.out.println("Snapshot Metadata State: " + snapshotMetadata.getState()); - System.out.println("Snapshot Metadata State Reason: " + snapshotMetadata.getReason()); - System.out.println("Snapshot Metadata Version: " + snapshotMetadata.getVersionId()); - System.out.println("Snapshot Metadata Indices: " + snapshotMetadata.getIndices()); - System.out.println("Snapshot Metadata Shards Total: " + snapshotMetadata.getTotalShards()); - System.out.println("Snapshot Metadata Shards Successful: " + snapshotMetadata.getSuccessfulShards()); - - // ========================================================================================================== - // Read the Global Metadata - // ========================================================================================================== - System.out.println("=================================================================="); - System.out.println("Attempting to read Global Metadata details..."); - - GlobalMetadata.Data globalMetadata; - if (sourceVersion == ClusterVersion.ES_6_8) { - globalMetadata = new GlobalMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName); - } else { - globalMetadata = new GlobalMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName); - } - - if (sourceVersion == ClusterVersion.ES_6_8) { - GlobalMetadataData_ES_6_8 globalMetadataES68 = (GlobalMetadataData_ES_6_8) globalMetadata; - - List templateKeys = new ArrayList<>(); - globalMetadataES68.getTemplates().fieldNames().forEachRemaining(templateKeys::add); - System.out.println("Templates Keys: " + templateKeys); - } else if (sourceVersion == ClusterVersion.ES_7_10) { - GlobalMetadataData_ES_7_10 globalMetadataES710 = (GlobalMetadataData_ES_7_10) globalMetadata; - - List indexTemplateKeys = new ArrayList<>(); - globalMetadataES710.getIndexTemplates().fieldNames().forEachRemaining(indexTemplateKeys::add); - System.out.println("Index Templates Keys: " + indexTemplateKeys); - - List componentTemplateKeys = new ArrayList<>(); - 
globalMetadataES710.getComponentTemplates().fieldNames().forEachRemaining(componentTemplateKeys::add); - System.out.println("Component Templates Keys: " + componentTemplateKeys); - } - - // ========================================================================================================== - // Read all the Index Metadata - // ========================================================================================================== - System.out.println("=================================================================="); - System.out.println("Attempting to read Index Metadata..."); - - Map indexMetadatas = new HashMap<>(); - if (sourceVersion == ClusterVersion.ES_6_8) { - for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) { - IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, index.getName()); - indexMetadatas.put(index.getName(), indexMetadata); - } - } else { - for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) { - IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, index.getName()); - indexMetadatas.put(index.getName(), indexMetadata); - } - } - - for (IndexMetadata.Data indexMetadata : indexMetadatas.values()) { - System.out.println("Reading Index Metadata for index: " + indexMetadata.getName()); - System.out.println("Index Id: " + indexMetadata.getId()); - System.out.println("Index Number of Shards: " + indexMetadata.getNumberOfShards()); - System.out.println("Index Settings: " + indexMetadata.getSettings().toString()); - System.out.println("Index Mappings: " + indexMetadata.getMappings().toString()); - System.out.println("Index Aliases: " + indexMetadata.getAliases().toString()); - } - - System.out.println("Index Metadata read successfully"); - - // ========================================================================================================== - // Read the Index Shard Metadata for the Snapshot - // ========================================================================================================== - System.out.println("=================================================================="); - System.out.println("Attempting to read Index Shard Metadata..."); - for (IndexMetadata.Data indexMetadata : indexMetadatas.values()) { - System.out.println("Reading Index Shard Metadata for index: " + indexMetadata.getName()); - for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { - System.out.println("=== Shard ID: " + shardId + " ==="); - - // Get the file mapping for the shard - ShardMetadata.Data shardMetadata; - if (sourceVersion == ClusterVersion.ES_6_8) { - shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); - } else { - shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); - } - System.out.println("Shard Metadata: " + shardMetadata.toString()); - } - } - - // ========================================================================================================== - // Unpack the blob files - // ========================================================================================================== - System.out.println("=================================================================="); - System.out.println("Unpacking blob files to disk..."); - - for (IndexMetadata.Data indexMetadata : indexMetadatas.values()){ - for 
(int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { - ShardMetadata.Data shardMetadata; - if (sourceVersion == ClusterVersion.ES_6_8) { - shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); - } else { - shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); - } - - // Unpack the shard - int bufferSize; - if (sourceVersion == ClusterVersion.ES_6_8) { - bufferSize = ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES; - } else { - bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; - } - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, Paths.get(luceneBasePathString), bufferSize); - unpacker.unpack(shardMetadata); - - // Now, read the documents back out - System.out.println("--- Reading docs in the shard ---"); - Path luceneIndexDir = Paths.get(luceneBasePathString + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId()); - readDocumentsFromLuceneIndex(luceneIndexDir); - } - - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - private static void readDocumentsFromLuceneIndex(Path indexDirectoryPath) throws Exception { - // Opening the directory that contains the Lucene index - try (FSDirectory directory = FSDirectory.open(indexDirectoryPath); - IndexReader reader = DirectoryReader.open(directory)) { - - // Iterating over all documents in the index - for (int i = 0; i < reader.maxDoc(); i++) { - System.out.println("Reading Document"); - Document document = reader.document(i); - - BytesRef source_bytes = document.getBinaryValue("_source"); - if (source_bytes == null || source_bytes.bytes.length == 0) { // Skip deleted documents - String id = Uid.decodeId(reader.document(i).getBinaryValue("_id").bytes); - System.out.println("Document " + id + " is deleted"); - continue; - } - - // Iterate over all fields in the document - List fields = document.getFields(); - for (IndexableField field : fields) { - if ("_source".equals(field.name())){ - String source_string = source_bytes.utf8ToString(); - System.out.println("Field name: " + field.name() + ", Field value: " + source_string); - } else if ("_id".equals(field.name())){ - String uid = Uid.decodeId(document.getBinaryValue(field.name()).bytes); - System.out.println("Field name: " + field.name() + ", Field value: " + uid); - } else { - System.out.println("Field name: " + field.name()); - } - } - } - } - } -} \ No newline at end of file diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 31ec61cac..f8efcc61b 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -329,7 +329,8 @@ public static void main(String[] args) throws InterruptedException { } else { bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; } - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, luceneDirPath, bufferSize); + DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); + SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,luceneDirPath, bufferSize); for (IndexMetadata.Data indexMetadata : indexMetadatas) { logger.info("Processing index: " + indexMetadata.getName()); @@ -345,7 +346,9 @@ public static void main(String[] args) throws InterruptedException { } // Unpack the shard - unpacker.unpack(shardMetadata); + try 
(SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata)) { + unpacker.unpack(); + } } } diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java index f5734b594..245c712a6 100644 --- a/RFS/src/main/java/com/rfs/RunRfsWorker.java +++ b/RFS/src/main/java/com/rfs/RunRfsWorker.java @@ -21,6 +21,7 @@ import com.rfs.cms.OpenSearchCmsClient; import com.rfs.common.ClusterVersion; import com.rfs.common.ConnectionDetails; +import com.rfs.common.DefaultSourceRepoAccessor; import com.rfs.common.DocumentReindexer; import com.rfs.common.GlobalMetadata; import com.rfs.common.IndexMetadata; @@ -100,6 +101,10 @@ public static class Args { @Parameter(names = {"--component-template-allowlist"}, description = ("Optional. List of component template names to migrate" + " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false) public List componentTemplateAllowlist = List.of(); + + @Parameter(names = {"--max-shard-size-bytes"}, description = ("Optional. The maximum shard size, in bytes, to allow when" + + " performing the document migration. Useful for preventing disk overflow. Default: 50 * 1024 * 1024 * 1024 (50 GB)"), required = false) + public long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L; //https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/ @Parameter(names = {"--min-replicas"}, description = ("Optional. The minimum number of replicas configured for migrated indices on the target." @@ -119,56 +124,58 @@ public static void main(String[] args) throws Exception { .build() .parse(args); - String snapshotName = arguments.snapshotName; - Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath); - String s3RepoUri = arguments.s3RepoUri; - String s3Region = arguments.s3Region; - Path luceneDirPath = Paths.get(arguments.luceneDirPath); - String sourceHost = arguments.sourceHost; - String sourceUser = arguments.sourceUser; - String sourcePass = arguments.sourcePass; - String targetHost = arguments.targetHost; - String targetUser = arguments.targetUser; - String targetPass = arguments.targetPass; - List indexTemplateAllowlist = arguments.indexTemplateAllowlist; - List componentTemplateAllowlist = arguments.componentTemplateAllowlist; - int awarenessDimensionality = arguments.minNumberOfReplicas + 1; - Level logLevel = arguments.logLevel; + final String snapshotName = arguments.snapshotName; + final Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath); + final String s3RepoUri = arguments.s3RepoUri; + final String s3Region = arguments.s3Region; + final Path luceneDirPath = Paths.get(arguments.luceneDirPath); + final String sourceHost = arguments.sourceHost; + final String sourceUser = arguments.sourceUser; + final String sourcePass = arguments.sourcePass; + final String targetHost = arguments.targetHost; + final String targetUser = arguments.targetUser; + final String targetPass = arguments.targetPass; + final List indexTemplateAllowlist = arguments.indexTemplateAllowlist; + final List componentTemplateAllowlist = arguments.componentTemplateAllowlist; + final long maxShardSizeBytes = arguments.maxShardSizeBytes; + final int awarenessDimensionality = arguments.minNumberOfReplicas + 1; + final Level logLevel = arguments.logLevel; Logging.setLevel(logLevel); - ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass); - ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); + final ConnectionDetails sourceConnection = 
new ConnectionDetails(sourceHost, sourceUser, sourcePass); + final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); try { logger.info("Running RfsWorker"); GlobalState globalState = GlobalState.getInstance(); OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection); OpenSearchClient targetClient = new OpenSearchClient(targetConnection); - CmsClient cmsClient = new OpenSearchCmsClient(targetClient); + final CmsClient cmsClient = new OpenSearchCmsClient(targetClient); - SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region); - SnapshotRunner snapshotWorker = new SnapshotRunner(globalState, cmsClient, snapshotCreator); + final SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region); + final SnapshotRunner snapshotWorker = new SnapshotRunner(globalState, cmsClient, snapshotCreator); snapshotWorker.run(); - SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region); - SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); - GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider); - GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist); - Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality); + final SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region); + final SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); + final GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider); + final GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist); + final Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality); MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer); metadataWorker.run(); - IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); - IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient); - IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer); + final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); + final IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient); + final IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer); indexWorker.run(); - ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(sourceRepo, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); - LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath); - DocumentReindexer reindexer = new DocumentReindexer(targetClient); - DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + final ShardMetadata.Factory 
shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); + final DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo); + final SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); + final LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath); + final DocumentReindexer reindexer = new DocumentReindexer(targetClient); + DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, indexMetadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer); documentsWorker.run(); } catch (Runner.PhaseFailed e) { diff --git a/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java new file mode 100644 index 000000000..385a79ac5 --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java @@ -0,0 +1,25 @@ +package com.rfs.common; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; + +/* + * Provides "simple" access to the underlying files in the source repo without any special behavior + * + * TODO: find a better approach to this (see https://opensearch.atlassian.net/browse/MIGRATIONS-1786) + */ +public class DefaultSourceRepoAccessor extends SourceRepoAccessor { + public DefaultSourceRepoAccessor(SourceRepo repo) { + super(repo); + } + + @Override + protected InputStream load(Path path) { + try { + return Files.newInputStream(path); + } catch (Exception e) { + throw new CouldNotLoadRepoFile("Could not load file: " + path, e); + } + } +} diff --git a/RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java new file mode 100644 index 000000000..43e9d23e0 --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java @@ -0,0 +1,54 @@ +package com.rfs.common; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/* + * Provides access to the underlying files in the source repo and deletes the files after the Stream is closed. This + * is useful/interesting in the case where the files are large/numerous and you can easily re-acquire them - such as + * if they are being loaded from S3. 
+ * + * TODO: find a better approach to this (see https://opensearch.atlassian.net/browse/MIGRATIONS-1786) + */ +public class EphemeralSourceRepoAccessor extends SourceRepoAccessor { + private static final Logger logger = LogManager.getLogger(EphemeralSourceRepoAccessor.class); + public EphemeralSourceRepoAccessor(SourceRepo repo) { + super(repo); + } + + @Override + protected InputStream load(Path path) { + try { + return new EphemeralFileInputStream(path); + } catch (Exception e) { + throw new CouldNotLoadRepoFile("Could not load file: " + path, e); + } + } + + public static class EphemeralFileInputStream extends FileInputStream { + private final Path filePath; + + public EphemeralFileInputStream(Path filePath) throws IOException { + super(filePath.toFile()); + this.filePath = filePath; + } + + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + logger.info("Deleting local file: " + filePath.toString()); + logger.warn("See: https://opensearch.atlassian.net/browse/MIGRATIONS-1786"); + Files.deleteIfExists(filePath); + } + } + } +} diff --git a/RFS/src/main/java/com/rfs/common/PartSliceStream.java b/RFS/src/main/java/com/rfs/common/PartSliceStream.java index a58110af4..230e5b249 100644 --- a/RFS/src/main/java/com/rfs/common/PartSliceStream.java +++ b/RFS/src/main/java/com/rfs/common/PartSliceStream.java @@ -2,8 +2,6 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; /** * Taken from Elasticsearch 6.8, combining the SlicedInputStream and PartSliceStream classes with our special sauce @@ -12,7 +10,7 @@ */ public class PartSliceStream extends InputStream { - private final SourceRepo repo; + private final SourceRepoAccessor repoAccessor; private final ShardMetadata.FileInfo fileMetadata; private final String indexId; private final int shardId; @@ -20,16 +18,15 @@ public class PartSliceStream extends InputStream { private InputStream currentStream; private boolean initialized = false; - public PartSliceStream(SourceRepo repo, ShardMetadata.FileInfo fileMetadata, String indexId, int shardId) { - this.repo = repo; + public PartSliceStream(SourceRepoAccessor repoAccessor, ShardMetadata.FileInfo fileMetadata, String indexId, int shardId) { + this.repoAccessor = repoAccessor; this.fileMetadata = fileMetadata; this.indexId = indexId; this.shardId = shardId; } - protected InputStream openSlice(long slice) throws IOException { - Path filePath = repo.getBlobFilePath(indexId, shardId, fileMetadata.partName(slice)); - return Files.newInputStream(filePath); + protected InputStream openSlice(long slice) { + return repoAccessor.getBlobFile(indexId, shardId, fileMetadata.partName(slice)); } private InputStream nextStream() throws IOException { diff --git a/RFS/src/main/java/com/rfs/common/ShardMetadata.java b/RFS/src/main/java/com/rfs/common/ShardMetadata.java index dc4ef4f7e..f4dcac6c1 100644 --- a/RFS/src/main/java/com/rfs/common/ShardMetadata.java +++ b/RFS/src/main/java/com/rfs/common/ShardMetadata.java @@ -77,7 +77,7 @@ public static interface Data { public long getStartTime(); public long getTime(); public int getNumberOfFiles(); - public long getTotalSize(); + public long getTotalSizeBytes(); public List getFiles(); } diff --git a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java index 5c5a56522..073080e44 100644 --- a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java +++ 
b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java @@ -1,6 +1,8 @@ package com.rfs.common; +import java.io.IOException; import java.io.InputStream; +import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -16,19 +18,31 @@ import org.apache.lucene.util.BytesRef; @RequiredArgsConstructor -public class SnapshotShardUnpacker { +public class SnapshotShardUnpacker implements AutoCloseable { private static final Logger logger = LogManager.getLogger(SnapshotShardUnpacker.class); - protected final SourceRepo repo; - protected final Path luceneFilesBasePath; - protected final int bufferSize; + private final SourceRepoAccessor repoAccessor; + private final Path luceneFilesBasePath; + private final ShardMetadata.Data shardMetadata; + private final int bufferSize; - public void unpack(ShardMetadata.Data shardMetadata) { + @RequiredArgsConstructor + public static class Factory { + private final SourceRepoAccessor repoAccessor; + private final Path luceneFilesBasePath; + private final int bufferSize; + + public SnapshotShardUnpacker create(ShardMetadata.Data shardMetadata) { + return new SnapshotShardUnpacker(repoAccessor, luceneFilesBasePath, shardMetadata, bufferSize); + } + } + + public void unpack() { try { // Some constants NativeFSLockFactory lockFactory = NativeFSLockFactory.INSTANCE; // Ensure the blob files are prepped, if they need to be - repo.prepBlobFiles(shardMetadata); + repoAccessor.prepBlobFiles(shardMetadata); // Create the directory for the shard's lucene files Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId()); @@ -37,27 +51,56 @@ public void unpack(ShardMetadata.Data shardMetadata) { for (ShardMetadata.FileInfo fileMetadata : shardMetadata.getFiles()) { logger.info("Unpacking - Blob Name: " + fileMetadata.getName() + ", Lucene Name: " + fileMetadata.getPhysicalName()); - IndexOutput indexOutput = primaryDirectory.createOutput(fileMetadata.getPhysicalName(), IOContext.DEFAULT); - - if (fileMetadata.getName().startsWith("v__")) { - final BytesRef hash = fileMetadata.getMetaHash(); - indexOutput.writeBytes(hash.bytes, hash.offset, hash.length); - } else { - try (InputStream stream = new PartSliceStream(repo, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) { - final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); + try (IndexOutput indexOutput = primaryDirectory.createOutput(fileMetadata.getPhysicalName(), IOContext.DEFAULT);){ + if (fileMetadata.getName().startsWith("v__")) { + final BytesRef hash = fileMetadata.getMetaHash(); + indexOutput.writeBytes(hash.bytes, hash.offset, hash.length); + } else { + try (InputStream stream = new PartSliceStream(repoAccessor, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) { + final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + } } } } - indexOutput.close(); } } catch (Exception e) { throw new CouldNotUnpackShard("Could not unpack shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e); } } + @Override + public void close() { + try { + Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + 
"/" + shardMetadata.getShardId()); + if (Files.exists(luceneIndexDir)) { + deleteRecursively(luceneIndexDir); + } + + } catch (Exception e) { + throw new CouldNotCleanUpShard("Could not clean up shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e); + } + } + + protected void deleteRecursively(Path path) throws IOException { + if (Files.isDirectory(path)) { + try (DirectoryStream entries = Files.newDirectoryStream(path)) { + for (Path entry : entries) { + deleteRecursively(entry); + } + } + } + Files.delete(path); + } + + public static class CouldNotCleanUpShard extends RfsException { + public CouldNotCleanUpShard(String message, Exception e) { + super(message, e); + } + } + public static class CouldNotUnpackShard extends RfsException { public CouldNotUnpackShard(String message, Exception e) { super(message, e); diff --git a/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java new file mode 100644 index 000000000..c3a7d1e77 --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java @@ -0,0 +1,57 @@ +package com.rfs.common; + +import java.io.InputStream; +import java.nio.file.Path; + +// TODO: find a better approach to this (see https://opensearch.atlassian.net/browse/MIGRATIONS-1786) +public abstract class SourceRepoAccessor { + private final SourceRepo repo; + + public SourceRepoAccessor(SourceRepo repo) { + this.repo = repo; + } + + public Path getRepoRootDir() { + return repo.getRepoRootDir(); + } + + public InputStream getSnapshotRepoDataFile(){ + return load(repo.getSnapshotRepoDataFilePath()); + }; + + public InputStream getGlobalMetadataFile(String snapshotId) { + return load(repo.getGlobalMetadataFilePath(snapshotId)); + } + + public InputStream getSnapshotMetadataFile(String snapshotId) { + return load(repo.getSnapshotMetadataFilePath(snapshotId)); + } + + public InputStream getIndexMetadataFile(String indexId, String indexFileId){ + return load(repo.getIndexMetadataFilePath(indexId, indexFileId)); + } + + public InputStream getShardDir(String indexId, int shardId){ + return load(repo.getShardDirPath(indexId, shardId)); + } + + public InputStream getShardMetadataFile(String snapshotId, String indexId, int shardId){ + return load(repo.getShardMetadataFilePath(snapshotId, indexId, shardId)); + } + + public InputStream getBlobFile(String indexId, int shardId, String blobName){ + return load(repo.getBlobFilePath(indexId, shardId, blobName)); + } + + public void prepBlobFiles(ShardMetadata.Data shardMetadata){ + repo.prepBlobFiles(shardMetadata); + } + + protected abstract InputStream load(Path path); + + public static class CouldNotLoadRepoFile extends RuntimeException { + public CouldNotLoadRepoFile(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataData_ES_6_8.java b/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataData_ES_6_8.java index 88db38052..b528e95f9 100644 --- a/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataData_ES_6_8.java +++ b/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataData_ES_6_8.java @@ -101,7 +101,7 @@ public int getNumberOfFiles() { } @Override - public long getTotalSize() { + public long getTotalSizeBytes() { return totalSize; } diff --git a/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataData_ES_7_10.java b/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataData_ES_7_10.java index 77f7cf4ea..40e5a7710 100644 --- 
a/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataData_ES_7_10.java +++ b/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataData_ES_7_10.java @@ -100,7 +100,7 @@ public int getNumberOfFiles() { } @Override - public long getTotalSize() { + public long getTotalSizeBytes() { return totalSize; } diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java index e76ea739d..758236e55 100644 --- a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java +++ b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java @@ -18,10 +18,21 @@ public class DocumentsRunner implements Runner { private final DocumentsStep.SharedMembers members; - public DocumentsRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, IndexMetadata.Factory metadataFactory, - ShardMetadata.Factory shardMetadataFactory, SnapshotShardUnpacker unpacker, LuceneDocumentsReader reader, - DocumentReindexer reindexer) { - this.members = new DocumentsStep.SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + public DocumentsRunner( + GlobalState globalState, CmsClient cmsClient, String snapshotName, long maxShardSizeBytes, + IndexMetadata.Factory metadataFactory, ShardMetadata.Factory shardMetadataFactory, + SnapshotShardUnpacker.Factory unpackerFactory, LuceneDocumentsReader reader, DocumentReindexer reindexer) { + this.members = new DocumentsStep.SharedMembers( + globalState, + cmsClient, + snapshotName, + maxShardSizeBytes, + metadataFactory, + shardMetadataFactory, + unpackerFactory, + reader, + reindexer + ); } @Override diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsStep.java b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java index 21d666acb..4eddb407e 100644 --- a/RFS/src/main/java/com/rfs/worker/DocumentsStep.java +++ b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java @@ -27,9 +27,10 @@ public static class SharedMembers { protected final GlobalState globalState; protected final CmsClient cmsClient; protected final String snapshotName; + protected final long maxShardSizeBytes; protected final IndexMetadata.Factory metadataFactory; protected final ShardMetadata.Factory shardMetadataFactory; - protected final SnapshotShardUnpacker unpacker; + protected final SnapshotShardUnpacker.Factory unpackerFactory; protected final LuceneDocumentsReader reader; protected final DocumentReindexer reindexer; protected Optional cmsEntry = Optional.empty(); @@ -364,20 +365,28 @@ public void run() { )); logger.info("Work item set"); + ShardMetadata.Data shardMetadata = null; try { logger.info("Migrating docs: Index " + workItem.indexName + ", Shard " + workItem.shardId); - ShardMetadata.Data shardMetadata = members.shardMetadataFactory.fromRepo(members.snapshotName, workItem.indexName, workItem.shardId); - members.unpacker.unpack(shardMetadata); - - Flux documents = members.reader.readDocuments(shardMetadata.getIndexName(), shardMetadata.getShardId()); + shardMetadata = members.shardMetadataFactory.fromRepo(members.snapshotName, workItem.indexName, workItem.shardId); + + logger.info("Shard size: " + shardMetadata.getTotalSizeBytes()); + if (shardMetadata.getTotalSizeBytes() > members.maxShardSizeBytes) { + throw new ShardTooLarge(shardMetadata.getTotalSizeBytes(), members.maxShardSizeBytes); + } - final int finalShardId = shardMetadata.getShardId(); // Define in local context for the lambda - members.reindexer.reindex(shardMetadata.getIndexName(), documents) - .doOnError(error -> 
logger.error("Error during reindexing: " + error)) - .doOnSuccess(done -> logger.info("Reindexing completed for Index " + shardMetadata.getIndexName() + ", Shard " + finalShardId)) - // Wait for the reindexing to complete before proceeding - .block(); - logger.info("Docs migrated"); + try (SnapshotShardUnpacker unpacker = members.unpackerFactory.create(shardMetadata)) { + unpacker.unpack(); + + Flux documents = members.reader.readDocuments(shardMetadata.getIndexName(), shardMetadata.getShardId()); + final ShardMetadata.Data finalShardMetadata = shardMetadata; // Define in local context for the lambda + members.reindexer.reindex(shardMetadata.getIndexName(), documents) + .doOnError(error -> logger.error("Error during reindexing: " + error)) + .doOnSuccess(done -> logger.info("Reindexing completed for Index " + finalShardMetadata.getIndexName() + ", Shard " + finalShardMetadata.getShardId())) + // Wait for the reindexing to complete before proceeding + .block(); + logger.info("Docs migrated"); + } logger.info("Updating the Documents Work Item to indicate it has been completed..."); CmsEntry.DocumentsWorkItem updatedEntry = new CmsEntry.DocumentsWorkItem( @@ -386,7 +395,7 @@ public void run() { CmsEntry.DocumentsWorkItemStatus.COMPLETED, workItem.leaseExpiry, workItem.numAttempts - ); + ); members.cmsWorkEntry = Optional.of(members.cmsClient.updateDocumentsWorkItemForceful(updatedEntry)); logger.info("Documents Work Item updated"); @@ -506,6 +515,12 @@ public WorkerStep nextStep() { } } + public static class ShardTooLarge extends RfsException { + public ShardTooLarge(long shardSizeBytes, long maxShardSize) { + super("The shard size of " + shardSizeBytes + " bytes exceeds the maximum shard size of " + maxShardSize + " bytes"); + } + } + public static class DocumentsMigrationFailed extends RfsException { public DocumentsMigrationFailed(String message) { super("The Documents Migration has failed. 
Reason: " + message); diff --git a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java index 37c98b35d..dedc3d8ae 100644 --- a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java +++ b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IOUtils; +import com.rfs.common.DefaultSourceRepoAccessor; import com.rfs.common.DocumentReindexer; import com.rfs.common.FileSystemRepo; import com.rfs.common.IndexMetadata; @@ -45,8 +46,9 @@ public List extractSnapshotIndexData(final String localPath, for (final IndexMetadata.Data index : indices) { for (int shardId = 0; shardId < index.getNumberOfShards(); shardId++) { var shardMetadata = new ShardMetadataFactory_ES_7_10(snapShotProvider).fromRepo(snapshotName, index.getName(), shardId); - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, unpackedShardDataDir, Integer.MAX_VALUE); - unpacker.unpack(shardMetadata); + DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); + SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, unpackedShardDataDir, shardMetadata, Integer.MAX_VALUE); + unpacker.unpack(); } } return indices; diff --git a/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java b/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java index 3729bc23e..1f50b176e 100644 --- a/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java +++ b/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java @@ -25,10 +25,11 @@ void run_encountersAnException_asExpected() { GlobalState globalState = Mockito.mock(GlobalState.class); CmsClient cmsClient = Mockito.mock(CmsClient.class); String snapshotName = "testSnapshot"; + long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L; IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class); ShardMetadata.Factory shardMetadataFactory = Mockito.mock(ShardMetadata.Factory.class); - SnapshotShardUnpacker unpacker = Mockito.mock(SnapshotShardUnpacker.class); + SnapshotShardUnpacker.Factory unpackerFactory = Mockito.mock(SnapshotShardUnpacker.Factory.class); LuceneDocumentsReader reader = Mockito.mock(LuceneDocumentsReader.class); DocumentReindexer reindexer = Mockito.mock(DocumentReindexer.class); RfsException testException = new RfsException("Unit test"); @@ -37,7 +38,7 @@ void run_encountersAnException_asExpected() { when(globalState.getPhase()).thenReturn(GlobalState.Phase.DOCUMENTS_IN_PROGRESS); // Run the test - DocumentsRunner testRunner = new DocumentsRunner(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + DocumentsRunner testRunner = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer); final var e = assertThrows(DocumentsRunner.DocumentsMigrationPhaseFailed.class, () -> testRunner.run()); // Verify the results diff --git a/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java b/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java index 5ffa034fb..c86bca9ac 100644 --- a/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java +++ b/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java @@ -45,19 +45,25 @@ @ExtendWith(MockitoExtension.class) public class DocumentsStepTest { private SharedMembers testMembers; + private SnapshotShardUnpacker 
unpacker; @BeforeEach void setUp() { GlobalState globalState = Mockito.mock(GlobalState.class); CmsClient cmsClient = Mockito.mock(CmsClient.class); String snapshotName = "test"; + long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L; IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class); ShardMetadata.Factory shardMetadataFactory = Mockito.mock(ShardMetadata.Factory.class); - SnapshotShardUnpacker unpacker = Mockito.mock(SnapshotShardUnpacker.class); + + unpacker = Mockito.mock(SnapshotShardUnpacker.class); + SnapshotShardUnpacker.Factory unpackerFactory = Mockito.mock(SnapshotShardUnpacker.Factory.class); + lenient().when(unpackerFactory.create(any())).thenReturn(unpacker); + LuceneDocumentsReader reader = Mockito.mock(LuceneDocumentsReader.class); DocumentReindexer reindexer = Mockito.mock(DocumentReindexer.class); - testMembers = new SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + testMembers = new SharedMembers(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer); } @Test @@ -524,6 +530,8 @@ void MigrateDocuments_workToDo_AsExpected(CmsEntry.DocumentsWorkItem workItem, C // Check the results Mockito.verify(testMembers.reindexer, times(1)).reindex(workItem.indexName, documents); + Mockito.verify(testMembers.unpackerFactory, times(1)).create(shardMetadata); + Mockito.verify(unpacker, times(1)).close(); Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItemForceful(updatedItem); assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass()); } @@ -557,7 +565,37 @@ void MigrateDocuments_failedItem_AsExpected() { // Check the results Mockito.verify(testMembers.reindexer, times(1)).reindex(workItem.indexName, documents); + Mockito.verify(testMembers.unpackerFactory, times(1)).create(shardMetadata); + Mockito.verify(unpacker, times(1)).close(); + Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItem(updatedItem, workItem); + assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass()); + } + + @Test + void MigrateDocuments_largeShard_AsExpected() { + // Set up the test + CmsEntry.DocumentsWorkItem workItem = new CmsEntry.DocumentsWorkItem( + "index1", 0, CmsEntry.DocumentsWorkItemStatus.NOT_STARTED, "42", 1 + ); + testMembers.cmsWorkEntry = Optional.of(workItem); + + CmsEntry.DocumentsWorkItem updatedItem = new CmsEntry.DocumentsWorkItem( + "index1", 0, CmsEntry.DocumentsWorkItemStatus.NOT_STARTED, "42", 2 + ); + + ShardMetadata.Data shardMetadata = Mockito.mock(ShardMetadata.Data.class); + Mockito.when(shardMetadata.getTotalSizeBytes()).thenReturn(testMembers.maxShardSizeBytes + 1); + Mockito.when(testMembers.shardMetadataFactory.fromRepo(testMembers.snapshotName, workItem.indexName, workItem.shardId)).thenReturn(shardMetadata); + + // Run the test + DocumentsStep.MigrateDocuments testStep = new DocumentsStep.MigrateDocuments(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItem(updatedItem, workItem); + Mockito.verify(testMembers.unpackerFactory, times(0)).create(shardMetadata); + Mockito.verify(unpacker, times(0)).close(); assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass()); }
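
The new TestSource_ES_7_17 image installs the repository-s3 plugin, and container-start.sh seeds the keystore with the s3.client.default.* credentials before starting Elasticsearch, so a snapshot repository backed by S3 can be registered against the running container. Below is a minimal sketch of that registration from Java; it assumes the container's port 9200 is published on localhost, and the repository name, bucket, and base_path are placeholders. The plugin's default S3 client takes its credentials (and region) from the keystore/environment set up by container-start.sh, not from these repository settings.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterS3RepoSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder values: adjust host/port mapping, bucket, base_path, and repo name to your setup.
        String settings = "{ \"type\": \"s3\", \"settings\": { "
                + "\"bucket\": \"my-snapshot-bucket\", \"base_path\": \"rfs-snapshots\" } }";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/_snapshot/test_s3_repo"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(settings))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Expect an acknowledged response once the keystore credentials from container-start.sh are loaded.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```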
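
ReindexFromSnapshot and RunRfsWorker now build a SnapshotShardUnpacker.Factory and unpack each shard inside try-with-resources, so the extracted Lucene files are removed as soon as the shard has been processed. A minimal sketch of that lifecycle, assuming the repo accessor, Lucene directory, buffer size, and shard metadata factory are wired as in RunRfsWorker above; the index name and shard id are placeholders:

```java
import java.nio.file.Path;

import com.rfs.common.ShardMetadata;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepoAccessor;

class UnpackOneShardSketch {
    static void unpackOneShard(SourceRepoAccessor repoAccessor, Path luceneDirPath, int bufferSize,
                               ShardMetadata.Factory shardMetadataFactory, String snapshotName,
                               String indexName, int shardId) {
        SnapshotShardUnpacker.Factory unpackerFactory =
                new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, bufferSize);
        ShardMetadata.Data shardMetadata = shardMetadataFactory.fromRepo(snapshotName, indexName, shardId);

        // close() deletes the unpacked Lucene files for this shard, so disk usage is bounded
        // to one shard at a time.
        try (SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata)) {
            unpacker.unpack();
            // ... read the shard's documents from luceneDirPath/<indexName>/<shardId> here ...
        }
    }
}
```

Bounding the unpacked data to one shard at a time is what makes the new --max-shard-size-bytes limit meaningful: together they keep local disk consumption predictable during the documents phase.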
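
The new SourceRepoAccessor hierarchy hides how repository files are materialised: DefaultSourceRepoAccessor simply opens the file in place, while EphemeralSourceRepoAccessor hands back an EphemeralFileInputStream that deletes the local copy when the stream is closed, which suits S3-backed repos where a blob can be cheaply re-downloaded. A sketch of the behavioural difference, using placeholder indexId/shardId/blobName values:

```java
import java.io.IOException;
import java.io.InputStream;

import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.EphemeralSourceRepoAccessor;
import com.rfs.common.SourceRepo;
import com.rfs.common.SourceRepoAccessor;

class AccessorSketch {
    static void readBlob(SourceRepo repo, String indexId, int shardId, String blobName) throws IOException {
        // Default: the underlying file is left on disk and can be re-read later.
        SourceRepoAccessor plain = new DefaultSourceRepoAccessor(repo);
        try (InputStream in = plain.getBlobFile(indexId, shardId, blobName)) {
            in.readAllBytes();
        }

        // Ephemeral: closing the stream also deletes the local copy of the blob.
        SourceRepoAccessor ephemeral = new EphemeralSourceRepoAccessor(repo);
        try (InputStream in = ephemeral.getBlobFile(indexId, shardId, blobName)) {
            in.readAllBytes();
        } // the local file is gone after this point
    }
}
```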
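
SnapshotShardUnpacker.close() removes the shard's Lucene directory by recursing with DirectoryStream and deleting children before their parent. The same effect could be achieved with Files.walk; the sketch below is an illustrative alternative, not the code used in the patch:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RecursiveDeleteSketch {
    // Deletes deepest paths first so each directory is empty by the time it is removed.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            List<Path> deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : deepestFirst) {
                Files.delete(p);
            }
        }
    }
}
```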
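
The new --max-shard-size-bytes option defaults to 50 * 1024 * 1024 * 1024L, i.e. 53,687,091,200 bytes (50 GiB): the earlier factors stay within int range and the trailing L promotes the final multiplication to long, so the literal does not overflow. DocumentsStep.MigrateDocuments compares ShardMetadata.Data.getTotalSizeBytes() against this limit before unpacking and throws ShardTooLarge for oversized shards. A small sketch of just the check (the exception handling is left to the step itself):

```java
import com.rfs.common.ShardMetadata;

class ShardSizeGuardSketch {
    // Default from RunRfsWorker; the trailing L makes the last multiplication a long.
    static final long MAX_SHARD_SIZE_BYTES = 50 * 1024 * 1024 * 1024L; // 53,687,091,200 bytes

    // Mirrors the pre-unpack guard in DocumentsStep.MigrateDocuments, which throws
    // ShardTooLarge instead of returning false when the shard exceeds the limit.
    static boolean fitsOnLocalDisk(ShardMetadata.Data shardMetadata) {
        return shardMetadata.getTotalSizeBytes() <= MAX_SHARD_SIZE_BYTES;
    }
}
```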
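
DocumentsStepTest now mocks SnapshotShardUnpacker.Factory, stubs create(...) to return a mocked unpacker, and verifies that close() runs, confirming the try-with-resources block releases the shard's files. A stripped-down sketch of the same verification outside the DocumentsStep harness (JUnit wiring such as @ExtendWith is omitted; this exercises the pattern directly rather than the step under test):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.rfs.common.ShardMetadata;
import com.rfs.common.SnapshotShardUnpacker;

class UnpackerLifecycleTestSketch {
    void unpackerIsClosedAfterUse() {
        SnapshotShardUnpacker unpacker = mock(SnapshotShardUnpacker.class);
        SnapshotShardUnpacker.Factory unpackerFactory = mock(SnapshotShardUnpacker.Factory.class);
        ShardMetadata.Data shardMetadata = mock(ShardMetadata.Data.class);
        when(unpackerFactory.create(shardMetadata)).thenReturn(unpacker);

        // The code under test would normally be DocumentsStep.MigrateDocuments; here the
        // try-with-resources pattern it relies on is exercised directly.
        try (SnapshotShardUnpacker u = unpackerFactory.create(shardMetadata)) {
            u.unpack();
        }

        verify(unpackerFactory, times(1)).create(shardMetadata);
        verify(unpacker, times(1)).unpack();
        verify(unpacker, times(1)).close();
    }
}
```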