Updated RFS to filter out system indices by default
Signed-off-by: Chris Helma <chelma+github@amazon.com>
chelma committed Jun 25, 2024
1 parent 179aef1 commit c44ee89
Showing 7 changed files with 100 additions and 35 deletions.
@@ -8,6 +8,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
+import java.util.List;
import java.util.UUID;
import java.util.function.Function;

@@ -81,6 +82,10 @@ public static class Args {
description = "Optional. The target password; if not provided, will assume no auth on target")
public String targetPass = null;

+@Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate"
++ " (e.g. 'logs_2024_01, logs_2024_02'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
+public List<String> indexAllowlist = List.of();

@Parameter(names = {"--max-shard-size-bytes"}, description = ("Optional. The maximum shard size, in bytes, to allow when"
+ " performing the document migration. Useful for preventing disk overflow. Default: 50 * 1024 * 1024 * 1024 (50 GB)"), required = false)
public long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L;
@@ -126,7 +131,7 @@ public static void main(String[] args) throws Exception {
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
-arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
+arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
});
}

@@ -136,12 +141,13 @@ public static DocumentsRunner.CompletionStatus run(Function<Path,LuceneDocuments
LeaseExpireTrigger leaseExpireTrigger,
IndexMetadata.Factory indexMetadataFactory,
String snapshotName,
+List<String> indexAllowlist,
ShardMetadata.Factory shardMetadataFactory,
SnapshotShardUnpacker.Factory unpackerFactory,
long maxShardSizeBytes)
throws IOException, InterruptedException, NoWorkLeftException {
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, leaseExpireTrigger);
-confirmShardPrepIsComplete(indexMetadataFactory, snapshotName, scopedWorkCoordinator);
+confirmShardPrepIsComplete(indexMetadataFactory, snapshotName, indexAllowlist, scopedWorkCoordinator);
if (!workCoordinator.workItemsArePending()) {
throw new NoWorkLeftException("No work items are pending/all work items have been processed. Returning.");
}
@@ -159,6 +165,7 @@ public static DocumentsRunner.CompletionStatus run(Function<Path,LuceneDocuments

private static void confirmShardPrepIsComplete(IndexMetadata.Factory indexMetadataFactory,
String snapshotName,
+List<String> indexAllowlist,
ScopedWorkCoordinator scopedWorkCoordinator)
throws IOException, InterruptedException
{
@@ -168,7 +175,7 @@ private static void confirmShardPrepIsComplete(IndexMetadata.Factory indexMetada
long lockRenegotiationMillis = 1000;
for (int shardSetupAttemptNumber=0; ; ++shardSetupAttemptNumber) {
try {
-new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
+new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName, indexAllowlist);
return;
} catch (IWorkCoordinator.LeaseLockHeldElsewhereException e) {
long finalLockRenegotiationMillis = lockRenegotiationMillis;
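A note on the new flag: the @Parameter above binds a List<String>, and JCommander's default splitter is comma-based, so one occurrence of --index-allowlist can carry several names. A minimal parsing sketch under that assumption (AllowlistArgsSketch is a hypothetical class for illustration, not part of the repository):

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.List;

public class AllowlistArgsSketch {
    @Parameter(names = {"--index-allowlist"}, description = "Optional. List of index names to migrate")
    public List<String> indexAllowlist = List.of();

    public static void main(String[] argv) {
        AllowlistArgsSketch args = new AllowlistArgsSketch();
        JCommander.newBuilder().addObject(args).build().parse(argv);
        // e.g. --index-allowlist logs_2024_01,logs_2024_02 yields two entries;
        // leaving the flag off keeps the empty-list default, which now means
        // "all non-system indices" downstream.
        System.out.println(args.indexAllowlist);
    }
}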
@@ -97,6 +97,7 @@ public void test(String sourceImageName, String targetImageName, int numWorkers)
osTargetContainer.start();

final var SNAPSHOT_NAME = "test_snapshot";
+final List<String> INDEX_ALLOWLIST = List.of();
CreateSnapshot.run(
c -> new FileSystemSnapshotCreator(SNAPSHOT_NAME, c, ElasticsearchContainer.CLUSTER_SNAPSHOT_DIR),
new OpenSearchClient(esSourceContainer.getUrl(), null));
@@ -106,13 +107,13 @@ public void test(String sourceImageName, String targetImageName, int numWorkers)

var targetClient = new OpenSearchClient(osTargetContainer.getHttpHostAddress(), null);
var sourceRepo = new FileSystemRepo(tempDir);
-migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME);
+migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, INDEX_ALLOWLIST);

var workerFutures = new ArrayList<CompletableFuture<Void>>();
var runCounter = new AtomicInteger();
for (int i = 0; i < numWorkers; ++i) {
workerFutures.add(CompletableFuture.supplyAsync(() ->
-migrateDocumentsSequentially(sourceRepo, SNAPSHOT_NAME,
+migrateDocumentsSequentially(sourceRepo, SNAPSHOT_NAME, INDEX_ALLOWLIST,
osTargetContainer.getHttpHostAddress(), runCounter)));
}
var thrownException = Assertions.assertThrows(ExecutionException.class, () ->
@@ -176,11 +177,12 @@ private void checkClusterMigrationOnFinished(ElasticsearchContainer esSourceCont
@SneakyThrows
private Void migrateDocumentsSequentially(FileSystemRepo sourceRepo,
String snapshotName,
+List<String> indexAllowlist,
String targetAddress,
AtomicInteger runCounter) {
for (int runNumber=0; ; ++runNumber) {
try {
-var workResult = migrateDocumentsWithOneWorker(sourceRepo, snapshotName, targetAddress);
+var workResult = migrateDocumentsWithOneWorker(sourceRepo, snapshotName, indexAllowlist, targetAddress);
if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
return null;
} else {
@@ -197,7 +199,7 @@ private Void migrateDocumentsSequentially(FileSystemRepo sourceRepo,
}
}

-private static void migrateMetadata(SourceRepo sourceRepo, OpenSearchClient targetClient, String snapshotName) {
+private static void migrateMetadata(SourceRepo sourceRepo, OpenSearchClient targetClient, String snapshotName, List<String> indexAllowlist) {
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient,
@@ -208,7 +210,7 @@ private static void migrateMetadata(SourceRepo sourceRepo, OpenSearchClient targ

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
-new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer).migrateIndices();
+new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer, indexAllowlist).migrateIndices();
}

private static class FilteredLuceneDocumentsReader extends LuceneDocumentsReader {
@@ -230,6 +232,7 @@ static class LeasePastError extends Error { }
@SneakyThrows
private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRepo sourceRepo,
String snapshotName,
+List<String> indexAllowlist,
String targetAddress)
throws RfsMigrateDocuments.NoWorkLeftException
{
@@ -265,6 +268,7 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRep
processManager,
indexMetadataFactory,
snapshotName,
+indexAllowlist,
shardMetadataFactory,
unpackerFactory,
16*1024*1024);
@@ -59,7 +59,7 @@ public static class Args {
public String targetPass = null;

@Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate"
-+ " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false)
++ " (e.g. 'logs_2024_01, logs_2024_02'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> indexAllowlist = List.of();

@Parameter(names = {"--index-template-allowlist"}, description = ("Optional. List of index template names to migrate"
@@ -103,6 +103,7 @@ public static void main(String[] args) throws Exception {
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
+final List<String> indexAllowlist = arguments.indexAllowlist;
final List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
final List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
final int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
@@ -126,7 +127,7 @@ public static void main(String[] args) throws Exception {

final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
final IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
-new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer).migrateIndices();
+new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer, indexAllowlist).migrateIndices();
});
}
}
7 changes: 4 additions & 3 deletions RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -132,7 +132,8 @@ public static void main(String[] args) throws Exception {
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
-final List<String> indexTemplateAllowlist = arguments.indexAllowlist;
+final List<String> indexAllowlist = arguments.indexAllowlist;
+final List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
final List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
final long maxShardSizeBytes = arguments.maxShardSizeBytes;
final int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
@@ -160,7 +161,7 @@ public static void main(String[] args) throws Exception {

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
-new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer).migrateIndices();
+new IndexRunner(snapshotName, indexMetadataFactory, indexCreator, transformer, indexAllowlist).migrateIndices();

ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
@@ -174,7 +175,7 @@ public static void main(String[] args) throws Exception {
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(targetHost)),
5, UUID.randomUUID().toString());
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager);
-new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
+new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName, indexAllowlist);
new DocumentsRunner(scopedWorkCoordinator,
(name,shard) -> shardMetadataFactory.fromRepo(snapshotName,name,shard),
unpackerFactory,
23 changes: 23 additions & 0 deletions RFS/src/main/java/com/rfs/common/FilterScheme.java
@@ -0,0 +1,23 @@
package com.rfs.common;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

public class FilterScheme {

public static Predicate<SnapshotRepo.Index> filterIndicesByAllowList(List<String> indexAllowlist, BiConsumer<String, Boolean> indexNameAcceptanceObserver) {
return index -> {
boolean accepted;
if (indexAllowlist.isEmpty()) {
accepted = !index.getName().startsWith(".");
} else {
accepted = indexAllowlist.contains(index.getName());
}

indexNameAcceptanceObserver.accept(index.getName(), accepted);

return accepted;
};
}
}
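A note on the predicate above, since it carries the commit's behavior change: with an empty allowlist it accepts any index whose name does not start with '.' (the system-index naming convention), and with a non-empty allowlist it accepts exact name matches only, so a dot-prefixed index can still be migrated by listing it explicitly. A standalone restatement of the acceptance rule for illustration (FilterSchemeSketch and acceptName are hypothetical names, not RFS types):

import java.util.List;
import java.util.function.Predicate;

public class FilterSchemeSketch {
    // Mirrors the acceptance logic of filterIndicesByAllowList, keyed on the name alone.
    static Predicate<String> acceptName(List<String> allowlist) {
        return name -> allowlist.isEmpty()
                ? !name.startsWith(".")     // default: skip system indices
                : allowlist.contains(name); // explicit list: exact matches only
    }

    public static void main(String[] args) {
        Predicate<String> byDefault = acceptName(List.of());
        System.out.println(byDefault.test("logs_2024_01")); // true
        System.out.println(byDefault.test(".kibana"));      // false: treated as a system index

        Predicate<String> explicit = acceptName(List.of("logs_2024_01"));
        System.out.println(explicit.test("logs_2024_01"));  // true
        System.out.println(explicit.test("logs_2024_02"));  // false: not on the allowlist
    }
}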
33 changes: 22 additions & 11 deletions RFS/src/main/java/com/rfs/worker/IndexRunner.java
@@ -1,12 +1,13 @@
package com.rfs.worker;

-import java.util.Optional;
+import java.util.List;
+import java.util.function.BiConsumer;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.rfs.common.SnapshotRepo;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

+import com.rfs.common.FilterScheme;
import com.rfs.common.IndexMetadata;
import com.rfs.transformers.Transformer;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
@@ -19,18 +20,28 @@ public class IndexRunner {
private final IndexMetadata.Factory metadataFactory;
private final IndexCreator_OS_2_11 indexCreator;
private final Transformer transformer;
+private final List<String> indexAllowlist;

public void migrateIndices() {
SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider();
// TODO - parallelize this, maybe ~400-1K requests per thread and do it asynchronously
-for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) {
-var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
-var root = indexMetadata.toObjectNode();
-var transformedRoot = transformer.transformIndexMetadata(root);
-var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId());
-resultOp.ifPresentOrElse(value -> log.info("Index " + index.getName() + " created successfully"),
-() -> log.info("Index " + index.getName() + " already existed; no work required")
-);
-}
+
+BiConsumer<String, Boolean> logger = (indexName, accepted) -> {
+if (!accepted) {
+log.info("Index " + indexName + " rejected by allowlist");
+}
+};
+repoDataProvider.getIndicesInSnapshot(snapshotName).stream()
+.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
+.peek(index -> {
+var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
+var root = indexMetadata.toObjectNode();
+var transformedRoot = transformer.transformIndexMetadata(root);
+var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId());
+resultOp.ifPresentOrElse(value -> log.info("Index " + index.getName() + " created successfully"),
+() -> log.info("Index " + index.getName() + " already existed; no work required")
+);
+})
+.count(); // Force the stream to execute
}
}
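A note on the stream shape used here (and mirrored in ShardWorkPreparer below): the side effects live in peek(), and count() exists only to pull elements through the pipeline. Since Java 9, count() is permitted to skip traversal when the stream's size is computable directly from the source; the intervening filter() should rule that shortcut out here, but a forEach() terminal states the intent directly. A sketch of an equivalent formulation, assuming only the code shown above rather than the committed implementation:

repoDataProvider.getIndicesInSnapshot(snapshotName).stream()
    .filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
    .forEach(index -> {
        var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
        var transformedRoot = transformer.transformIndexMetadata(indexMetadata.toObjectNode());
        indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId())
            .ifPresentOrElse(
                value -> log.info("Index " + index.getName() + " created successfully"),
                () -> log.info("Index " + index.getName() + " already existed; no work required"));
    });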
40 changes: 29 additions & 11 deletions RFS/src/main/java/com/rfs/worker/ShardWorkPreparer.java
@@ -2,6 +2,7 @@

import com.rfs.cms.IWorkCoordinator;
import com.rfs.cms.ScopedWorkCoordinator;
+import com.rfs.common.FilterScheme;
import com.rfs.common.IndexMetadata;
import com.rfs.common.SnapshotRepo;
import lombok.Lombok;
@@ -10,6 +11,9 @@

import java.io.IOException;
import java.time.Duration;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.stream.IntStream;

/**
* This class adds work items (leasable mutexes) via the WorkCoordinator so that future
@@ -22,7 +26,7 @@ public class ShardWorkPreparer {
public static final String SHARD_SETUP_WORK_ITEM_ID = "shard_setup";

public void run(ScopedWorkCoordinator scopedWorkCoordinator, IndexMetadata.Factory metadataFactory,
-String snapshotName)
+String snapshotName, List<String> indexAllowlist)
throws IOException, InterruptedException {

// ensure that there IS an index to house the shared state that we're going to be manipulating
@@ -44,7 +48,7 @@ public Void onAlreadyCompleted() throws IOException {

@Override
public Void onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) throws IOException {
-prepareShardWorkItems(scopedWorkCoordinator.workCoordinator, metadataFactory, snapshotName);
+prepareShardWorkItems(scopedWorkCoordinator.workCoordinator, metadataFactory, snapshotName, indexAllowlist);
return null;
}

@@ -56,18 +60,32 @@ public Void onNoAvailableWorkToBeDone() throws IOException {
}

@SneakyThrows
-private static void prepareShardWorkItems(IWorkCoordinator workCoordinator,
-IndexMetadata.Factory metadataFactory, String snapshotName) {
+private static void prepareShardWorkItems(IWorkCoordinator workCoordinator, IndexMetadata.Factory metadataFactory,
+String snapshotName, List<String> indexAllowlist) {
log.info("Setting up the Documents Work Items...");
SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider();
-for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) {
-IndexMetadata.Data indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
-log.info("Index " + indexMetadata.getName() + " has " + indexMetadata.getNumberOfShards() + " shards");
-for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
-log.info("Creating Documents Work Item for index: " + indexMetadata.getName() + ", shard: " + shardId);
-workCoordinator.createUnassignedWorkItem(IndexAndShard.formatAsWorkItemString(indexMetadata.getName(), shardId));
-}
-}
+
+BiConsumer<String, Boolean> logger = (indexName, accepted) -> {
+if (!accepted) {
+log.info("Index " + indexName + " rejected by allowlist");
+}
+};
+repoDataProvider.getIndicesInSnapshot(snapshotName).stream()
+.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
+.peek(index -> {
+IndexMetadata.Data indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
+log.info("Index " + indexMetadata.getName() + " has " + indexMetadata.getNumberOfShards() + " shards");
+IntStream.range(0, indexMetadata.getNumberOfShards()).forEach(shardId -> {
+log.info("Creating Documents Work Item for index: " + indexMetadata.getName() + ", shard: " + shardId);
+try {
+workCoordinator.createUnassignedWorkItem(IndexAndShard.formatAsWorkItemString(indexMetadata.getName(), shardId));
+} catch (IOException e) {
+throw Lombok.sneakyThrow(e);
+}
+});
+})
+.count(); // Force the stream to execute

log.info("Finished setting up the Documents Work Items.");
}
}
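A note on the try/catch inside the lambda above: the IntConsumer passed to IntStream.forEach cannot declare checked exceptions, so the IOException from createUnassignedWorkItem is rethrown unchecked via Lombok.sneakyThrow, which rethrows without wrapping. The general shape of the pattern, as a sketch (SneakyThrowSketch and doCheckedWork are hypothetical stand-ins):

import java.io.IOException;
import java.util.stream.IntStream;
import lombok.Lombok;

public class SneakyThrowSketch {
    // Hypothetical stand-in for work that declares a checked exception.
    static void doCheckedWork(int shardId) throws IOException { /* ... */ }

    static void forEachShard(int shardCount) {
        IntStream.range(0, shardCount).forEach(shardId -> {
            try {
                doCheckedWork(shardId);
            } catch (IOException e) {
                // sneakyThrow's return type lets us satisfy the compiler with a
                // throw statement; the IOException itself propagates unchecked.
                throw Lombok.sneakyThrow(e);
            }
        });
    }
}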
