Skip to content

Commit

Permalink
Checkpoint: adding version-specification to doc migration
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed Aug 23, 2024
1 parent 5a33d59 commit 19ba404
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.rfs.cms.LeaseExpireTrigger;
import com.rfs.cms.OpenSearchWorkCoordinator;
import com.rfs.cms.ScopedWorkCoordinator;
import com.rfs.common.ClusterVersion;
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
Expand All @@ -35,15 +36,13 @@
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.common.SourceResourceProvider;
import com.rfs.common.SourceResourceProviderFactory;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.http.ConnectionContext;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.tracing.RootWorkCoordinationContext;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.ShardWorkPreparer;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -132,6 +131,11 @@ public static class Args {
description = "Optional. The maximum number of connections to simultaneously " +
"used to communicate to the target, default 10")
int maxConnections = 10;

@Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Possible "
+ "values include: ES_6_8, ES_7_10, ES_7_17. Default: ES_7_10"), required = false,
converter = ClusterVersion.ArgsConverter.class)
public ClusterVersion sourceVersion = ClusterVersion.ES_7_10;
}

public static class NoWorkLeftException extends Exception {
Expand Down Expand Up @@ -212,20 +216,22 @@ public static void main(String[] args) throws Exception {
} else {
sourceRepo = new FileSystemRepo(snapshotLocalDirPath);
}
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);

SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(arguments.sourceVersion);

SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = sourceResourceProvider.getShardMetadataFactory(repoDataProvider);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
repoAccessor,
luceneDirPath,
ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES
sourceResourceProvider.getBufferSizeInBytes()
);

run(
LuceneDocumentsReader.getFactory(ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE,
ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD),
LuceneDocumentsReader.getFactory(sourceResourceProvider.getSoftDeletesPossible(),
sourceResourceProvider.getSoftDeletesFieldData()),
reindexer,
workCoordinator,
arguments.initialLeaseDuration,
Expand Down
12 changes: 11 additions & 1 deletion RFS/src/main/java/com/rfs/common/ClusterVersion.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.rfs.common;

import java.util.ArrayList;
import java.util.List;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.ParameterException;

Expand All @@ -9,16 +12,23 @@
public enum ClusterVersion {
ES_6_8,
ES_7_10,
ES_7_17,
OS_2_11;

public static final List<ClusterVersion> SOURCE_VERSIONS = List.of(ES_6_8, ES_7_10, ES_7_17);
public static final List<ClusterVersion> TARGET_VERSIONS = List.of(OS_2_11);

public static class ArgsConverter implements IStringConverter<ClusterVersion> {
@Override
public ClusterVersion convert(String value) {
switch (value) {
String lowerCasedValue = value.toLowerCase();
switch (lowerCasedValue) {
case "es_6_8":
return ClusterVersion.ES_6_8;
case "es_7_10":
return ClusterVersion.ES_7_10;
case "es_7_17":
return ClusterVersion.ES_7_17;
case "os_2_11":
return ClusterVersion.OS_2_11;
default:
Expand Down
14 changes: 14 additions & 0 deletions RFS/src/main/java/com/rfs/common/SourceResourceProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.rfs.common;

import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;

public interface SourceResourceProvider {
SnapshotRepo.Provider getSnapshotRepoProvider(SourceRepo sourceRepo);
IndexMetadata.Factory getIndexMetadataFactory(SnapshotRepo.Provider repoDataProvider);
ShardMetadata.Factory getShardMetadataFactory(SnapshotRepo.Provider repoDataProvider);

int getBufferSizeInBytes();
boolean getSoftDeletesPossible();
String getSoftDeletesFieldData();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.rfs.common;

import com.rfs.version_es_6_8.SourceResourceProvider_ES_6_8;
import com.rfs.version_es_7_10.SourceResourceProvider_ES_7_10;

public class SourceResourceProviderFactory {
public static SourceResourceProvider getProvider(ClusterVersion version) {
switch (version) {
case ES_6_8:
return new SourceResourceProvider_ES_6_8();
case ES_7_10:
return new SourceResourceProvider_ES_7_10();
case ES_7_17:
// We don't currently distinguish between 7.10 and 7.17
return new SourceResourceProvider_ES_7_10();
default:
throw new IllegalArgumentException("Invalid version: " + version);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.rfs.version_es_6_8;

import com.rfs.common.SnapshotRepo;
import com.rfs.common.SourceRepo;
import com.rfs.common.SourceResourceProvider;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;

public class SourceResourceProvider_ES_6_8 implements SourceResourceProvider {
@Override
public SnapshotRepo.Provider getSnapshotRepoProvider(SourceRepo sourceRepo) {
return new SnapshotRepoProvider_ES_6_8(sourceRepo);
}

@Override
public IndexMetadata.Factory getIndexMetadataFactory(SnapshotRepo.Provider repoDataProvider) {
return new IndexMetadataFactory_ES_6_8(repoDataProvider);
}

@Override
public ShardMetadata.Factory getShardMetadataFactory(SnapshotRepo.Provider repoDataProvider) {
return new ShardMetadataFactory_ES_6_8(repoDataProvider);
}

@Override
public int getBufferSizeInBytes() {
return ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES;
}

@Override
public boolean getSoftDeletesPossible() {
return ElasticsearchConstants_ES_6_8.SOFT_DELETES_POSSIBLE;
}

@Override
public String getSoftDeletesFieldData() {
return ElasticsearchConstants_ES_6_8.SOFT_DELETES_FIELD;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.rfs.version_es_7_10;

import com.rfs.common.SnapshotRepo;
import com.rfs.common.SourceRepo;
import com.rfs.common.SourceResourceProvider;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;

public class SourceResourceProvider_ES_7_10 implements SourceResourceProvider {
@Override
public SnapshotRepo.Provider getSnapshotRepoProvider(SourceRepo sourceRepo) {
return new SnapshotRepoProvider_ES_7_10(sourceRepo);
}

@Override
public IndexMetadata.Factory getIndexMetadataFactory(SnapshotRepo.Provider repoDataProvider) {
return new IndexMetadataFactory_ES_7_10(repoDataProvider);
}

@Override
public ShardMetadata.Factory getShardMetadataFactory(SnapshotRepo.Provider repoDataProvider) {
return new ShardMetadataFactory_ES_7_10(repoDataProvider);
}

@Override
public int getBufferSizeInBytes() {
return ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
}

@Override
public boolean getSoftDeletesPossible() {
return ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE;
}

@Override
public String getSoftDeletesFieldData() {
return ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD;
}

}

0 comments on commit 19ba404

Please sign in to comment.