Skip to content

Commit

Permalink
Updated RFS Doc Migration to loop
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed Jun 25, 2024
1 parent 9b3c190 commit 27c2e95
Showing 1 changed file with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
@Slf4j
public class RfsMigrateDocuments {
public static final int PROCESS_TIMED_OUT = 1;
public static final int SHARD_TOO_LARGE = 2;
public static final int GENERAL_EXCEPTION = 3;
public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;

public static class Args {
Expand Down Expand Up @@ -130,8 +132,19 @@ public static void main(String[] args) throws Exception {
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
try {
run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
} catch (NoWorkLeftException e) {
log.info("No work left to do. Exiting.");
System.exit(0);
} catch (DocumentsRunner.ShardTooLargeException e) {
log.error("Shard too large: " + e.getMessage());
System.exit(SHARD_TOO_LARGE);
} catch (Exception e) {
log.error("Error during migration: " + e.getMessage());
System.exit(GENERAL_EXCEPTION);
}
});
}

Expand All @@ -146,6 +159,26 @@ public static DocumentsRunner.CompletionStatus run(Function<Path,LuceneDocuments
SnapshotShardUnpacker.Factory unpackerFactory,
long maxShardSizeBytes)
throws IOException, InterruptedException, NoWorkLeftException {

// Will throw an exception when either there's no work to be done, or something goes wrong. Rely on this to
// break the loop.
while (true) {
return runOnce(readerFactory, reindexer, workCoordinator, leaseExpireTrigger, indexMetadataFactory,
snapshotName, indexAllowlist, shardMetadataFactory, unpackerFactory, maxShardSizeBytes);
}
}

public static DocumentsRunner.CompletionStatus runOnce(Function<Path,LuceneDocumentsReader> readerFactory,
DocumentReindexer reindexer,
IWorkCoordinator workCoordinator,
LeaseExpireTrigger leaseExpireTrigger,
IndexMetadata.Factory indexMetadataFactory,
String snapshotName,
List<String> indexAllowlist,
ShardMetadata.Factory shardMetadataFactory,
SnapshotShardUnpacker.Factory unpackerFactory,
long maxShardSizeBytes)
throws IOException, InterruptedException, NoWorkLeftException {
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, leaseExpireTrigger);
confirmShardPrepIsComplete(indexMetadataFactory, snapshotName, indexAllowlist, scopedWorkCoordinator);
if (!workCoordinator.workItemsArePending()) {
Expand Down

0 comments on commit 27c2e95

Please sign in to comment.