Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.Translog.Durability;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.index.translog.TranslogRecoveryRunner;
Expand Down Expand Up @@ -569,6 +570,32 @@ private long getInitialGlobalCheckpointForShard(IndexSettings indexSettings) {
return UNASSIGNED_SEQ_NO;
}

/**
* Initializes primary term routing for the store directory if supported.
* This method should be called after the IndexShard is fully constructed
* to enable primary term-based segment file routing.
*/
private void initializePrimaryTermRouting() {
logger.info("intializing primary term based routing");
try {
Directory storeDirectory = store.directory();
logger.info("intializing primary term based routing {}", storeDirectory);
// if (storeDirectory instanceof org.opensearch.index.store.distributed.PrimaryTermAwareDirectoryWrapper) {
org.opensearch.index.store.distributed.PrimaryTermAwareDirectoryWrapper wrapper =
(org.opensearch.index.store.distributed.PrimaryTermAwareDirectoryWrapper) ((FilterDirectory) ((FilterDirectory)storeDirectory).getDelegate()).getDelegate();
logger.info("intializing primary term based routing");

if (!wrapper.isPrimaryTermRoutingEnabled()) {
wrapper.enablePrimaryTermRouting(this);
logger.info("Enabled primary term routing for shard {}", shardId);
}
// }
} catch (Exception e) {
logger.warn("Failed to initialize primary term routing for shard {}", shardId, e);
// Don't fail shard creation if primary term routing setup fails
}
}

public ThreadPool getThreadPool() {
return this.threadPool;
}
Expand Down Expand Up @@ -2444,6 +2471,9 @@ public void postRecovery(String reason) throws IndexShardStartedException, Index
}
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);

// Initialize primary term routing after recovery is complete
initializePrimaryTermRouting();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.distributed.DistributedSegmentDirectory;
import org.opensearch.index.store.distributed.PrimaryTermAwareDirectoryWrapper;
import org.opensearch.plugins.IndexStorePlugin;

