Skip to content

Commit

Permalink
bootstrap code
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitkala committed Feb 15, 2024
1 parent c8ae7f0 commit 72ebe06
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;

import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_TYPE_WARM_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class WarmIndexIT extends RemoteStoreBaseIntegTestCase {
public void testWarmIndexCreate() throws Exception {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.INDEX_TYPE_WARM_ENABLED, true)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
assertAcked(client().admin().indices().prepareCreate("test-idx-1").setSettings(settings).get());
GetIndexResponse getIndexResponse = client().admin()
.indices()
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE));
assertEquals("true", indexSettings.get(SETTING_REMOTE_STORE_ENABLED));
assertEquals(REPOSITORY_NAME, indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY));
assertEquals(REPOSITORY_2_NAME, indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY));
assertEquals("true", indexSettings.get(INDEX_TYPE_WARM_ENABLED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ public Iterator<Setting<?>> settings() {
Property.IndexScope
);

public static final String INDEX_TYPE_WARM_ENABLED = "index.type.warm.enabled";
public static final Setting<Boolean> WARM_TYPE_SETTING = Setting.boolSetting(INDEX_TYPE_WARM_ENABLED, false,
Property.IndexScope, Property.Final);

/**
* Used to specify the replication type for the index. By default, document replication is used.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_FORMAT_SETTING,
IndexMetadata.INDEX_HIDDEN_SETTING,
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING,
IndexMetadata.WARM_TYPE_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING,
Expand Down
27 changes: 20 additions & 7 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.directory.CompositeBlockDirectoryFactory;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -505,7 +506,8 @@ public enum Type {
MMAPFS("mmapfs"),
SIMPLEFS("simplefs"),
FS("fs"),
REMOTE_SNAPSHOT("remote_snapshot");
REMOTE_SNAPSHOT("remote_snapshot"),
TIERED_WARM("tiered_warm");

private final String settingsKey;
private final boolean deprecated;
Expand Down Expand Up @@ -611,7 +613,7 @@ public IndexService newIndexService(
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
.get() == null ? (shard) -> null : indexReaderWrapper.get();
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories, remoteDirectoryFactory);
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
QueryCache queryCache = null;
IndexAnalyzers indexAnalyzers = null;
Expand Down Expand Up @@ -677,9 +679,16 @@ public IndexService newIndexService(

private static IndexStorePlugin.DirectoryFactory getDirectoryFactory(
final IndexSettings indexSettings,
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories
) {
final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory) {
final String storeType;
//TODO: Move from setting to store maybe.
if (indexSettings.isWarmIndex()) {
storeType = Type.TIERED_WARM.settingsKey;
} else {
storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
}

final Type type;
final Boolean allowMmap = NODE_STORE_ALLOW_MMAP.get(indexSettings.getNodeSettings());
if (storeType.isEmpty() || Type.FS.getSettingsKey().equals(storeType)) {
Expand Down Expand Up @@ -769,8 +778,8 @@ private void ensureNotFrozen() {
public static Map<String, IndexStorePlugin.DirectoryFactory> createBuiltInDirectoryFactories(
Supplier<RepositoriesService> repositoriesService,
ThreadPool threadPool,
FileCache remoteStoreFileCache
) {
FileCache remoteStoreFileCache,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory) {
final Map<String, IndexStorePlugin.DirectoryFactory> factories = new HashMap<>();
for (Type type : Type.values()) {
switch (type) {
Expand All @@ -787,6 +796,10 @@ public static Map<String, IndexStorePlugin.DirectoryFactory> createBuiltInDirect
new RemoteSnapshotDirectoryFactory(repositoriesService, threadPool, remoteStoreFileCache)
);
break;
case TIERED_WARM:
// TODO: Add composite directory factory here.
factories.put(type.getSettingsKey(), new CompositeBlockDirectoryFactory(repositoriesService, threadPool, remoteStoreFileCache, DEFAULT_DIRECTORY_FACTORY, remoteDirectoryFactory));
break;
default:
throw new IllegalStateException("No directory factory mapping for built-in type " + type);
}
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.RemoteStoreInterface;
import org.opensearch.index.store.remote.directory.CompositeBlockDirectory;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.cluster.IndicesClusterStateService;
Expand Down Expand Up @@ -486,12 +488,21 @@ public synchronized IndexShard createShard(
};

Store remoteStore = null;

if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
if (directory instanceof CompositeBlockDirectory) {
((CompositeBlockDirectory) directory).setRemote(new RemoteStoreInterface() {
@Override
public String toString() {
return super.toString();
}
});
}
store = new Store(
shardId,
this.indexSettings,
Expand All @@ -500,6 +511,7 @@ public synchronized IndexShard createShard(
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)),
path
);

eventListener.onStoreCreated(shardId);
indexShard = new IndexShard(
routing,
Expand Down
8 changes: 7 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,8 @@ public static IndexMergePolicy fromString(String text) {
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
private int remoteTranslogKeepExtraGen;

private boolean isWarmIndex;
private Version extendedCompatibilitySnapshotVersion;

// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
Expand Down Expand Up @@ -867,7 +869,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
} else {
extendedCompatibilitySnapshotVersion = Version.CURRENT.minimumIndexCompatibilityVersion();
}

this.isWarmIndex = isRemoteStoreEnabled && settings.getAsBoolean(IndexMetadata.INDEX_TYPE_WARM_ENABLED, false);
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.shouldCleanupUnreferencedFiles = INDEX_UNREFERENCED_FILE_CLEANUP.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
Expand Down Expand Up @@ -1176,6 +1178,10 @@ public boolean isRemoteStoreEnabled() {
return isRemoteStoreEnabled;
}

public boolean isWarmIndex() {
return isWarmIndex;
}

/**
* Returns if remote translog store is enabled for this index.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote;

import java.util.Set;

public interface RemoteStoreInterface {
Set<String> getTrackedFiles();
FileInfo getFileInfo(String name);

void delete(String name);

interface FileInfo {
long length();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.directory;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.opensearch.index.store.remote.RemoteStoreInterface;
import org.opensearch.index.store.remote.filecache.FileCache;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

public class CompositeBlockDirectory extends Directory {
private static String BLOCK_EXTENSION = "._block";

private FilterDirectory baseDirectory;
private final FSDirectory fsDirectory;
private RemoteStoreInterface remote;

private FileCache cache;
boolean isOpen;
public CompositeBlockDirectory(FilterDirectory baseDirectory) {
this.baseDirectory = baseDirectory;
this.fsDirectory = (FSDirectory) FilterDirectory.unwrap(baseDirectory);
isOpen = false;

}
public void setRemote(RemoteStoreInterface remoteStore) {
remote = remoteStore;
isOpen = false;
}

@Override
protected void ensureOpen() throws AlreadyClosedException {
// check for isOpen
}
private boolean isBlockFile(String file) {
return file.contains(BLOCK_EXTENSION);
}

@Override
public String[] listAll() throws IOException {
Set<String> allFiles = Arrays.asList(baseDirectory.listAll()).stream().filter(file -> isBlockFile(file) == false).collect(Collectors.toSet());
allFiles.addAll(remote.getTrackedFiles());

String[] files = new String[allFiles.size()];
allFiles.toArray(files);
Arrays.sort(files);

return files;
}

@Override
public void deleteFile(String name) throws IOException {
// TODO: Add support for pending deletions.
// TODO: Add support for deleting block files? needed?

if(remote.getTrackedFiles().contains(name)) {
remote.delete(name);
}
// assuming its tracked in cache.
if (!cache.remove(fsDirectory.getDirectory().resolve(name))) {
baseDirectory.deleteFile(name);
}
}

@Override
public long fileLength(String name) throws IOException {
if(remote.getTrackedFiles().contains(name)) {
return remote.getFileInfo(name).length();
} else {
return baseDirectory.fileLength(name);
}
}

@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
return baseDirectory.createOutput(name, context);
}

@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
return baseDirectory.createTempOutput(prefix, suffix, context);
}

@Override
public void sync(Collection<String> names) throws IOException {
// TODO: what sync means for composite directory.
Set<String> remoteFiles = remote.getTrackedFiles();
baseDirectory.sync(names.stream().filter(remoteFiles::contains).collect(Collectors.toSet()));
}

@Override
public void syncMetaData() throws IOException {
// TODO: what sync metadata means for composite directory.
}

@Override
public void rename(String source, String dest) throws IOException {

}

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
// TODO:
return baseDirectory.openInput(name, context);
}

@Override
public Lock obtainLock(String name) throws IOException {
// TODO: Lock implications for composite dir,
return null;
}

@Override
public void close() throws IOException {

}

@Override
public Set<String> getPendingDeletions() throws IOException {
return null; // pending deletions integration pending.
}
}
Loading

0 comments on commit 72ebe06

Please sign in to comment.