Commit 9b3c190
Updated RFS to filter out system indices by default (opensearch-project#763)

Signed-off-by: Chris Helma <chelma+github@amazon.com>
chelma authored Jun 25, 2024
1 parent 93fd822 commit 9b3c190
Showing 7 changed files with 100 additions and 38 deletions.
@@ -8,6 +8,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
+import java.util.List;
import java.util.UUID;
import java.util.function.Function;

@@ -81,6 +82,10 @@ public static class Args {
description = "Optional. The target password; if not provided, will assume no auth on target")
public String targetPass = null;

+    @Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate"
+        + " (e.g. 'logs_2024_01, logs_2024_02'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
+    public List<String> indexAllowlist = List.of();
+
@Parameter(names = {"--max-shard-size-bytes"}, description = ("Optional. The maximum shard size, in bytes, to allow when"
+ " performing the document migration. Useful for preventing disk overflow. Default: 50 * 1024 * 1024 * 1024 (50 GB)"), required = false)
public long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L;
@@ -126,7 +131,7 @@ public static void main(String[] args) throws Exception {
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
-arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
+arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
});
}

@@ -136,12 +141,13 @@ public static DocumentsRunner.CompletionStatus run(Function<Path,LuceneDocuments
LeaseExpireTrigger leaseExpireTrigger,
IndexMetadata.Factory indexMetadataFactory,
String snapshotName,
+List<String> indexAllowlist,
ShardMetadata.Factory shardMetadataFactory,
SnapshotShardUnpacker.Factory unpackerFactory,
long maxShardSizeBytes)
throws IOException, InterruptedException, NoWorkLeftException {
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, leaseExpireTrigger);
-confirmShardPrepIsComplete(indexMetadataFactory, snapshotName, scopedWorkCoordinator);
+confirmShardPrepIsComplete(indexMetadataFactory, snapshotName, indexAllowlist, scopedWorkCoordinator);
if (!workCoordinator.workItemsArePending()) {
throw new NoWorkLeftException("No work items are pending/all work items have been processed. Returning.");
}
@@ -159,6 +165,7 @@ public static DocumentsRunner.CompletionStatus run(Function<Path,LuceneDocuments

private static void confirmShardPrepIsComplete(IndexMetadata.Factory indexMetadataFactory,
String snapshotName,
+List<String> indexAllowlist,
ScopedWorkCoordinator scopedWorkCoordinator)
throws IOException, InterruptedException
{
@@ -168,7 +175,7 @@ private static void confirmShardPrepIsComplete(IndexMetadata.Factory indexMetada
long lockRenegotiationMillis = 1000;
for (int shardSetupAttemptNumber=0; ; ++shardSetupAttemptNumber) {
try {
-new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
+new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName, indexAllowlist);
return;
} catch (IWorkCoordinator.LeaseLockHeldElsewhereException e) {
long finalLockRenegotiationMillis = lockRenegotiationMillis;
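For context on the new flag's shape: the Args class above declares JCommander-style @Parameter annotations on a List<String> field. A minimal, hypothetical sketch of how such a flag parses, assuming JCommander's default comma splitting for list-typed parameters (the demo class and index names are illustrative, not part of this commit):

    import java.util.List;

    import com.beust.jcommander.JCommander;
    import com.beust.jcommander.Parameter;

    // Hypothetical stand-in for the Args class in this commit.
    public class AllowlistArgsDemo {
        @Parameter(names = {"--index-allowlist"}, required = false)
        public List<String> indexAllowlist = List.of();

        public static void main(String[] rawArgs) {
            AllowlistArgsDemo demo = new AllowlistArgsDemo();
            JCommander.newBuilder().addObject(demo).build()
                    .parse("--index-allowlist", "logs_2024_01,logs_2024_02");
            // Assuming JCommander's default comma splitter: prints [logs_2024_01, logs_2024_02]
            System.out.println(demo.indexAllowlist);
        }
    }

An empty list, the default, means "no allowlist was given", which the new FilterScheme class below interprets as "migrate all non-system indices".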
@@ -97,6 +97,7 @@ public void test(String sourceImageName, String targetImageName, int numWorkers)
osTargetContainer.start();

final var SNAPSHOT_NAME = "test_snapshot";
+final List<String> INDEX_ALLOWLIST = List.of();
CreateSnapshot.run(
c -> new FileSystemSnapshotCreator(SNAPSHOT_NAME, c, ElasticsearchContainer.CLUSTER_SNAPSHOT_DIR),
new OpenSearchClient(esSourceContainer.getUrl(), null));
@@ -106,13 +107,13 @@ public void test(String sourceImageName, String targetImageName, int numWorkers)

var targetClient = new OpenSearchClient(osTargetContainer.getHttpHostAddress(), null);
var sourceRepo = new FileSystemRepo(tempDir);
-migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME);
+migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, INDEX_ALLOWLIST);

var workerFutures = new ArrayList<CompletableFuture<Void>>();
var runCounter = new AtomicInteger();
for (int i = 0; i < numWorkers; ++i) {
workerFutures.add(CompletableFuture.supplyAsync(() ->
-migrateDocumentsSequentially(sourceRepo, SNAPSHOT_NAME,
+migrateDocumentsSequentially(sourceRepo, SNAPSHOT_NAME, INDEX_ALLOWLIST,
osTargetContainer.getHttpHostAddress(), runCounter)));
}
var thrownException = Assertions.assertThrows(ExecutionException.class, () ->
@@ -176,11 +177,12 @@ private void checkClusterMigrationOnFinished(ElasticsearchContainer esSourceCont
@SneakyThrows
private Void migrateDocumentsSequentially(FileSystemRepo sourceRepo,
String snapshotName,
+List<String> indexAllowlist,
String targetAddress,
AtomicInteger runCounter) {
for (int runNumber=0; ; ++runNumber) {
try {
-var workResult = migrateDocumentsWithOneWorker(sourceRepo, snapshotName, targetAddress);
+var workResult = migrateDocumentsWithOneWorker(sourceRepo, snapshotName, indexAllowlist, targetAddress);
if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
return null;
} else {
@@ -197,7 +199,7 @@ private Void migrateDocumentsSequentially(FileSystemRepo sourceRepo,
}
}

-private static void migrateMetadata(SourceRepo sourceRepo, OpenSearchClient targetClient, String snapshotName) {
+private static void migrateMetadata(SourceRepo sourceRepo, OpenSearchClient targetClient, String snapshotName, List<String> indexAllowlist) {
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient,
@@ -208,7 +210,7 @@ private static void migrateMetadata(SourceRepo sourceRepo, OpenSearchClient targ

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
-new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer).migrateIndices();
+new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer, indexAllowlist).migrateIndices();
}

private static class FilteredLuceneDocumentsReader extends LuceneDocumentsReader {
@@ -230,6 +232,7 @@ static class LeasePastError extends Error { }
@SneakyThrows
private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRepo sourceRepo,
String snapshotName,
+List<String> indexAllowlist,
String targetAddress)
throws RfsMigrateDocuments.NoWorkLeftException
{
@@ -265,6 +268,7 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRep
processManager,
indexMetadataFactory,
snapshotName,
+indexAllowlist,
shardMetadataFactory,
unpackerFactory,
16*1024*1024);
@@ -62,7 +62,7 @@ public static class Args {
public boolean targetInsecure = false;

@Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate"
-    + " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false)
+    + " (e.g. 'logs_2024_01, logs_2024_02'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> indexAllowlist = List.of();

@Parameter(names = {"--index-template-allowlist"}, description = ("Optional. List of index template names to migrate"
@@ -106,6 +106,7 @@ public static void main(String[] args) throws Exception {
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
+final List<String> indexAllowlist = arguments.indexAllowlist;
final boolean targetInsecure = arguments.targetInsecure;
final List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
final List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
@@ -114,9 +115,6 @@ public static void main(String[] args) throws Exception {
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, targetInsecure);


-
-
-
TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
@@ -132,7 +130,7 @@ public static void main(String[] args) throws Exception {

final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
final IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
-new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer).migrateIndices();
+new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer, indexAllowlist).migrateIndices();
});
}
}
7 changes: 4 additions & 3 deletions RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -132,7 +132,8 @@ public static void main(String[] args) throws Exception {
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
-final List<String> indexTemplateAllowlist = arguments.indexAllowlist;
+final List<String> indexAllowlist = arguments.indexAllowlist;
+final List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
final List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
final long maxShardSizeBytes = arguments.maxShardSizeBytes;
final int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
@@ -160,7 +161,7 @@ public static void main(String[] args) throws Exception {

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
-new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer).migrateIndices();
+new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer, indexAllowlist).migrateIndices();

ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
@@ -174,7 +175,7 @@ public static void main(String[] args) throws Exception {
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(targetHost)),
5, UUID.randomUUID().toString());
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager);
-new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
+new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName, indexAllowlist);
new DocumentsRunner(scopedWorkCoordinator,
(name,shard) -> shardMetadataFactory.fromRepo(snapshotName,name,shard),
unpackerFactory,
23 changes: 23 additions & 0 deletions RFS/src/main/java/com/rfs/common/FilterScheme.java
@@ -0,0 +1,23 @@
+package com.rfs.common;
+
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+
+public class FilterScheme {
+
+    public static Predicate<SnapshotRepo.Index> filterIndicesByAllowList(List<String> indexAllowlist, BiConsumer<String, Boolean> indexNameAcceptanceObserver) {
+        return index -> {
+            boolean accepted;
+            if (indexAllowlist.isEmpty()) {
+                accepted = !index.getName().startsWith(".");
+            } else {
+                accepted = indexAllowlist.contains(index.getName());
+            }
+
+            indexNameAcceptanceObserver.accept(index.getName(), accepted);
+
+            return accepted;
+        };
+    }
+}
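A self-contained sketch of the predicate's semantics, using plain strings in place of SnapshotRepo.Index so it runs standalone (illustration only, not part of this commit):

    import java.util.List;
    import java.util.function.Predicate;

    public class FilterSchemeDemo {
        // Mirrors FilterScheme.filterIndicesByAllowList, minus the observer callback.
        static Predicate<String> filter(List<String> allowlist) {
            return name -> allowlist.isEmpty()
                    ? !name.startsWith(".")       // no allowlist: drop system (dot-prefixed) indices
                    : allowlist.contains(name);   // allowlist given: exact matches only
        }

        public static void main(String[] args) {
            Predicate<String> byDefault = filter(List.of());
            System.out.println(byDefault.test(".kibana"));        // false: system index filtered out
            System.out.println(byDefault.test("logs_2024_01"));   // true: regular index migrated

            Predicate<String> allowlisted = filter(List.of("logs_2024_01"));
            System.out.println(allowlisted.test("logs_2024_02")); // false: not on the allowlist
            System.out.println(allowlisted.test(".kibana"));      // false
        }
    }

Note the interaction: a non-empty allowlist takes over entirely, so a dot-prefixed index that is explicitly listed would be accepted.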
33 changes: 22 additions & 11 deletions RFS/src/main/java/com/rfs/worker/IndexRunner.java
@@ -1,12 +1,13 @@
package com.rfs.worker;

-import java.util.Optional;
+import java.util.List;
+import java.util.function.BiConsumer;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.rfs.common.SnapshotRepo;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

+import com.rfs.common.FilterScheme;
import com.rfs.common.IndexMetadata;
import com.rfs.transformers.Transformer;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
@@ -19,18 +20,28 @@ public class IndexRunner {
private final IndexMetadata.Factory metadataFactory;
private final IndexCreator_OS_2_11 indexCreator;
private final Transformer transformer;
+private final List<String> indexAllowlist;

public void migrateIndices() {
SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider();
// TODO - parallelize this, maybe ~400-1K requests per thread and do it asynchronously
-        for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) {
-            var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
-            var root = indexMetadata.toObjectNode();
-            var transformedRoot = transformer.transformIndexMetadata(root);
-            var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId());
-            resultOp.ifPresentOrElse(value -> log.info("Index " + index.getName() + " created successfully"),
-                () -> log.info("Index " + index.getName() + " already existed; no work required")
-            );
-        }
+
+        BiConsumer<String, Boolean> logger = (indexName, accepted) -> {
+            if (!accepted) {
+                log.info("Index " + indexName + " rejected by allowlist");
+            }
+        };
+        repoDataProvider.getIndicesInSnapshot(snapshotName).stream()
+            .filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
+            .peek(index -> {
+                var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
+                var root = indexMetadata.toObjectNode();
+                var transformedRoot = transformer.transformIndexMetadata(root);
+                var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId());
+                resultOp.ifPresentOrElse(value -> log.info("Index " + index.getName() + " created successfully"),
+                    () -> log.info("Index " + index.getName() + " already existed; no work required")
+                );
+            })
+            .count(); // Force the stream to execute
}
}
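The trailing .count() exists only to drive the otherwise-lazy stream, as the inline comment says. One caveat: since Java 9, count() is permitted to skip executing the pipeline when the count is computable directly from the source; the filter stage here rules that shortcut out, but a forEach terminal states the intent without relying on it. A sketch of that alternative shape (a sketch, not the commit's code):

    repoDataProvider.getIndicesInSnapshot(snapshotName).stream()
        .filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
        .forEach(index -> {
            // ...create the index on the target, exactly as in the peek() body above...
        });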