import java.io.IOException;
Expand Down Expand Up @@ -96,15 +98,29 @@ protected Directory newFSDirectory(Path location, LockFactory lockFactory, Index
Set<String> preLoadExtensions = new HashSet<>(indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING));
switch (type) {
case HYBRIDFS:
// Use Lucene defaults
final FSDirectory primaryDirectory = FSDirectory.open(location, lockFactory);
final Set<String> nioExtensions = new HashSet<>(indexSettings.getValue(IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS));
if (primaryDirectory instanceof MMapDirectory) {
MMapDirectory mMapDirectory = (MMapDirectory) primaryDirectory;
return new HybridDirectory(lockFactory, setPreload(mMapDirectory, preLoadExtensions), nioExtensions);
} else {
return primaryDirectory;
}
// Create primary directory
final FSDirectory primaryDirectory = new NIOFSDirectory(location, lockFactory);

return new PrimaryTermAwareDirectoryWrapper(primaryDirectory, location);
// // Check if primary term routing should be enabled
// boolean enablePrimaryTermRouting = indexSettings.getSettings()
// .getAsBoolean("index.store.distributed_segment.enable_primary_term_routing", true);

// if (enablePrimaryTermRouting) {
// // Use wrapper that can be configured for primary term routing later
// return new PrimaryTermAwareDirectoryWrapper(primaryDirectory, location);
// } else {
// // Use legacy hash-based routing
// return new DistributedSegmentDirectory(primaryDirectory, location);
// }

// final Set<String> nioExtensions = new HashSet<>(indexSettings.getValue(IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS));
// if (primaryDirectory instanceof MMapDirectory) {
// MMapDirectory mMapDirectory = (MMapDirectory) primaryDirectory;
// return new HybridDirectory(lockFactory, setPreload(mMapDirectory, preLoadExtensions), nioExtensions);
// } else {
// return primaryDirectory;
// }
case MMAPFS:
return setPreload(new MMapDirectory(location, lockFactory), preLoadExtensions);
// simplefs was removed in Lucene 9; support for enum is maintained for bwc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.distributed;

import java.util.Set;

/**
 * Default implementation of FilenameHasher that uses consistent hashing
 * to distribute segment files across multiple directories while keeping
 * critical files like segments_N in the base directory.
 *
 * @opensearch.internal
 */
public class DefaultFilenameHasher implements FilenameHasher {

    private static final int NUM_DIRECTORIES = 5;
    // Files whose names start with any of these prefixes must stay in the base
    // directory (index 0): commit points and the lock file are looked up there.
    private static final Set<String> EXCLUDED_PREFIXES = Set.of("segments_", "pending_segments_", "write.lock");

    /**
     * Maps a filename to a directory index using consistent hashing.
     * Excluded files (segments_N, pending_segments_N, write.lock, *.tmp) are
     * always mapped to index 0.
     *
     * @param filename the segment filename to hash
     * @return directory index between 0 and {@code NUM_DIRECTORIES - 1} (inclusive)
     * @throws IllegalArgumentException if filename is null or empty
     */
    @Override
    public int getDirectoryIndex(String filename) {
        if (filename == null || filename.isEmpty()) {
            throw new IllegalArgumentException("Filename cannot be null or empty");
        }

        if (isExcludedFile(filename)) {
            return 0; // Base directory for excluded files
        }

        // Math.floorMod (not Math.abs + %) guarantees a non-negative result even
        // when hashCode() is Integer.MIN_VALUE, for which Math.abs stays negative
        // and the old expression produced a negative index.
        return Math.floorMod(filename.hashCode(), NUM_DIRECTORIES);
    }

    /**
     * Checks if a filename should be excluded from distribution.
     * Excluded are temporary files ({@code *.tmp}) and files starting with any
     * of {@link #EXCLUDED_PREFIXES}; null/empty names are treated as excluded.
     *
     * @param filename the filename to check
     * @return true if file should remain in base directory, false if it can be distributed
     */
    @Override
    public boolean isExcludedFile(String filename) {
        if (filename == null || filename.isEmpty()) {
            return true; // Treat invalid filenames as excluded
        }

        // NOTE(review): .tmp files are presumably kept in the base directory so a
        // later rename stays within a single underlying directory — confirm.
        if (filename.endsWith(".tmp")) {
            return true;
        }

        return EXCLUDED_PREFIXES.stream().anyMatch(filename::startsWith);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.distributed;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FSLockFactory;
import org.apache.lucene.store.NIOFSDirectory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Manages the creation and access to subdirectories for distributed segment storage.
 * Creates up to 5 subdirectories for distributing segment files while keeping
 * the base directory (index 0) for critical files like segments_N.
 *
 * @opensearch.internal
 */
public class DirectoryManager {

    private static final int NUM_DIRECTORIES = 5;
    // Filesystem path of the shard's index directory; subdirectories are created under it.
    private final Path basePath;
    // Index 0 is the externally supplied base directory; indices 1..4 are owned by this class.
    private final Directory[] subdirectories;

    /**
     * Creates a new DirectoryManager with the specified base directory.
     *
     * @param baseDirectory the base Directory instance (used as subdirectory 0)
     * @param basePath the base filesystem path for creating subdirectories
     * @throws IOException if subdirectory creation fails
     */
    public DirectoryManager(Directory baseDirectory, Path basePath) throws IOException {
        this.basePath = basePath;
        this.subdirectories = createSubdirectories(baseDirectory, basePath);
    }

    /**
     * Creates the array of subdirectories, with index 0 being the base directory
     * and indices 1-4 being newly created subdirectories.
     *
     * @param base the base Directory instance
     * @param basePath the base filesystem path
     * @return array of Directory instances
     * @throws IOException if subdirectory creation fails
     */
    private Directory[] createSubdirectories(Directory base, Path basePath) throws IOException {
        Directory[] dirs = new Directory[NUM_DIRECTORIES];
        dirs[0] = base; // Base directory for segments_N and excluded files

        try {
            for (int i = 1; i < NUM_DIRECTORIES; i++) {
                // NOTE(review): "varun_segments_" looks like a leftover developer-specific
                // prefix; renaming it would change the on-disk layout, so it is kept
                // for compatibility with existing data.
                Path subPath = basePath.resolve("varun_segments_" + i);

                // Create directory if it doesn't exist
                if (!Files.exists(subPath)) {
                    Files.createDirectories(subPath);
                }

                // Validate directory is writable
                if (!Files.isWritable(subPath)) {
                    throw new IOException("Subdirectory is not writable: " + subPath);
                }

                dirs[i] = new NIOFSDirectory(subPath, FSLockFactory.getDefault());
            }
        } catch (IOException e) {
            // Clean up any successfully created directories. A failure while
            // closing must not mask the original cause, so attach it as suppressed
            // instead of letting it propagate out of the catch block.
            try {
                closeDirectories(dirs);
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw new DistributedDirectoryException("Failed to create subdirectories", -1, "createSubdirectories", e);
        }

        return dirs;
    }

    /**
     * Gets the Directory instance for the specified index.
     *
     * @param index the directory index (0-4)
     * @return the Directory instance
     * @throws IllegalArgumentException if index is out of range
     */
    public Directory getDirectory(int index) {
        if (index < 0 || index >= NUM_DIRECTORIES) {
            throw new IllegalArgumentException("Directory index must be between 0 and " + (NUM_DIRECTORIES - 1) + ", got: " + index);
        }
        return subdirectories[index];
    }

    /**
     * Gets the number of managed directories.
     *
     * @return the number of directories (always 5)
     */
    public int getNumDirectories() {
        return NUM_DIRECTORIES;
    }

    /**
     * Gets the base filesystem path.
     *
     * @return the base path
     */
    public Path getBasePath() {
        return basePath;
    }

    /**
     * Closes all managed directories except the base directory (index 0).
     * The base directory should be closed by the caller since it was provided
     * during construction.
     *
     * @throws IOException if any directory fails to close
     */
    public void close() throws IOException {
        closeDirectories(subdirectories);
    }

    /**
     * Helper method to close directories and collect exceptions.
     * Skips null slots (partially built arrays) rather than stopping at the
     * first null, so no already-created directory is ever leaked.
     *
     * @param dirs array of directories to close
     * @throws IOException if any directory fails to close
     */
    private void closeDirectories(Directory[] dirs) throws IOException {
        IOException exception = null;

        // Close subdirectories (skip index 0 as it's the base directory managed externally)
        for (int i = 1; i < dirs.length; i++) {
            if (dirs[i] == null) {
                continue; // slot never created (construction failed part-way)
            }
            try {
                dirs[i].close();
            } catch (IOException e) {
                if (exception == null) {
                    exception = new DistributedDirectoryException("Failed to close subdirectory", i, "close", e);
                } else {
                    exception.addSuppressed(e);
                }
            }
        }

        if (exception != null) {
            throw exception;
        }
    }
}
Loading
Loading