diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 609a6290d36ce..d7e67f17a0d90 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -184,6 +184,7 @@ import org.opensearch.index.translog.RemoteFsTranslog; import org.opensearch.index.translog.RemoteTranslogStats; import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.Translog.Durability; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogFactory; import org.opensearch.index.translog.TranslogRecoveryRunner; @@ -569,6 +570,32 @@ private long getInitialGlobalCheckpointForShard(IndexSettings indexSettings) { return UNASSIGNED_SEQ_NO; } + /** + * Initializes primary term routing for the store directory if supported. + * This method should be called after the IndexShard is fully constructed + * to enable primary term-based segment file routing. 
+ */ + private void initializePrimaryTermRouting() { + logger.info("intializing primary term based routing"); + try { + Directory storeDirectory = store.directory(); + logger.info("intializing primary term based routing {}", storeDirectory); + // if (storeDirectory instanceof org.opensearch.index.store.distributed.PrimaryTermAwareDirectoryWrapper) { + org.opensearch.index.store.distributed.PrimaryTermAwareDirectoryWrapper wrapper = + (org.opensearch.index.store.distributed.PrimaryTermAwareDirectoryWrapper) ((FilterDirectory) ((FilterDirectory)storeDirectory).getDelegate()).getDelegate(); + logger.info("intializing primary term based routing"); + + if (!wrapper.isPrimaryTermRoutingEnabled()) { + wrapper.enablePrimaryTermRouting(this); + logger.info("Enabled primary term routing for shard {}", shardId); + } + // } + } catch (Exception e) { + logger.warn("Failed to initialize primary term routing for shard {}", shardId, e); + // Don't fail shard creation if primary term routing setup fails + } + } + public ThreadPool getThreadPool() { return this.threadPool; } @@ -2444,6 +2471,9 @@ public void postRecovery(String reason) throws IndexShardStartedException, Index } recoveryState.setStage(RecoveryState.Stage.DONE); changeState(IndexShardState.POST_RECOVERY, reason); + + // Initialize primary term routing after recovery is complete + initializePrimaryTermRouting(); } } } diff --git a/server/src/main/java/org/opensearch/index/store/FsDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/FsDirectoryFactory.java index db1cc9e843e73..3839712f712fe 100644 --- a/server/src/main/java/org/opensearch/index/store/FsDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/FsDirectoryFactory.java @@ -49,6 +49,8 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.distributed.DistributedSegmentDirectory; +import 
org.opensearch.index.store.distributed.PrimaryTermAwareDirectoryWrapper; import org.opensearch.plugins.IndexStorePlugin; import java.io.IOException; @@ -96,15 +98,29 @@ protected Directory newFSDirectory(Path location, LockFactory lockFactory, Index Set preLoadExtensions = new HashSet<>(indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING)); switch (type) { case HYBRIDFS: - // Use Lucene defaults - final FSDirectory primaryDirectory = FSDirectory.open(location, lockFactory); - final Set nioExtensions = new HashSet<>(indexSettings.getValue(IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS)); - if (primaryDirectory instanceof MMapDirectory) { - MMapDirectory mMapDirectory = (MMapDirectory) primaryDirectory; - return new HybridDirectory(lockFactory, setPreload(mMapDirectory, preLoadExtensions), nioExtensions); - } else { - return primaryDirectory; - } + // Create primary directory + final FSDirectory primaryDirectory = new NIOFSDirectory(location, lockFactory); + + return new PrimaryTermAwareDirectoryWrapper(primaryDirectory, location); + // // Check if primary term routing should be enabled + // boolean enablePrimaryTermRouting = indexSettings.getSettings() + // .getAsBoolean("index.store.distributed_segment.enable_primary_term_routing", true); + + // if (enablePrimaryTermRouting) { + // // Use wrapper that can be configured for primary term routing later + // return new PrimaryTermAwareDirectoryWrapper(primaryDirectory, location); + // } else { + // // Use legacy hash-based routing + // return new DistributedSegmentDirectory(primaryDirectory, location); + // } + + // final Set nioExtensions = new HashSet<>(indexSettings.getValue(IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS)); + // if (primaryDirectory instanceof MMapDirectory) { + // MMapDirectory mMapDirectory = (MMapDirectory) primaryDirectory; + // return new HybridDirectory(lockFactory, setPreload(mMapDirectory, preLoadExtensions), nioExtensions); + // } else { + // return primaryDirectory; + // } case 
MMAPFS: return setPreload(new MMapDirectory(location, lockFactory), preLoadExtensions); // simplefs was removed in Lucene 9; support for enum is maintained for bwc diff --git a/server/src/main/java/org/opensearch/index/store/distributed/DefaultFilenameHasher.java b/server/src/main/java/org/opensearch/index/store/distributed/DefaultFilenameHasher.java new file mode 100644 index 0000000000000..0210cc5dc5d49 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/DefaultFilenameHasher.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import java.util.Set; + +/** + * Default implementation of FilenameHasher that uses consistent hashing + * to distribute segment files across multiple directories while keeping + * critical files like segments_N in the base directory. + * + * @opensearch.internal + */ +public class DefaultFilenameHasher implements FilenameHasher { + + private static final int NUM_DIRECTORIES = 5; + private static final Set EXCLUDED_PREFIXES = Set.of("segments_", "pending_segments_", "write.lock"); + + /** + * Maps a filename to a directory index using consistent hashing. + * Files with excluded prefixes (like segments_N) are always mapped to index 0. 
+ * + * @param filename the segment filename to hash + * @return directory index between 0 and 4 (inclusive) + * @throws IllegalArgumentException if filename is null or empty + */ + @Override + public int getDirectoryIndex(String filename) { + if (filename == null || filename.isEmpty()) { + throw new IllegalArgumentException("Filename cannot be null or empty"); + } + + if (isExcludedFile(filename)) { + return 0; // Base directory for excluded files + } + + // Use consistent hashing with absolute value to ensure positive result + return Math.abs(filename.hashCode()) % NUM_DIRECTORIES; + } + + /** + * Checks if a filename should be excluded from distribution. + * Currently excludes files starting with "segments_" prefix. + * + * @param filename the filename to check + * @return true if file should remain in base directory, false if it can be distributed + */ + @Override + public boolean isExcludedFile(String filename) { + if (filename == null || filename.isEmpty()) { + return true; // Treat invalid filenames as excluded + } + + if (filename.endsWith(".tmp")) { + return true; + } + + return EXCLUDED_PREFIXES.stream().anyMatch(filename::startsWith); + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/DirectoryManager.java b/server/src/main/java/org/opensearch/index/store/distributed/DirectoryManager.java new file mode 100644 index 0000000000000..f723df035123c --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/DirectoryManager.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.store.distributed; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FSLockFactory; +import org.apache.lucene.store.NIOFSDirectory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +/** + * Manages the creation and access to subdirectories for distributed segment storage. + * Creates up to 5 subdirectories for distributing segment files while keeping + * the base directory (index 0) for critical files like segments_N. + * + * @opensearch.internal + */ +public class DirectoryManager { + + private static final int NUM_DIRECTORIES = 5; + private final Path baseDirectory; + private final Directory[] subdirectories; + + /** + * Creates a new DirectoryManager with the specified base directory. + * + * @param baseDirectory the base Directory instance (used as subdirectory 0) + * @param basePath the base filesystem path for creating subdirectories + * @throws IOException if subdirectory creation fails + */ + public DirectoryManager(Directory baseDirectory, Path basePath) throws IOException { + this.baseDirectory = basePath; + this.subdirectories = createSubdirectories(baseDirectory, basePath); + } + + /** + * Creates the array of subdirectories, with index 0 being the base directory + * and indices 1-4 being newly created subdirectories. 
+ * + * @param base the base Directory instance + * @param basePath the base filesystem path + * @return array of Directory instances + * @throws IOException if subdirectory creation fails + */ + private Directory[] createSubdirectories(Directory base, Path basePath) throws IOException { + Directory[] dirs = new Directory[NUM_DIRECTORIES]; + dirs[0] = base; // Base directory for segments_N and excluded files + + try { + for (int i = 1; i < NUM_DIRECTORIES; i++) { + Path subPath = basePath.resolve("varun_segments_" + i); + + // Create directory if it doesn't exist + if (!Files.exists(subPath)) { + Files.createDirectories(subPath); + } + + // Validate directory is writable + if (!Files.isWritable(subPath)) { + throw new IOException("Subdirectory is not writable: " + subPath); + } + + dirs[i] = new NIOFSDirectory(subPath, FSLockFactory.getDefault()); + } + } catch (IOException e) { + // Clean up any successfully created directories + closeDirectories(dirs); + throw new DistributedDirectoryException( + "Failed to create subdirectories", + -1, + "createSubdirectories", + e + ); + } + + return dirs; + } + + /** + * Gets the Directory instance for the specified index. + * + * @param index the directory index (0-4) + * @return the Directory instance + * @throws IllegalArgumentException if index is out of range + */ + public Directory getDirectory(int index) { + if (index < 0 || index >= NUM_DIRECTORIES) { + throw new IllegalArgumentException("Directory index must be between 0 and " + (NUM_DIRECTORIES - 1) + + ", got: " + index); + } + return subdirectories[index]; + } + + /** + * Gets the number of managed directories. + * + * @return the number of directories (always 5) + */ + public int getNumDirectories() { + return NUM_DIRECTORIES; + } + + /** + * Gets the base filesystem path. + * + * @return the base path + */ + public Path getBasePath() { + return baseDirectory; + } + + /** + * Closes all managed directories except the base directory (index 0). 
+ * The base directory should be closed by the caller since it was provided + * during construction. + * + * @throws IOException if any directory fails to close + */ + public void close() throws IOException { + closeDirectories(subdirectories); + } + + /** + * Helper method to close directories and collect exceptions. + * + * @param dirs array of directories to close + * @throws IOException if any directory fails to close + */ + private void closeDirectories(Directory[] dirs) throws IOException { + IOException exception = null; + + // Close subdirectories (skip index 0 as it's the base directory managed externally) + for (int i = 1; i < dirs.length && dirs[i] != null; i++) { + try { + dirs[i].close(); + } catch (IOException e) { + if (exception == null) { + exception = new DistributedDirectoryException( + "Failed to close subdirectory", + i, + "close", + e + ); + } else { + exception.addSuppressed(e); + } + } + } + + if (exception != null) { + throw exception; + } + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/DistributedDirectoryException.java b/server/src/main/java/org/opensearch/index/store/distributed/DistributedDirectoryException.java new file mode 100644 index 0000000000000..a8d49a2fee2b8 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/DistributedDirectoryException.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import java.io.IOException; + +/** + * Exception thrown when operations on a distributed segment directory fail. + * This exception provides additional context about which directory index + * and operation caused the failure to aid in debugging and monitoring. 
+ * + * @opensearch.internal + */ +public class DistributedDirectoryException extends IOException { + + private final int directoryIndex; + private final String operation; + + /** + * Creates a new DistributedDirectoryException with directory context. + * + * @param message the error message + * @param directoryIndex the index of the directory where the error occurred (0-4) + * @param operation the operation that was being performed when the error occurred + * @param cause the underlying cause of the exception + */ + public DistributedDirectoryException(String message, int directoryIndex, String operation, Throwable cause) { + super(String.format("Directory %d operation '%s' failed: %s", directoryIndex, operation, message), cause); + this.directoryIndex = directoryIndex; + this.operation = operation; + } + + /** + * Creates a new DistributedDirectoryException with directory context. + * + * @param message the error message + * @param directoryIndex the index of the directory where the error occurred (0-4) + * @param operation the operation that was being performed when the error occurred + */ + public DistributedDirectoryException(String message, int directoryIndex, String operation) { + this(message, directoryIndex, operation, null); + } + + /** + * Gets the directory index where the error occurred. + * + * @return the directory index (0-4) + */ + public int getDirectoryIndex() { + return directoryIndex; + } + + /** + * Gets the operation that was being performed when the error occurred. 
+ * + * @return the operation name + */ + public String getOperation() { + return operation; + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/DistributedDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/distributed/DistributedDirectoryFactory.java new file mode 100644 index 0000000000000..8bfa25fa48216 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/DistributedDirectoryFactory.java @@ -0,0 +1,152 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Factory for creating DistributedSegmentDirectory instances with configuration support. + * Provides integration with OpenSearch settings and can be used to enable/disable + * distributed storage based on configuration. 
+ * + * @opensearch.internal + */ +public class DistributedDirectoryFactory { + + private static final Logger logger = LogManager.getLogger(DistributedDirectoryFactory.class); + + // Configuration keys + public static final String DISTRIBUTED_ENABLED_SETTING = "index.store.distributed.enabled"; + public static final String DISTRIBUTED_SUBDIRECTORIES_SETTING = "index.store.distributed.subdirectories"; + public static final String DISTRIBUTED_HASH_ALGORITHM_SETTING = "index.store.distributed.hash_algorithm"; + + // Default values + public static final boolean DEFAULT_DISTRIBUTED_ENABLED = true; + public static final int DEFAULT_SUBDIRECTORIES = 5; + public static final String DEFAULT_HASH_ALGORITHM = "default"; + + private final Settings settings; + + /** + * Creates a new DistributedDirectoryFactory with the given settings. + * + * @param settings the OpenSearch settings + */ + public DistributedDirectoryFactory(Settings settings) { + this.settings = settings; + } + + /** + * Creates a Directory instance, either distributed or the original delegate + * based on configuration settings. 
+ * + * @param delegate the base Directory instance + * @param basePath the base filesystem path + * @param shardId the shard identifier (for logging) + * @return Directory instance (distributed or original delegate) + * @throws IOException if directory creation fails + */ + public Directory createDirectory(Directory delegate, Path basePath, ShardId shardId) throws IOException { + boolean distributedEnabled = settings.getAsBoolean(DISTRIBUTED_ENABLED_SETTING, DEFAULT_DISTRIBUTED_ENABLED); + + if (!distributedEnabled) { + logger.debug("Distributed storage disabled for shard {}, using delegate directory", shardId); + return delegate; + } + + try { + FilenameHasher hasher = createHasher(); + DistributedSegmentDirectory distributedDirectory = new DistributedSegmentDirectory( + delegate, basePath, hasher + ); + + logger.info("Created distributed segment directory for shard {} at path: {}", shardId, basePath); + return distributedDirectory; + + } catch (IOException e) { + logger.error("Failed to create distributed directory for shard {}, falling back to delegate: {}", + shardId, e.getMessage()); + // Fall back to original directory if distributed creation fails + return delegate; + } + } + + /** + * Creates a Directory instance with default settings (distributed disabled). + * + * @param delegate the base Directory instance + * @param basePath the base filesystem path + * @return Directory instance (usually the original delegate) + * @throws IOException if directory creation fails + */ + public Directory createDirectory(Directory delegate, Path basePath) throws IOException { + return createDirectory(delegate, basePath, null); + } + + /** + * Creates a FilenameHasher based on configuration settings. 
+ * + * @return FilenameHasher instance + */ + private FilenameHasher createHasher() { + String hashAlgorithm = settings.get(DISTRIBUTED_HASH_ALGORITHM_SETTING, DEFAULT_HASH_ALGORITHM); + + switch (hashAlgorithm.toLowerCase()) { + case "default": + return new DefaultFilenameHasher(); + default: + logger.warn("Unknown hash algorithm '{}', using default", hashAlgorithm); + return new DefaultFilenameHasher(); + } + } + + /** + * Checks if distributed storage is enabled in the settings. + * + * @return true if distributed storage is enabled + */ + public boolean isDistributedEnabled() { + return settings.getAsBoolean(DISTRIBUTED_ENABLED_SETTING, DEFAULT_DISTRIBUTED_ENABLED); + } + + /** + * Gets the configured number of subdirectories. + * + * @return number of subdirectories + */ + public int getNumSubdirectories() { + return settings.getAsInt(DISTRIBUTED_SUBDIRECTORIES_SETTING, DEFAULT_SUBDIRECTORIES); + } + + /** + * Gets the configured hash algorithm. + * + * @return hash algorithm name + */ + public String getHashAlgorithm() { + return settings.get(DISTRIBUTED_HASH_ALGORITHM_SETTING, DEFAULT_HASH_ALGORITHM); + } + + /** + * Creates a new factory instance with updated settings. 
+ * + * @param newSettings the new settings + * @return new factory instance + */ + public DistributedDirectoryFactory withSettings(Settings newSettings) { + return new DistributedDirectoryFactory(newSettings); + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/DistributedDirectoryMetrics.java b/server/src/main/java/org/opensearch/index/store/distributed/DistributedDirectoryMetrics.java new file mode 100644 index 0000000000000..d878c2f463a3b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/DistributedDirectoryMetrics.java @@ -0,0 +1,321 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +/** + * Metrics collection for DistributedSegmentDirectory operations. + * Tracks file operations, distribution patterns, and performance metrics + * across all subdirectories. 
+ * + * @opensearch.internal + */ +public class DistributedDirectoryMetrics { + + private static final Logger logger = LogManager.getLogger(DistributedDirectoryMetrics.class); + private static final int NUM_DIRECTORIES = 5; + + // Operation counters per directory + private final LongAdder[] fileOperationsByDirectory; + private final LongAdder[] openInputOperations; + private final LongAdder[] createOutputOperations; + private final LongAdder[] deleteFileOperations; + private final LongAdder[] fileLengthOperations; + + // Timing metrics per directory (in nanoseconds) + private final LongAdder[] totalOperationTimeByDirectory; + private final AtomicLong[] maxOperationTimeByDirectory; + private final AtomicLong[] minOperationTimeByDirectory; + + // Error counters + private final LongAdder[] errorsByDirectory; + private final LongAdder totalErrors; + + // Distribution metrics + private final LongAdder totalOperations; + private final AtomicLong startTime; + + /** + * Creates a new DistributedDirectoryMetrics instance. 
+ */ + public DistributedDirectoryMetrics() { + this.fileOperationsByDirectory = new LongAdder[NUM_DIRECTORIES]; + this.openInputOperations = new LongAdder[NUM_DIRECTORIES]; + this.createOutputOperations = new LongAdder[NUM_DIRECTORIES]; + this.deleteFileOperations = new LongAdder[NUM_DIRECTORIES]; + this.fileLengthOperations = new LongAdder[NUM_DIRECTORIES]; + + this.totalOperationTimeByDirectory = new LongAdder[NUM_DIRECTORIES]; + this.maxOperationTimeByDirectory = new AtomicLong[NUM_DIRECTORIES]; + this.minOperationTimeByDirectory = new AtomicLong[NUM_DIRECTORIES]; + + this.errorsByDirectory = new LongAdder[NUM_DIRECTORIES]; + this.totalErrors = new LongAdder(); + this.totalOperations = new LongAdder(); + this.startTime = new AtomicLong(System.currentTimeMillis()); + + // Initialize arrays + for (int i = 0; i < NUM_DIRECTORIES; i++) { + this.fileOperationsByDirectory[i] = new LongAdder(); + this.openInputOperations[i] = new LongAdder(); + this.createOutputOperations[i] = new LongAdder(); + this.deleteFileOperations[i] = new LongAdder(); + this.fileLengthOperations[i] = new LongAdder(); + + this.totalOperationTimeByDirectory[i] = new LongAdder(); + this.maxOperationTimeByDirectory[i] = new AtomicLong(0); + this.minOperationTimeByDirectory[i] = new AtomicLong(Long.MAX_VALUE); + + this.errorsByDirectory[i] = new LongAdder(); + } + } + + /** + * Records a file operation with timing information. 
+ * + * @param directoryIndex the directory index (0-4) + * @param operation the operation type + * @param durationNanos the operation duration in nanoseconds + */ + public void recordFileOperation(int directoryIndex, String operation, long durationNanos) { + if (directoryIndex < 0 || directoryIndex >= NUM_DIRECTORIES) { + logger.warn("Invalid directory index: {}", directoryIndex); + return; + } + + // Update counters + fileOperationsByDirectory[directoryIndex].increment(); + totalOperations.increment(); + + // Update operation-specific counters + switch (operation.toLowerCase()) { + case "openinput": + openInputOperations[directoryIndex].increment(); + break; + case "createoutput": + createOutputOperations[directoryIndex].increment(); + break; + case "deletefile": + deleteFileOperations[directoryIndex].increment(); + break; + case "filelength": + fileLengthOperations[directoryIndex].increment(); + break; + } + + // Update timing metrics + totalOperationTimeByDirectory[directoryIndex].add(durationNanos); + + // Update min/max times + long currentMax = maxOperationTimeByDirectory[directoryIndex].get(); + if (durationNanos > currentMax) { + maxOperationTimeByDirectory[directoryIndex].compareAndSet(currentMax, durationNanos); + } + + long currentMin = minOperationTimeByDirectory[directoryIndex].get(); + if (durationNanos < currentMin) { + minOperationTimeByDirectory[directoryIndex].compareAndSet(currentMin, durationNanos); + } + + logger.debug("Recorded {} operation in directory {} took {}ns", operation, directoryIndex, durationNanos); + } + + /** + * Records an error for a specific directory. 
+ * + * @param directoryIndex the directory index (0-4) + * @param operation the operation that failed + */ + public void recordError(int directoryIndex, String operation) { + if (directoryIndex < 0 || directoryIndex >= NUM_DIRECTORIES) { + logger.warn("Invalid directory index for error: {}", directoryIndex); + return; + } + + errorsByDirectory[directoryIndex].increment(); + totalErrors.increment(); + + logger.debug("Recorded error for {} operation in directory {}", operation, directoryIndex); + } + + /** + * Gets the total number of operations for a specific directory. + * + * @param directoryIndex the directory index (0-4) + * @return the operation count + */ + public long getOperationCount(int directoryIndex) { + if (directoryIndex < 0 || directoryIndex >= NUM_DIRECTORIES) { + return 0; + } + return fileOperationsByDirectory[directoryIndex].sum(); + } + + /** + * Gets the total number of operations across all directories. + * + * @return the total operation count + */ + public long getTotalOperations() { + return totalOperations.sum(); + } + + /** + * Gets the error count for a specific directory. + * + * @param directoryIndex the directory index (0-4) + * @return the error count + */ + public long getErrorCount(int directoryIndex) { + if (directoryIndex < 0 || directoryIndex >= NUM_DIRECTORIES) { + return 0; + } + return errorsByDirectory[directoryIndex].sum(); + } + + /** + * Gets the total error count across all directories. + * + * @return the total error count + */ + public long getTotalErrors() { + return totalErrors.sum(); + } + + /** + * Gets the average operation time for a specific directory in nanoseconds. 
+ * + * @param directoryIndex the directory index (0-4) + * @return the average operation time in nanoseconds, or 0 if no operations + */ + public long getAverageOperationTime(int directoryIndex) { + if (directoryIndex < 0 || directoryIndex >= NUM_DIRECTORIES) { + return 0; + } + + long totalTime = totalOperationTimeByDirectory[directoryIndex].sum(); + long operationCount = fileOperationsByDirectory[directoryIndex].sum(); + + return operationCount > 0 ? totalTime / operationCount : 0; + } + + /** + * Gets the maximum operation time for a specific directory in nanoseconds. + * + * @param directoryIndex the directory index (0-4) + * @return the maximum operation time in nanoseconds + */ + public long getMaxOperationTime(int directoryIndex) { + if (directoryIndex < 0 || directoryIndex >= NUM_DIRECTORIES) { + return 0; + } + + long max = maxOperationTimeByDirectory[directoryIndex].get(); + return max == 0 ? 0 : max; + } + + /** + * Gets the minimum operation time for a specific directory in nanoseconds. + * + * @param directoryIndex the directory index (0-4) + * @return the minimum operation time in nanoseconds + */ + public long getMinOperationTime(int directoryIndex) { + if (directoryIndex < 0 || directoryIndex >= NUM_DIRECTORIES) { + return 0; + } + + long min = minOperationTimeByDirectory[directoryIndex].get(); + return min == Long.MAX_VALUE ? 0 : min; + } + + /** + * Gets the distribution of operations across directories as percentages. + * + * @return array of percentages (0-100) for each directory + */ + public double[] getDistributionPercentages() { + double[] percentages = new double[NUM_DIRECTORIES]; + long total = getTotalOperations(); + + if (total == 0) { + return percentages; // All zeros + } + + for (int i = 0; i < NUM_DIRECTORIES; i++) { + percentages[i] = (getOperationCount(i) * 100.0) / total; + } + + return percentages; + } + + /** + * Gets the uptime in milliseconds since metrics collection started. 
+ * + * @return uptime in milliseconds + */ + public long getUptimeMillis() { + return System.currentTimeMillis() - startTime.get(); + } + + /** + * Resets all metrics to zero. + */ + public void reset() { + for (int i = 0; i < NUM_DIRECTORIES; i++) { + fileOperationsByDirectory[i].reset(); + openInputOperations[i].reset(); + createOutputOperations[i].reset(); + deleteFileOperations[i].reset(); + fileLengthOperations[i].reset(); + + totalOperationTimeByDirectory[i].reset(); + maxOperationTimeByDirectory[i].set(0); + minOperationTimeByDirectory[i].set(Long.MAX_VALUE); + + errorsByDirectory[i].reset(); + } + + totalErrors.reset(); + totalOperations.reset(); + startTime.set(System.currentTimeMillis()); + + logger.info("Reset all distributed directory metrics"); + } + + /** + * Returns a summary string of current metrics. + * + * @return metrics summary + */ + public String getSummary() { + StringBuilder sb = new StringBuilder(); + sb.append("DistributedDirectory Metrics Summary:\n"); + sb.append("Total Operations: ").append(getTotalOperations()).append("\n"); + sb.append("Total Errors: ").append(getTotalErrors()).append("\n"); + sb.append("Uptime: ").append(getUptimeMillis()).append("ms\n"); + + double[] distribution = getDistributionPercentages(); + sb.append("Distribution by Directory:\n"); + for (int i = 0; i < NUM_DIRECTORIES; i++) { + sb.append(" Directory ").append(i).append(": ") + .append(getOperationCount(i)).append(" ops (") + .append(String.format("%.1f", distribution[i])).append("%), ") + .append(getErrorCount(i)).append(" errors, ") + .append("avg: ").append(getAverageOperationTime(i) / 1_000_000).append("ms\n"); + } + + return sb.toString(); + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/DistributedSegmentDirectory.java b/server/src/main/java/org/opensearch/index/store/distributed/DistributedSegmentDirectory.java new file mode 100644 index 0000000000000..a6ea07ed95cf7 --- /dev/null +++ 
b/server/src/main/java/org/opensearch/index/store/distributed/DistributedSegmentDirectory.java @@ -0,0 +1,647 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.index.shard.IndexShard; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A Directory implementation that distributes segment files across multiple + * subdirectories based on primary term routing. This helps improve I/O distribution + * by spreading file access across multiple storage paths while maintaining full + * Lucene Directory compatibility and providing better temporal locality. + * + * Critical files like segments_N are kept in the base directory to maintain + * compatibility with existing Lucene expectations. 
+ * + * @opensearch.internal + */ +public class DistributedSegmentDirectory extends FilterDirectory { + + private static final Logger logger = LogManager.getLogger(DistributedSegmentDirectory.class); + + // Legacy fields for backward compatibility + private final FilenameHasher hasher; + private final DirectoryManager directoryManager; + + // New primary term routing fields + private final PrimaryTermRouter router; + private final PrimaryTermDirectoryManager primaryTermDirectoryManager; + private final boolean usePrimaryTermRouting; + + /** + * Creates a new DistributedSegmentDirectory with default filename hasher (legacy). + * + * @param delegate the base Directory instance + * @param basePath the base filesystem path for creating subdirectories + * @throws IOException if subdirectory creation fails + */ + public DistributedSegmentDirectory(Directory delegate, Path basePath) throws IOException { + this(delegate, basePath, new DefaultFilenameHasher()); + } + + /** + * Creates a new DistributedSegmentDirectory with custom filename hasher (legacy). + * + * @param delegate the base Directory instance + * @param basePath the base filesystem path for creating subdirectories + * @param hasher the FilenameHasher implementation to use + * @throws IOException if subdirectory creation fails + */ + public DistributedSegmentDirectory(Directory delegate, Path basePath, FilenameHasher hasher) throws IOException { + super(delegate); + this.hasher = hasher; + this.directoryManager = new DirectoryManager(delegate, basePath); + + // Primary term routing not available in legacy constructor + this.router = null; + this.primaryTermDirectoryManager = null; + this.usePrimaryTermRouting = false; + + logger.info("Created DistributedSegmentDirectory with {} subdirectories at path: {} (legacy hash-based routing)", + directoryManager.getNumDirectories(), basePath); + } + + /** + * Creates a new DistributedSegmentDirectory with primary term routing. 
+ * + * @param delegate the base Directory instance + * @param basePath the base filesystem path for creating subdirectories + * @param indexShard the IndexShard instance for primary term access + * @throws IOException if subdirectory creation fails + */ + public DistributedSegmentDirectory(Directory delegate, Path basePath, IndexShard indexShard) throws IOException { + super(delegate); + + // Initialize primary term routing components + IndexShardContext shardContext = new IndexShardContext(indexShard); + this.router = new PrimaryTermRouter(shardContext); + this.primaryTermDirectoryManager = new PrimaryTermDirectoryManager(delegate, basePath); + this.usePrimaryTermRouting = true; + + // Legacy fields set to null for primary term routing + this.hasher = null; + this.directoryManager = null; + + logger.info("Created DistributedSegmentDirectory with primary term routing at path: {}", basePath); + } + + /** + * Resolves the appropriate directory for a given filename. + * Uses primary term routing if available, otherwise falls back to hash-based routing. + * + * @param filename the filename to resolve + * @return the Directory instance that should handle this file + */ + protected Directory resolveDirectory(String filename) { + if (usePrimaryTermRouting) { + try { + return router.getDirectoryForFile(filename, primaryTermDirectoryManager); + } catch (IOException e) { + logger.warn("Primary term routing failed for file {}, falling back to base directory", filename, e); + return primaryTermDirectoryManager.getBaseDirectory(); + } + } else { + // Legacy hash-based routing + int directoryIndex = hasher.getDirectoryIndex(filename); + return directoryManager.getDirectory(directoryIndex); + } + } + + /** + * Gets the directory index for a filename (useful for logging and debugging). + * For primary term routing, returns the primary term as the "index". 
+ * + * @param filename the filename to check + * @return the directory index (0-4) for hash routing, or primary term for primary term routing + */ + protected int getDirectoryIndex(String filename) { + if (usePrimaryTermRouting) { + if (router.isExcludedFile(filename)) { + return 0; // Base directory + } + return (int) router.getCurrentPrimaryTerm(); + } else { + return hasher.getDirectoryIndex(filename); + } + } + + /** + * Gets the current primary term (for primary term routing). + * + * @return the current primary term, or -1 if not using primary term routing + */ + public long getCurrentPrimaryTerm() { + if (usePrimaryTermRouting) { + return router.getCurrentPrimaryTerm(); + } + return -1L; + } + + /** + * Checks if this directory is using primary term routing. + * + * @return true if using primary term routing, false if using hash-based routing + */ + public boolean isUsingPrimaryTermRouting() { + return usePrimaryTermRouting; + } + + /** + * Gets the DirectoryManager instance (useful for testing, legacy routing only). + * + * @return the DirectoryManager, or null if using primary term routing + */ + protected DirectoryManager getDirectoryManager() { + return directoryManager; + } + + /** + * Gets the FilenameHasher instance (useful for testing, legacy routing only). + * + * @return the FilenameHasher, or null if using primary term routing + */ + protected FilenameHasher getHasher() { + return hasher; + } + + /** + * Gets the PrimaryTermRouter instance (useful for testing, primary term routing only). + * + * @return the PrimaryTermRouter, or null if using hash-based routing + */ + protected PrimaryTermRouter getRouter() { + return router; + } + + /** + * Gets the PrimaryTermDirectoryManager instance (useful for testing, primary term routing only). 
+ * + * @return the PrimaryTermDirectoryManager, or null if using hash-based routing + */ + protected PrimaryTermDirectoryManager getPrimaryTermDirectoryManager() { + return primaryTermDirectoryManager; + } + + /** + * Gets detailed routing information for a filename for debugging purposes. + * + * @param filename the filename to analyze + * @return a string describing the routing decision + */ + public String getRoutingInfo(String filename) { + if (usePrimaryTermRouting) { + if (router.isExcludedFile(filename)) { + return String.format("File '%s' excluded from primary term routing, using base directory", filename); + } else { + long primaryTerm = router.getCurrentPrimaryTerm(); + return String.format("File '%s' routed to primary term %d directory", filename, primaryTerm); + } + } else { + int index = hasher.getDirectoryIndex(filename); + return String.format("File '%s' routed to hash-based directory index %d", filename, index); + } + } + + /** + * Gets statistics about the current directory usage. + * + * @return directory usage statistics + */ + public String getDirectoryStats() { + if (usePrimaryTermRouting) { + PrimaryTermDirectoryManager.DirectoryStats stats = primaryTermDirectoryManager.getDirectoryStats(); + return String.format("Primary term routing: %s", stats.toString()); + } else { + return String.format("Hash-based routing: %d directories", directoryManager.getNumDirectories()); + } + } + + /** + * Validates all managed directories for accessibility and health. 
+ * + * @throws IOException if validation fails + */ + public void validateDirectories() throws IOException { + if (usePrimaryTermRouting) { + primaryTermDirectoryManager.validateDirectories(); + } else { + // Legacy validation would go here if DirectoryManager had such a method + logger.debug("Directory validation not implemented for hash-based routing"); + } + } + + @Override + public void close() throws IOException { + IOException exception = null; + + // Close the appropriate directory manager + if (usePrimaryTermRouting) { + try { + if (primaryTermDirectoryManager != null) { + primaryTermDirectoryManager.close(); + } + } catch (IOException e) { + exception = e; + } + } else { + try { + if (directoryManager != null) { + directoryManager.close(); + } + } catch (IOException e) { + exception = e; + } + } + + // Close the base directory + try { + super.close(); + } catch (IOException e) { + if (exception == null) { + exception = e; + } else { + exception.addSuppressed(e); + } + } + + if (exception != null) { + throw exception; + } + + logger.debug("Closed DistributedSegmentDirectory (primary term routing: {})", usePrimaryTermRouting); + } + + // Core Directory operations, routed to the appropriate backing directory + + @Override + public String[] listAll() throws IOException { + Set<String> allFiles = new HashSet<>(); + IOException lastException = null; + + if (usePrimaryTermRouting) { + // Primary term routing: collect files from base directory and all primary term directories + try { + // Add files from base directory + Directory baseDir = primaryTermDirectoryManager.getBaseDirectory(); + String[] baseFiles = baseDir.listAll(); + + for (String file : baseFiles) { + // Filter out primary term subdirectories from base directory listing + if (!file.startsWith(PrimaryTermDirectoryManager.PRIMARY_TERM_DIR_PREFIX)) { + allFiles.add(file); + } + } + + logger.debug("Listed {} files from base directory (filtered)", baseFiles.length); + } catch (IOException e) { + logger.warn("Failed to 
list files from base directory: {}", e.getMessage()); + lastException = e; + } + + // Add files from all primary term directories + for (Long primaryTerm : primaryTermDirectoryManager.getAllPrimaryTerms()) { + try { + Directory dir = primaryTermDirectoryManager.getDirectoryForPrimaryTerm(primaryTerm); + String[] files = dir.listAll(); + Collections.addAll(allFiles, files); + + logger.debug("Listed {} files from primary term {} directory", files.length, primaryTerm); + } catch (IOException e) { + logger.warn("Failed to list files from primary term {} directory: {}", primaryTerm, e.getMessage()); + lastException = e; + // Continue with other directories + } + } + } else { + // Legacy hash-based routing + for (int i = 0; i < directoryManager.getNumDirectories(); i++) { + try { + Directory dir = directoryManager.getDirectory(i); + String[] files = dir.listAll(); + + // Filter out subdirectory names from base directory (index 0) + if (i == 0) { + for (String file : files) { + // Only add files that are not our created subdirectories + if (!file.startsWith("varun_segments_") || !isSubdirectoryName(file)) { + allFiles.add(file); + } + } + } else { + // For other directories, add all files + Collections.addAll(allFiles, files); + } + + logger.debug("Listed {} files from directory {} (filtered: {})", + files.length, i, i == 0 ? "yes" : "no"); + } catch (IOException e) { + logger.warn("Failed to list files from directory {}: {}", i, e.getMessage()); + lastException = new DistributedDirectoryException( + "Failed to list files from directory", + i, + "listAll", + e); + // Continue trying other directories + } + } + } + + // If we couldn't list any directory and have an exception, throw it + if (allFiles.isEmpty() && lastException != null) { + throw lastException; + } + + String[] result = allFiles.toArray(new String[0]); + logger.info("Listed total {} unique files across all directories (routing: {})", + result.length, usePrimaryTermRouting ? 
"primary-term" : "hash-based"); + return result; + } + + /** + * Checks if a filename matches our subdirectory naming pattern. + * Our subdirectories are named "varun_segments_1", "varun_segments_2", etc. + * + * @param filename the filename to check + * @return true if this is one of our subdirectory names + */ + private boolean isSubdirectoryName(String filename) { + if (!filename.startsWith("varun_segments_")) { + return false; + } + + String suffix = filename.substring("varun_segments_".length()); + try { + int dirIndex = Integer.parseInt(suffix); + // Check if this matches our subdirectory naming pattern (1-4) + return dirIndex >= 1 && dirIndex < directoryManager.getNumDirectories(); + } catch (NumberFormatException e) { + // If it's not a number, it's a real segments file, not our subdirectory + return false; + } + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + Directory targetDir = resolveDirectory(name); + + if (usePrimaryTermRouting) { + long primaryTerm = router.isExcludedFile(name) ? 
-1 : router.getCurrentPrimaryTerm(); + logger.debug("Opening input for file {} (primary term: {})", name, primaryTerm); + } else { + int dirIndex = getDirectoryIndex(name); + logger.debug("Opening input for file {} in directory {}", name, dirIndex); + } + + try { + return targetDir.openInput(name, context); + } catch (IOException e) { + if (usePrimaryTermRouting) { + throw new PrimaryTermRoutingException( + "Failed to open input for file: " + name, + PrimaryTermRoutingException.ErrorType.FILE_ROUTING_ERROR, + router.getCurrentPrimaryTerm(), + name, + e); + } else { + throw new DistributedDirectoryException( + "Failed to open input for file: " + name, + getDirectoryIndex(name), + "openInput", + e); + } + } + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + Directory targetDir = resolveDirectory(name); + + if (usePrimaryTermRouting) { + long primaryTerm = router.isExcludedFile(name) ? -1 : router.getCurrentPrimaryTerm(); + logger.debug("Creating output for file {} (primary term: {})", name, primaryTerm); + } else { + int dirIndex = getDirectoryIndex(name); + logger.debug("Creating output for file {} in directory {}", name, dirIndex); + } + + try { + return targetDir.createOutput(name, context); + } catch (IOException e) { + if (usePrimaryTermRouting) { + throw new PrimaryTermRoutingException( + "Failed to create output for file: " + name, + PrimaryTermRoutingException.ErrorType.FILE_ROUTING_ERROR, + router.getCurrentPrimaryTerm(), + name, + e); + } else { + throw new DistributedDirectoryException( + "Failed to create output for file: " + name, + getDirectoryIndex(name), + "createOutput", + e); + } + } + } + + @Override + public void deleteFile(String name) throws IOException { + Directory targetDir = resolveDirectory(name); + + if (usePrimaryTermRouting) { + long primaryTerm = router.isExcludedFile(name) ? 
-1 : router.getCurrentPrimaryTerm(); + logger.debug("Deleting file {} (primary term: {})", name, primaryTerm); + } else { + int dirIndex = getDirectoryIndex(name); + logger.debug("Deleting file {} from directory {}", name, dirIndex); + } + + try { + targetDir.deleteFile(name); + } catch (IOException e) { + if (usePrimaryTermRouting) { + throw new PrimaryTermRoutingException( + "Failed to delete file: " + name, + PrimaryTermRoutingException.ErrorType.FILE_ROUTING_ERROR, + router.getCurrentPrimaryTerm(), + name, + e); + } else { + throw new DistributedDirectoryException( + "Failed to delete file: " + name, + getDirectoryIndex(name), + "deleteFile", + e); + } + } + } + + @Override + public long fileLength(String name) throws IOException { + Directory targetDir = resolveDirectory(name); + + try { + return targetDir.fileLength(name); + } catch (IOException e) { + if (usePrimaryTermRouting) { + throw new PrimaryTermRoutingException( + "Failed to get file length for: " + name, + PrimaryTermRoutingException.ErrorType.FILE_ROUTING_ERROR, + router.getCurrentPrimaryTerm(), + name, + e); + } else { + throw new DistributedDirectoryException( + "Failed to get file length for: " + name, + getDirectoryIndex(name), + "fileLength", + e); + } + } + } + + @Override + public void sync(Collection<String> names) throws IOException { + List<IOException> exceptions = new ArrayList<>(); + + if (usePrimaryTermRouting) { + // Group files by directory (base directory or primary term directories) + Map<Directory, List<String>> filesByDirectory = new HashMap<>(); + + for (String name : names) { + Directory targetDir = resolveDirectory(name); + filesByDirectory.computeIfAbsent(targetDir, k -> new ArrayList<>()).add(name); + } + + for (Map.Entry<Directory, List<String>> entry : filesByDirectory.entrySet()) { + Directory dir = entry.getKey(); + List<String> files = entry.getValue(); + + try { + dir.sync(files); + logger.debug("Synced {} files in primary term directory", files.size()); + } catch (IOException e) { + logger.warn("Failed to sync {} files in primary term 
directory: {}", files.size(), e.getMessage()); + exceptions.add(new PrimaryTermRoutingException( + "Failed to sync files: " + files, + PrimaryTermRoutingException.ErrorType.FILE_ROUTING_ERROR, + router.getCurrentPrimaryTerm(), + e)); + } + } + } else { + // Legacy hash-based routing + Map<Integer, List<String>> filesByDirectory = names.stream() + .collect(Collectors.groupingBy(this::getDirectoryIndex)); + + for (Map.Entry<Integer, List<String>> entry : filesByDirectory.entrySet()) { + int dirIndex = entry.getKey(); + List<String> files = entry.getValue(); + + try { + Directory dir = directoryManager.getDirectory(dirIndex); + dir.sync(files); + logger.debug("Synced {} files in directory {}", files.size(), dirIndex); + } catch (IOException e) { + logger.warn("Failed to sync {} files in directory {}: {}", files.size(), dirIndex, e.getMessage()); + exceptions.add(new DistributedDirectoryException( + "Failed to sync files: " + files, + dirIndex, + "sync", + e)); + } + } + } + + // If any sync operations failed, throw the first exception with others as suppressed + if (!exceptions.isEmpty()) { + IOException primaryException = exceptions.get(0); + for (int i = 1; i < exceptions.size(); i++) { + primaryException.addSuppressed(exceptions.get(i)); + } + throw primaryException; + } + + logger.info("Successfully synced {} files across directories (routing: {})", + names.size(), usePrimaryTermRouting ? "primary-term" : "hash-based"); + } + + @Override + public void rename(String source, String dest) throws IOException { + Directory sourceDir = resolveDirectory(source); + Directory destDir = resolveDirectory(dest); + + if (sourceDir != destDir) { + // Cross-directory rename - not supported atomically + if (usePrimaryTermRouting) { + long sourcePrimaryTerm = router.isExcludedFile(source) ? -1 : router.getCurrentPrimaryTerm(); + long destPrimaryTerm = router.isExcludedFile(dest) ? 
-1 : router.getCurrentPrimaryTerm(); + throw new PrimaryTermRoutingException( + "Cross-directory rename not supported: " + source + " (primary term " + sourcePrimaryTerm + + ") -> " + dest + " (primary term " + destPrimaryTerm + ")", + PrimaryTermRoutingException.ErrorType.FILE_ROUTING_ERROR, + sourcePrimaryTerm, + source); + } else { + int sourceIndex = getDirectoryIndex(source); + int destIndex = getDirectoryIndex(dest); + throw new DistributedDirectoryException( + "Cross-directory rename not supported: " + source + " (dir " + sourceIndex + + ") -> " + dest + " (dir " + destIndex + ")", + sourceIndex, + "rename"); + } + } + + try { + sourceDir.rename(source, dest); + if (usePrimaryTermRouting) { + long primaryTerm = router.isExcludedFile(source) ? -1 : router.getCurrentPrimaryTerm(); + logger.debug("Renamed {} to {} (primary term: {})", source, dest, primaryTerm); + } else { + int dirIndex = getDirectoryIndex(source); + logger.debug("Renamed {} to {} in directory {}", source, dest, dirIndex); + } + } catch (IOException e) { + if (usePrimaryTermRouting) { + throw new PrimaryTermRoutingException( + "Failed to rename " + source + " to " + dest, + PrimaryTermRoutingException.ErrorType.FILE_ROUTING_ERROR, + router.getCurrentPrimaryTerm(), + source, + e); + } else { + throw new DistributedDirectoryException( + "Failed to rename " + source + " to " + dest, + getDirectoryIndex(source), + "rename", + e); + } + } + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/FallbackStrategy.java b/server/src/main/java/org/opensearch/index/store/distributed/FallbackStrategy.java new file mode 100644 index 0000000000000..da2bb64003557 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/FallbackStrategy.java @@ -0,0 +1,144 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * 
compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; + +/** + * Provides fallback strategies for primary term routing failures. + * This class handles graceful degradation scenarios when primary term + * routing encounters errors, ensuring the system continues to function + * by falling back to the base directory. + * + * @opensearch.internal + */ +public class FallbackStrategy { + + private static final Logger logger = LogManager.getLogger(FallbackStrategy.class); + + /** + * Handles primary term routing failures by falling back to the base directory. + * This method is called when primary term access fails or routing encounters errors. + * + * @param filename the filename that failed to route + * @param manager the directory manager to get the base directory from + * @param cause the exception that caused the failure + * @return the base directory as a fallback + */ + public static Directory handlePrimaryTermFailure(String filename, PrimaryTermDirectoryManager manager, Exception cause) { + logger.warn("Primary term routing failed for file '{}', falling back to base directory. Cause: {}", + filename, cause.getMessage()); + + if (logger.isDebugEnabled()) { + logger.debug("Primary term routing failure details for file: " + filename, cause); + } + + return manager.getBaseDirectory(); + } + + /** + * Handles directory creation failures by falling back to the base directory. + * This method is called when creating a new primary term directory fails. 
+ * + * @param primaryTerm the primary term for which directory creation failed + * @param manager the directory manager to get the base directory from + * @param cause the exception that caused the failure + * @return the base directory as a fallback + */ + public static Directory handleDirectoryCreationFailure(long primaryTerm, PrimaryTermDirectoryManager manager, Exception cause) { + logger.error("Failed to create directory for primary term {}, using base directory. Cause: {}", + primaryTerm, cause.getMessage()); + + if (logger.isDebugEnabled()) { + logger.debug("Directory creation failure details for primary term: " + primaryTerm, cause); + } + + return manager.getBaseDirectory(); + } + + /** + * Handles directory validation failures by falling back to the base directory. + * This method is called when directory accessibility or permission checks fail. + * + * @param primaryTerm the primary term for which validation failed + * @param directoryPath the path that failed validation + * @param manager the directory manager to get the base directory from + * @param cause the exception that caused the failure + * @return the base directory as a fallback + */ + public static Directory handleDirectoryValidationFailure(long primaryTerm, String directoryPath, + PrimaryTermDirectoryManager manager, Exception cause) { + logger.warn("Directory validation failed for primary term {} at path '{}', using base directory. Cause: {}", + primaryTerm, directoryPath, cause.getMessage()); + + if (logger.isDebugEnabled()) { + logger.debug("Directory validation failure details for primary term " + primaryTerm + " at path: " + directoryPath, cause); + } + + return manager.getBaseDirectory(); + } + + /** + * Handles IndexShard unavailability by using the default primary term. + * This method is called when IndexShard context is not available for primary term access. 
+ * + * @param operation the operation that was being attempted + * @return the default primary term to use as fallback + */ + public static long handleIndexShardUnavailable(String operation) { + logger.debug("IndexShard unavailable for operation '{}', using default primary term", operation); + return IndexShardContext.DEFAULT_PRIMARY_TERM; + } + + /** + * Creates a PrimaryTermRoutingException with appropriate error context. + * This is a utility method for consistent exception creation with fallback logging. + * + * @param message the error message + * @param errorType the type of error + * @param primaryTerm the primary term associated with the error + * @param filename the filename associated with the error, may be null + * @param cause the underlying cause + * @return a new PrimaryTermRoutingException + */ + public static PrimaryTermRoutingException createRoutingException(String message, + PrimaryTermRoutingException.ErrorType errorType, + long primaryTerm, + String filename, + Throwable cause) { + PrimaryTermRoutingException exception = new PrimaryTermRoutingException(message, errorType, primaryTerm, filename, cause); + + logger.warn("Created routing exception: {}", exception.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug("Routing exception details", exception); + } + + return exception; + } + + /** + * Logs a successful recovery from a fallback scenario. + * This method should be called when the system successfully recovers from a failure. 
+ * + * @param operation the operation that recovered + * @param primaryTerm the primary term involved + * @param filename the filename involved, may be null + */ + public static void logSuccessfulRecovery(String operation, long primaryTerm, String filename) { + if (filename != null) { + logger.info("Successfully recovered from fallback for operation '{}' with primary term {} and file '{}'", + operation, primaryTerm, filename); + } else { + logger.info("Successfully recovered from fallback for operation '{}' with primary term {}", + operation, primaryTerm); + } + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/FilenameHasher.java b/server/src/main/java/org/opensearch/index/store/distributed/FilenameHasher.java new file mode 100644 index 0000000000000..1b24894416836 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/FilenameHasher.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +/** + * Interface for mapping filenames to directory indices in a distributed segment directory. + * Implementations should provide consistent hashing to ensure the same filename always + * maps to the same directory index across multiple invocations. + * + * @opensearch.internal + */ +public interface FilenameHasher { + + /** + * Maps a filename to a directory index (0-4 for 5 directories). + * This method must be deterministic - the same filename should always + * return the same directory index. 
+ * + * @param filename the segment filename to hash + * @return directory index between 0 and 4 (inclusive) + * @throws IllegalArgumentException if filename is null or empty + */ + int getDirectoryIndex(String filename); + + /** + * Checks if a filename should be excluded from distribution and kept + * in the base directory (index 0). Typically used for critical files + * like segments_N that must remain in their expected location. + * + * @param filename the filename to check + * @return true if file should remain in base directory, false if it can be distributed + */ + boolean isExcludedFile(String filename); +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/IndexShardContext.java b/server/src/main/java/org/opensearch/index/store/distributed/IndexShardContext.java new file mode 100644 index 0000000000000..49e2c0d139085 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/IndexShardContext.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.index.shard.IndexShard; + +/** + * Provides safe access to IndexShard for primary term information with caching. + * This class handles cases where IndexShard may be unavailable and provides + * fallback mechanisms for primary term routing. 
+ * + * @opensearch.internal + */ +public class IndexShardContext { + + private static final Logger logger = LogManager.getLogger(IndexShardContext.class); + + /** Default primary term used when IndexShard is unavailable */ + public static final long DEFAULT_PRIMARY_TERM = 0L; + + /** Cache duration for primary term values (1 second) */ + private static final long CACHE_DURATION_MS = 1000L; + + private final IndexShard indexShard; + private volatile long cachedPrimaryTerm = -1L; + private volatile long cacheTimestamp = 0L; + + /** + * Creates a new IndexShardContext with the given IndexShard reference. + * + * @param indexShard the IndexShard instance to access, may be null + */ + public IndexShardContext(IndexShard indexShard) { + this.indexShard = indexShard; + logger.debug("Created IndexShardContext with IndexShard: {}", indexShard != null ? "available" : "null"); + } + + /** + * Gets the current primary term from the IndexShard with caching. + * Returns a cached value if it's recent (within CACHE_DURATION_MS), + * otherwise fetches a fresh value from the IndexShard. 
+ * + * @return the current primary term, or DEFAULT_PRIMARY_TERM if unavailable + */ + public long getPrimaryTerm() { + long currentTime = System.currentTimeMillis(); + + // Use cached value if recent and valid + if (cachedPrimaryTerm != -1L && (currentTime - cacheTimestamp) < CACHE_DURATION_MS) { + logger.trace("Using cached primary term: {}", cachedPrimaryTerm); + return cachedPrimaryTerm; + } + + if (indexShard != null) { + try { + long primaryTerm = indexShard.getOperationPrimaryTerm(); + + // Update cache + cachedPrimaryTerm = primaryTerm; + cacheTimestamp = currentTime; + + logger.debug("Retrieved primary term from IndexShard: {}", primaryTerm); + return primaryTerm; + } catch (Exception e) { + logger.warn("Failed to get primary term from IndexShard, using default", e); + // Don't cache failed attempts + } + } else { + logger.debug("IndexShard is null, using default primary term"); + } + + return DEFAULT_PRIMARY_TERM; + } + + /** + * Checks if the IndexShard is available for primary term access. + * + * @return true if IndexShard is available, false otherwise + */ + public boolean isAvailable() { + return indexShard != null; + } + + /** + * Invalidates the cached primary term, forcing a fresh fetch on next access. + * This can be useful when primary term changes are detected externally. + */ + public void invalidateCache() { + cachedPrimaryTerm = -1L; + cacheTimestamp = 0L; + logger.debug("Invalidated primary term cache"); + } + + /** + * Gets the IndexShard instance (for testing purposes). + * + * @return the IndexShard instance, may be null + */ + protected IndexShard getIndexShard() { + return indexShard; + } + + /** + * Gets the cached primary term value (for testing purposes). + * + * @return the cached primary term, or -1 if not cached + */ + protected long getCachedPrimaryTerm() { + return cachedPrimaryTerm; + } + + /** + * Checks if the cache is valid (for testing purposes). 
+ * + * @return true if cache is valid and recent, false otherwise + */ + protected boolean isCacheValid() { + if (cachedPrimaryTerm == -1L) { + return false; + } + + long currentTime = System.currentTimeMillis(); + return (currentTime - cacheTimestamp) < CACHE_DURATION_MS; + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermAwareDirectoryWrapper.java b/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermAwareDirectoryWrapper.java new file mode 100644 index 0000000000000..f83fc5be3d75b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermAwareDirectoryWrapper.java @@ -0,0 +1,218 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.index.shard.IndexShard; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A wrapper directory that can be initialized without IndexShard and later + * configured to use primary term routing when the IndexShard becomes available. + * This allows the DirectoryFactory to create the directory before IndexShard + * is created, and then the IndexShard can configure primary term routing later. 
+ * + * @opensearch.internal + */ +public class PrimaryTermAwareDirectoryWrapper extends FilterDirectory { + + private static final Logger logger = LogManager.getLogger(PrimaryTermAwareDirectoryWrapper.class); + + private final Path basePath; + private final AtomicReference distributedDirectory; + private volatile boolean primaryTermRoutingEnabled = false; + + /** + * Creates a new PrimaryTermAwareDirectoryWrapper. + * + * @param delegate the base Directory instance + * @param basePath the base filesystem path for creating subdirectories + */ + public PrimaryTermAwareDirectoryWrapper(Directory delegate, Path basePath) { + super(delegate); + this.basePath = basePath; + this.distributedDirectory = new AtomicReference<>(); + + logger.debug("Created PrimaryTermAwareDirectoryWrapper at path: {}", basePath); + } + + /** + * Enables primary term routing by setting the IndexShard reference. + * This method should be called after the IndexShard is created. + * + * @param indexShard the IndexShard instance for primary term access + * @throws IOException if primary term routing setup fails + */ + public void enablePrimaryTermRouting(IndexShard indexShard) throws IOException { + if (primaryTermRoutingEnabled) { + logger.debug("Primary term routing already enabled"); + return; + } + + try { + DistributedSegmentDirectory newDistributedDirectory = new DistributedSegmentDirectory(in, basePath, indexShard); + distributedDirectory.set(newDistributedDirectory); + primaryTermRoutingEnabled = true; + + logger.info("Enabled primary term routing for directory at path: {}", basePath); + } catch (IOException e) { + logger.error("Failed to enable primary term routing", e); + throw e; + } + } + + /** + * Gets the underlying distributed directory if primary term routing is enabled. 
+ * + * @return the DistributedSegmentDirectory, or null if not enabled + */ + private DistributedSegmentDirectory getDistributedDirectory() { + return distributedDirectory.get(); + } + + /** + * Checks if primary term routing is enabled. + * + * @return true if primary term routing is enabled, false otherwise + */ + public boolean isPrimaryTermRoutingEnabled() { + return primaryTermRoutingEnabled; + } + + @Override + public String[] listAll() throws IOException { + DistributedSegmentDirectory distributed = getDistributedDirectory(); + if (distributed != null) { + return distributed.listAll(); + } + return super.listAll(); + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + DistributedSegmentDirectory distributed = getDistributedDirectory(); + if (distributed != null) { + return distributed.openInput(name, context); + } + return super.openInput(name, context); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + DistributedSegmentDirectory distributed = getDistributedDirectory(); + if (distributed != null) { + return distributed.createOutput(name, context); + } + return super.createOutput(name, context); + } + + @Override + public void deleteFile(String name) throws IOException { + DistributedSegmentDirectory distributed = getDistributedDirectory(); + if (distributed != null) { + distributed.deleteFile(name); + } else { + super.deleteFile(name); + } + } + + @Override + public long fileLength(String name) throws IOException { + DistributedSegmentDirectory distributed = getDistributedDirectory(); + if (distributed != null) { + return distributed.fileLength(name); + } + return super.fileLength(name); + } + + @Override + public void sync(Collection names) throws IOException { + DistributedSegmentDirectory distributed = getDistributedDirectory(); + if (distributed != null) { + distributed.sync(names); + } else { + super.sync(names); + } + } + + @Override + public 
void rename(String source, String dest) throws IOException { + DistributedSegmentDirectory distributed = getDistributedDirectory(); + if (distributed != null) { + distributed.rename(source, dest); + } else { + super.rename(source, dest); + } + } + + @Override + public void close() throws IOException { + IOException exception = null; + + // Close the distributed directory if it exists + DistributedSegmentDirectory distributed = getDistributedDirectory(); + if (distributed != null) { + try { + distributed.close(); + } catch (IOException e) { + exception = e; + } + } + + // Close the base directory + try { + super.close(); + } catch (IOException e) { + if (exception == null) { + exception = e; + } else { + exception.addSuppressed(e); + } + } + + if (exception != null) { + throw exception; + } + + logger.debug("Closed PrimaryTermAwareDirectoryWrapper"); + } + + /** + * Gets routing information for debugging purposes. + * + * @param filename the filename to analyze + * @return routing information string + */ + public String getRoutingInfo(String filename) { + DistributedSegmentDirectory distributed = getDistributedDirectory(); + if (distributed != null) { + return distributed.getRoutingInfo(filename); + } + return "Primary term routing not enabled, using base directory"; + } + + /** + * Gets the base path. 
+ * + * @return the base filesystem path + */ + public Path getBasePath() { + return basePath; + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermDirectoryManager.java b/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermDirectoryManager.java new file mode 100644 index 0000000000000..e43c33b0ed33a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermDirectoryManager.java @@ -0,0 +1,492 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Manages the creation and lifecycle of directories based on primary terms. + * This class handles dynamic directory creation, maintains mappings between + * primary terms and Directory instances, and provides directory validation + * and cleanup functionality. 
+ * + * @opensearch.internal + */ +public class PrimaryTermDirectoryManager implements Closeable { + + private static final Logger logger = LogManager.getLogger(PrimaryTermDirectoryManager.class); + + /** Prefix used for primary term directory names */ + public static final String PRIMARY_TERM_DIR_PREFIX = "primary_term_"; + + private final Directory baseDirectory; + private final Path basePath; + private final Map primaryTermDirectories; + private volatile boolean closed = false; + + /** + * Creates a new PrimaryTermDirectoryManager. + * + * @param baseDirectory the base directory for fallback operations + * @param basePath the base filesystem path for creating subdirectories + * @throws IOException if initialization fails + */ + public PrimaryTermDirectoryManager(Directory baseDirectory, Path basePath) throws IOException { + this.baseDirectory = baseDirectory; + this.basePath = basePath; + this.primaryTermDirectories = new ConcurrentHashMap<>(); + + // Ensure base path exists + Files.createDirectories(basePath); + + logger.info("Created PrimaryTermDirectoryManager at path: {}", basePath); + } + + /** + * Gets the directory for a given primary term, creating it if necessary. + * This method uses lazy initialization with thread-safe double-check pattern. + * + * @param primaryTerm the primary term + * @return the Directory instance for this primary term + * @throws IOException if directory creation fails + */ + public Directory getDirectoryForPrimaryTerm(long primaryTerm) throws IOException { + ensureNotClosed(); + + Directory existing = primaryTermDirectories.get(primaryTerm); + if (existing != null) { + return existing; + } + + return createDirectoryForPrimaryTerm(primaryTerm); + } + + /** + * Creates a new directory for the given primary term using thread-safe double-check pattern. + * This method is synchronized to prevent race conditions during directory creation. 
+ * + * @param primaryTerm the primary term + * @return the newly created Directory instance + * @throws IOException if directory creation fails + */ + private synchronized Directory createDirectoryForPrimaryTerm(long primaryTerm) throws IOException { + ensureNotClosed(); + + // Double-check pattern for thread safety + Directory existing = primaryTermDirectories.get(primaryTerm); + if (existing != null) { + return existing; + } + + try { + String dirName = getDirectoryNameForPrimaryTerm(primaryTerm); + Path dirPath = basePath.resolve(dirName); + + // Create directory if it doesn't exist + Files.createDirectories(dirPath); + + // Create and store the Directory instance + Directory newDirectory = FSDirectory.open(dirPath); + primaryTermDirectories.put(primaryTerm, newDirectory); + + logger.info("Created directory for primary term {}: {}", primaryTerm, dirPath); + return newDirectory; + + } catch (IOException e) { + logger.error("Failed to create directory for primary term {}", primaryTerm, e); + throw new PrimaryTermRoutingException( + "Failed to create directory for primary term " + primaryTerm, + PrimaryTermRoutingException.ErrorType.DIRECTORY_CREATION_ERROR, + primaryTerm, + e + ); + } + } + + /** + * Gets the directory name for a given primary term using the standard naming convention. + * + * @param primaryTerm the primary term + * @return the directory name (e.g., "primary_term_1") + */ + public String getDirectoryNameForPrimaryTerm(long primaryTerm) { + return PRIMARY_TERM_DIR_PREFIX + primaryTerm; + } + + /** + * Gets the base directory used for fallback operations. + * + * @return the base Directory instance + */ + public Directory getBaseDirectory() { + return baseDirectory; + } + + /** + * Gets the base filesystem path. + * + * @return the base Path + */ + public Path getBasePath() { + return basePath; + } + + /** + * Lists all primary term directories currently managed. 
+ * + * @return a list of all Directory instances + */ + public List listAllDirectories() { + ensureNotClosed(); + return new ArrayList<>(primaryTermDirectories.values()); + } + + /** + * Gets all primary terms that have directories. + * + * @return a set of all primary terms with directories + */ + public Set getAllPrimaryTerms() { + ensureNotClosed(); + return primaryTermDirectories.keySet(); + } + + /** + * Gets the number of primary term directories currently managed. + * + * @return the number of directories + */ + public int getDirectoryCount() { + return primaryTermDirectories.size(); + } + + /** + * Checks if a directory exists for the given primary term. + * + * @param primaryTerm the primary term to check + * @return true if a directory exists, false otherwise + */ + public boolean hasDirectoryForPrimaryTerm(long primaryTerm) { + return primaryTermDirectories.containsKey(primaryTerm); + } + + /** + * Validates that all managed directories are accessible. + * This method checks each directory for basic accessibility. 
+ * + * @throws IOException if any directory validation fails + */ + public void validateDirectories() throws IOException { + ensureNotClosed(); + + List validationErrors = new ArrayList<>(); + + for (Map.Entry entry : primaryTermDirectories.entrySet()) { + long primaryTerm = entry.getKey(); + Directory directory = entry.getValue(); + + try { + // Basic validation - try to list files + directory.listAll(); + logger.debug("Validated directory for primary term {}", primaryTerm); + } catch (Exception e) { + logger.warn("Validation failed for primary term {} directory", primaryTerm, e); + validationErrors.add(new PrimaryTermRoutingException( + "Directory validation failed for primary term " + primaryTerm, + PrimaryTermRoutingException.ErrorType.DIRECTORY_VALIDATION_ERROR, + primaryTerm, + e + )); + } + } + + if (!validationErrors.isEmpty()) { + IOException combinedException = new IOException("Directory validation failed for " + validationErrors.size() + " directories"); + for (Exception error : validationErrors) { + combinedException.addSuppressed(error); + } + throw combinedException; + } + + logger.info("Successfully validated {} primary term directories", primaryTermDirectories.size()); + } + + /** + * Closes all managed directories and releases resources. + * This method should be called when the manager is no longer needed. 
+ */ + @Override + public void close() throws IOException { + if (closed) { + return; + } + + closed = true; + + List closeExceptions = new ArrayList<>(); + + // Close all primary term directories + for (Map.Entry entry : primaryTermDirectories.entrySet()) { + try { + entry.getValue().close(); + logger.debug("Closed directory for primary term {}", entry.getKey()); + } catch (IOException e) { + logger.warn("Failed to close directory for primary term {}", entry.getKey(), e); + closeExceptions.add(e); + } + } + + primaryTermDirectories.clear(); + + // If there were close exceptions, throw the first one with others as suppressed + if (!closeExceptions.isEmpty()) { + IOException primaryException = closeExceptions.get(0); + for (int i = 1; i < closeExceptions.size(); i++) { + primaryException.addSuppressed(closeExceptions.get(i)); + } + throw primaryException; + } + + logger.info("Closed PrimaryTermDirectoryManager with {} directories", primaryTermDirectories.size()); + } + + /** + * Checks if this manager has been closed. + * + * @return true if closed, false otherwise + */ + public boolean isClosed() { + return closed; + } + + /** + * Ensures that this manager has not been closed. + * + * @throws IllegalStateException if the manager is closed + */ + private void ensureNotClosed() { + if (closed) { + throw new IllegalStateException("PrimaryTermDirectoryManager has been closed"); + } + } + + /** + * Performs comprehensive validation of a specific primary term directory. + * This includes accessibility, permission checks, and basic operations. 
+ * + * @param primaryTerm the primary term to validate + * @throws IOException if validation fails + */ + public void validatePrimaryTermDirectory(long primaryTerm) throws IOException { + ensureNotClosed(); + + Directory directory = primaryTermDirectories.get(primaryTerm); + if (directory == null) { + throw new PrimaryTermRoutingException( + "No directory found for primary term " + primaryTerm, + PrimaryTermRoutingException.ErrorType.DIRECTORY_VALIDATION_ERROR, + primaryTerm, + (Throwable) null + ); + } + + try { + // Test basic operations + String[] files = directory.listAll(); + logger.debug("Primary term {} directory contains {} files", primaryTerm, files.length); + + // Test sync operation + directory.sync(java.util.Arrays.asList(files)); + + // Additional validation could include: + // - Check directory permissions + // - Verify disk space + // - Test write operations + + } catch (Exception e) { + throw new PrimaryTermRoutingException( + "Validation failed for primary term " + primaryTerm + " directory", + PrimaryTermRoutingException.ErrorType.DIRECTORY_VALIDATION_ERROR, + primaryTerm, + e + ); + } + } + + /** + * Cleans up unused directories for primary terms that are no longer active. + * This method can be called periodically to reclaim disk space. 
+ * + * @param activePrimaryTerms set of primary terms that should be kept + * @return the number of directories cleaned up + * @throws IOException if cleanup fails + */ + public int cleanupUnusedDirectories(Set activePrimaryTerms) throws IOException { + ensureNotClosed(); + + List toRemove = new ArrayList<>(); + + for (Long primaryTerm : primaryTermDirectories.keySet()) { + if (!activePrimaryTerms.contains(primaryTerm)) { + toRemove.add(primaryTerm); + } + } + + int cleanedUp = 0; + for (Long primaryTerm : toRemove) { + try { + Directory directory = primaryTermDirectories.remove(primaryTerm); + if (directory != null) { + directory.close(); + cleanedUp++; + logger.info("Cleaned up directory for inactive primary term {}", primaryTerm); + } + } catch (IOException e) { + logger.warn("Failed to cleanup directory for primary term {}", primaryTerm, e); + // Continue with other directories + } + } + + logger.info("Cleaned up {} unused primary term directories", cleanedUp); + return cleanedUp; + } + + /** + * Gets statistics about the managed directories. + * + * @return DirectoryStats containing information about managed directories + */ + public DirectoryStats getDirectoryStats() { + ensureNotClosed(); + + int totalDirectories = primaryTermDirectories.size(); + long minPrimaryTerm = primaryTermDirectories.keySet().stream().mapToLong(Long::longValue).min().orElse(-1L); + long maxPrimaryTerm = primaryTermDirectories.keySet().stream().mapToLong(Long::longValue).max().orElse(-1L); + + return new DirectoryStats(totalDirectories, minPrimaryTerm, maxPrimaryTerm, basePath.toString()); + } + + /** + * Statistics about managed directories. 
+ */ + public static class DirectoryStats { + private final int totalDirectories; + private final long minPrimaryTerm; + private final long maxPrimaryTerm; + private final String basePath; + + public DirectoryStats(int totalDirectories, long minPrimaryTerm, long maxPrimaryTerm, String basePath) { + this.totalDirectories = totalDirectories; + this.minPrimaryTerm = minPrimaryTerm; + this.maxPrimaryTerm = maxPrimaryTerm; + this.basePath = basePath; + } + + public int getTotalDirectories() { + return totalDirectories; + } + + public long getMinPrimaryTerm() { + return minPrimaryTerm; + } + + public long getMaxPrimaryTerm() { + return maxPrimaryTerm; + } + + public String getBasePath() { + return basePath; + } + + @Override + public String toString() { + return String.format("DirectoryStats{totalDirectories=%d, minPrimaryTerm=%d, maxPrimaryTerm=%d, basePath='%s'}", + totalDirectories, minPrimaryTerm, maxPrimaryTerm, basePath); + } + } + + /** + * Checks if the filesystem has sufficient space for new directories. + * This is a basic check that can be extended with more sophisticated logic. + * + * @param requiredSpaceBytes minimum required space in bytes + * @return true if sufficient space is available + */ + public boolean hasAvailableSpace(long requiredSpaceBytes) { + try { + long usableSpace = Files.getFileStore(basePath).getUsableSpace(); + boolean hasSpace = usableSpace >= requiredSpaceBytes; + + if (!hasSpace) { + logger.warn("Insufficient disk space: required={} bytes, available={} bytes", + requiredSpaceBytes, usableSpace); + } + + return hasSpace; + } catch (IOException e) { + logger.warn("Failed to check available disk space", e); + return true; // Assume space is available if check fails + } + } + + /** + * Forces synchronization of all managed directories. + * This ensures all pending writes are flushed to disk. 
+ * + * @throws IOException if sync fails for any directory + */ + public void syncAllDirectories() throws IOException { + ensureNotClosed(); + + List syncErrors = new ArrayList<>(); + + for (Map.Entry entry : primaryTermDirectories.entrySet()) { + try { + String[] files = entry.getValue().listAll(); + entry.getValue().sync(java.util.Arrays.asList(files)); + logger.debug("Synced directory for primary term {}", entry.getKey()); + } catch (IOException e) { + logger.warn("Failed to sync directory for primary term {}", entry.getKey(), e); + syncErrors.add(e); + } + } + + if (!syncErrors.isEmpty()) { + IOException combinedException = new IOException("Failed to sync " + syncErrors.size() + " directories"); + for (IOException error : syncErrors) { + combinedException.addSuppressed(error); + } + throw combinedException; + } + + logger.info("Successfully synced {} primary term directories", primaryTermDirectories.size()); + } + + /** + * Gets the primary term directories map (for testing purposes). + * + * @return the map of primary terms to directories + */ + protected Map getPrimaryTermDirectories() { + return primaryTermDirectories; + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermRouter.java b/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermRouter.java new file mode 100644 index 0000000000000..0d32c3ecab16c --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermRouter.java @@ -0,0 +1,157 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.store.distributed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; + +import java.io.IOException; +import java.util.Set; + +/** + * Routes segment files to appropriate directories based on primary term. + * This class determines which directory should handle a given file by either + * routing to a primary term-specific directory or keeping certain files + * (like segments_N) in the base directory for compatibility. + * + * @opensearch.internal + */ +public class PrimaryTermRouter { + + private static final Logger logger = LogManager.getLogger(PrimaryTermRouter.class); + + /** Files with these prefixes are excluded from primary term routing */ + private static final Set EXCLUDED_PREFIXES = Set.of( + "segments_", + "pending_segments_", + "write.lock"); + + private final IndexShardContext shardContext; + private volatile long lastLoggedPrimaryTerm = -1L; + + /** + * Creates a new PrimaryTermRouter with the given IndexShardContext. + * + * @param shardContext the context for accessing primary term information + */ + public PrimaryTermRouter(IndexShardContext shardContext) { + this.shardContext = shardContext; + logger.debug("Created PrimaryTermRouter with context: {}", + shardContext != null ? "available" : "null"); + } + + /** + * Determines the appropriate directory for a given filename. + * Files are routed based on primary term unless they are excluded + * (segments_N, lock files, temporary files). 
+ * + * @param filename the name of the file to route + * @param directoryManager the manager for accessing directories + * @return the Directory that should handle this file + * @throws IOException if directory access fails + */ + public Directory getDirectoryForFile(String filename, PrimaryTermDirectoryManager directoryManager) + throws IOException { + if (filename == null || filename.isEmpty()) { + throw new IllegalArgumentException("Filename cannot be null or empty"); + } + + if (isExcludedFile(filename)) { + logger.trace("File {} excluded from primary term routing, using base directory", filename); + return directoryManager.getBaseDirectory(); + } + + long primaryTerm = getCurrentPrimaryTerm(); + + // Log primary term changes for debugging + if (primaryTerm != lastLoggedPrimaryTerm) { + logger.info("Primary term changed from {} to {} for file routing", + lastLoggedPrimaryTerm, primaryTerm); + lastLoggedPrimaryTerm = primaryTerm; + } + + try { + Directory directory = directoryManager.getDirectoryForPrimaryTerm(primaryTerm); + logger.trace("Routed file {} to primary term {} directory", filename, primaryTerm); + return directory; + } catch (IOException e) { + logger.warn("Failed to get directory for primary term {}, falling back to base directory for file {}", + primaryTerm, filename, e); + return directoryManager.getBaseDirectory(); + } + } + + /** + * Gets the current primary term from the IndexShardContext. + * Returns DEFAULT_PRIMARY_TERM if the context is unavailable. + * + * @return the current primary term + */ + public long getCurrentPrimaryTerm() { + if (shardContext != null && shardContext.isAvailable()) { + return shardContext.getPrimaryTerm(); + } + + logger.debug("IndexShardContext unavailable, using default primary term"); + return IndexShardContext.DEFAULT_PRIMARY_TERM; + } + + /** + * Checks if a filename should be excluded from primary term routing. 
+ * Excluded files include: + * - Files starting with "segments_", "pending_segments_", "write.lock" + * - Temporary files ending with ".tmp" + * + * @param filename the filename to check + * @return true if the file should be excluded from routing, false otherwise + */ + public boolean isExcludedFile(String filename) { + if (filename == null || filename.isEmpty()) { + return true; // Treat invalid filenames as excluded + } + + // Exclude temporary files + if (filename.endsWith(".tmp")) { + return true; + } + + // Exclude files with specific prefixes + return EXCLUDED_PREFIXES.stream().anyMatch(filename::startsWith); + } + + /** + * Gets the directory name for a given primary term. + * Uses the naming convention "primary_term_X" where X is the primary term. + * + * @param primaryTerm the primary term + * @return the directory name for this primary term + */ + public String getDirectoryNameForPrimaryTerm(long primaryTerm) { + return PrimaryTermDirectoryManager.PRIMARY_TERM_DIR_PREFIX + primaryTerm; + } + + /** + * Gets the IndexShardContext (for testing purposes). + * + * @return the IndexShardContext instance + */ + protected IndexShardContext getShardContext() { + return shardContext; + } + + /** + * Gets the set of excluded prefixes (for testing purposes). 
+ * + * @return the set of excluded file prefixes + */ + protected Set getExcludedPrefixes() { + return EXCLUDED_PREFIXES; + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermRoutingException.java b/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermRoutingException.java new file mode 100644 index 0000000000000..14319c5bbe620 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/distributed/PrimaryTermRoutingException.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.distributed; + +import java.io.IOException; + +/** + * Exception thrown when primary term-based routing operations fail. + * Provides detailed context about the failure including error type, + * primary term, and filename for debugging purposes. + * + * @opensearch.internal + */ +public class PrimaryTermRoutingException extends IOException { + + private final ErrorType errorType; + private final long primaryTerm; + private final String filename; + + /** + * Types of errors that can occur during primary term routing. + */ + public enum ErrorType { + /** Error accessing primary term from IndexShard */ + PRIMARY_TERM_ACCESS_ERROR, + + /** Error creating or accessing primary term directory */ + DIRECTORY_CREATION_ERROR, + + /** Error routing file to appropriate directory */ + FILE_ROUTING_ERROR, + + /** Error validating directory accessibility */ + DIRECTORY_VALIDATION_ERROR + } + + /** + * Creates a new PrimaryTermRoutingException. 
+ * + * @param message the error message + * @param errorType the type of error that occurred + * @param primaryTerm the primary term associated with the error + * @param filename the filename associated with the error, may be null + * @param cause the underlying cause, may be null + */ + public PrimaryTermRoutingException(String message, ErrorType errorType, long primaryTerm, String filename, Throwable cause) { + super(buildDetailedMessage(message, errorType, primaryTerm, filename), cause); + this.errorType = errorType; + this.primaryTerm = primaryTerm; + this.filename = filename; + } + + /** + * Creates a new PrimaryTermRoutingException without a filename. + * + * @param message the error message + * @param errorType the type of error that occurred + * @param primaryTerm the primary term associated with the error + * @param cause the underlying cause, may be null + */ + public PrimaryTermRoutingException(String message, ErrorType errorType, long primaryTerm, Throwable cause) { + this(message, errorType, primaryTerm, null, cause); + } + + /** + * Creates a new PrimaryTermRoutingException without a cause. + * + * @param message the error message + * @param errorType the type of error that occurred + * @param primaryTerm the primary term associated with the error + * @param filename the filename associated with the error, may be null + */ + public PrimaryTermRoutingException(String message, ErrorType errorType, long primaryTerm, String filename) { + this(message, errorType, primaryTerm, filename, null); + } + + /** + * Gets the error type. + * + * @return the ErrorType + */ + public ErrorType getErrorType() { + return errorType; + } + + /** + * Gets the primary term associated with the error. + * + * @return the primary term + */ + public long getPrimaryTerm() { + return primaryTerm; + } + + /** + * Gets the filename associated with the error. 
+ * + * @return the filename, may be null + */ + public String getFilename() { + return filename; + } + + /** + * Builds a detailed error message including context information. + */ + private static String buildDetailedMessage(String message, ErrorType errorType, long primaryTerm, String filename) { + StringBuilder sb = new StringBuilder(); + sb.append("[").append(errorType).append("] "); + sb.append(message); + sb.append(" (primaryTerm=").append(primaryTerm); + + if (filename != null) { + sb.append(", filename=").append(filename); + } + + sb.append(")"); + return sb.toString(); + } +} \ No newline at end of file diff --git a/server/src/test/java/org/opensearch/index/store/distributed/DistributedSegmentDirectoryIntegrationTests.java b/server/src/test/java/org/opensearch/index/store/distributed/DistributedSegmentDirectoryIntegrationTests.java new file mode 100644 index 0000000000000..d96253250af1c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/distributed/DistributedSegmentDirectoryIntegrationTests.java @@ -0,0 +1,322 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.store.distributed;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Integration tests for {@link DistributedSegmentDirectory}: file creation and
 * reads, routing of excluded files (e.g. {@code segments_N}) to the base
 * directory, routing of regular segment files by primary term, deletion,
 * rename, sync, stats, and the legacy (non primary term) fallback mode.
 */
public class DistributedSegmentDirectoryIntegrationTests extends OpenSearchTestCase {

    private Path tempDir;
    private Directory baseDirectory;
    private IndexShard mockIndexShard;
    private DistributedSegmentDirectory distributedDirectory;

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        tempDir = createTempDir();
        baseDirectory = FSDirectory.open(tempDir);
        mockIndexShard = mock(IndexShard.class);

        // Mock IndexShard to report a fixed primary term; individual tests
        // re-stub this to simulate term changes.
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(1L);

        distributedDirectory = new DistributedSegmentDirectory(baseDirectory, tempDir, mockIndexShard);
    }

    @Override
    @After
    public void tearDown() throws Exception {
        if (distributedDirectory != null) {
            distributedDirectory.close();
        }
        super.tearDown();
    }

    public void testFileCreationAndReading() throws IOException {
        String filename = "_0.si";
        String content = "test segment info content";

        // Create a file.
        try (IndexOutput output = distributedDirectory.createOutput(filename, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        // Verify the file is listed and readable through the distributed directory.
        assertTrue(Arrays.asList(distributedDirectory.listAll()).contains(filename));

        try (IndexInput input = distributedDirectory.openInput(filename, IOContext.DEFAULT)) {
            assertEquals(content, input.readString());
        }

        // NOTE(review): Lucene's writeString emits a VInt length prefix followed by
        // UTF-8 bytes; the "+4" here assumes a fixed 4-byte prefix and
        // String.getBytes() uses the platform charset — confirm against the
        // directory's actual on-disk format before relying on this constant.
        assertEquals(content.getBytes().length + 4, distributedDirectory.fileLength(filename));
    }

    public void testSegmentsFileInBaseDirectory() throws IOException {
        String segmentsFile = "segments_1";
        String content = "segments file content";

        // Create the segments file.
        try (IndexOutput output = distributedDirectory.createOutput(segmentsFile, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        // Segments files are excluded from primary term routing and must stay
        // in the base directory yet still appear in listAll().
        assertTrue(Arrays.asList(distributedDirectory.listAll()).contains(segmentsFile));

        String routingInfo = distributedDirectory.getRoutingInfo(segmentsFile);
        assertTrue(routingInfo.contains("excluded"));
    }

    public void testMultipleFilesWithDifferentPrimaryTerms() throws IOException {
        String[] filenames = { "_0.si", "_1.cfs", "_2.cfe" };
        String[] contents = { "content1", "content2", "content3" };

        // Create files under primary term 1.
        for (int i = 0; i < filenames.length; i++) {
            try (IndexOutput output = distributedDirectory.createOutput(filenames[i], IOContext.DEFAULT)) {
                output.writeString(contents[i]);
            }
        }

        // Simulate a primary term bump.
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(2L);

        String newFilename = "_3.doc";
        String newContent = "content for primary term 2";

        // Create a file under primary term 2.
        try (IndexOutput output = distributedDirectory.createOutput(newFilename, IOContext.DEFAULT)) {
            output.writeString(newContent);
        }

        // listAll() must present a unified view across all primary term directories.
        Set<String> fileSet = new HashSet<>(Arrays.asList(distributedDirectory.listAll()));

        for (String filename : filenames) {
            assertTrue("File " + filename + " should be listed", fileSet.contains(filename));
        }
        assertTrue("New file should be listed", fileSet.contains(newFilename));

        // Files from both terms remain readable with their original content.
        for (int i = 0; i < filenames.length; i++) {
            try (IndexInput input = distributedDirectory.openInput(filenames[i], IOContext.DEFAULT)) {
                assertEquals(contents[i], input.readString());
            }
        }

        try (IndexInput input = distributedDirectory.openInput(newFilename, IOContext.DEFAULT)) {
            assertEquals(newContent, input.readString());
        }
    }

    public void testFileDeletion() throws IOException {
        String filename = "_0.si";
        String content = "test content";

        try (IndexOutput output = distributedDirectory.createOutput(filename, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        assertTrue(Arrays.asList(distributedDirectory.listAll()).contains(filename));

        distributedDirectory.deleteFile(filename);

        assertFalse(Arrays.asList(distributedDirectory.listAll()).contains(filename));
    }

    public void testSyncOperation() throws IOException {
        String[] filenames = { "_0.si", "_1.cfs", "segments_1" };
        String content = "sync test content";

        for (String filename : filenames) {
            try (IndexOutput output = distributedDirectory.createOutput(filename, IOContext.DEFAULT)) {
                output.writeString(content);
            }
        }

        // Sync across routed and excluded files; an IOException propagates and
        // fails the test with the full stack trace (no try/catch-fail needed).
        Collection<String> filesToSync = Arrays.asList(filenames);
        distributedDirectory.sync(filesToSync);
    }

    public void testRenameOperation() throws IOException {
        String sourceFilename = "_0.si";
        String destFilename = "_0_renamed.si";
        String content = "rename test content";

        try (IndexOutput output = distributedDirectory.createOutput(sourceFilename, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        distributedDirectory.rename(sourceFilename, destFilename);

        Set<String> fileSet = new HashSet<>(Arrays.asList(distributedDirectory.listAll()));
        assertFalse("Source file should be gone", fileSet.contains(sourceFilename));
        assertTrue("Dest file should exist", fileSet.contains(destFilename));

        // Content must survive the rename.
        try (IndexInput input = distributedDirectory.openInput(destFilename, IOContext.DEFAULT)) {
            assertEquals(content, input.readString());
        }
    }

    public void testCrossDirectoryRenameFailure() throws IOException {
        String sourceFilename = "_0.si"; // regular file, routed to a primary term directory
        String destFilename = "segments_1"; // excluded file, lives in the base directory
        String content = "cross directory test";

        try (IndexOutput output = distributedDirectory.createOutput(sourceFilename, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        // Renaming across the base/primary-term boundary is unsupported.
        PrimaryTermRoutingException exception = expectThrows(
            PrimaryTermRoutingException.class,
            () -> distributedDirectory.rename(sourceFilename, destFilename)
        );

        assertEquals(PrimaryTermRoutingException.ErrorType.FILE_ROUTING_ERROR, exception.getErrorType());
        assertTrue(exception.getMessage().contains("Cross-directory rename not supported"));
    }

    public void testDirectoryStats() throws IOException {
        // Stats should report the routing mode even before any files exist.
        String stats = distributedDirectory.getDirectoryStats();
        assertTrue(stats.contains("Primary term routing"));

        try (IndexOutput output = distributedDirectory.createOutput("_0.si", IOContext.DEFAULT)) {
            output.writeString("test");
        }

        assertNotNull(distributedDirectory.getDirectoryStats());
    }

    public void testRoutingInfo() {
        // Excluded file → base directory.
        String routingInfo = distributedDirectory.getRoutingInfo("segments_1");
        assertTrue(routingInfo.contains("excluded"));
        assertTrue(routingInfo.contains("base directory"));

        // Regular segment file → primary term directory.
        routingInfo = distributedDirectory.getRoutingInfo("_0.si");
        assertTrue(routingInfo.contains("primary term"));
    }

    public void testValidateDirectories() throws IOException {
        // Ensure at least one primary term directory exists before validating.
        try (IndexOutput output = distributedDirectory.createOutput("_0.si", IOContext.DEFAULT)) {
            output.writeString("test");
        }

        // Must not throw for a healthy layout; a failure propagates to JUnit.
        distributedDirectory.validateDirectories();
    }

    public void testCurrentPrimaryTerm() {
        // Reflects the mocked term and tracks term changes.
        assertEquals(1L, distributedDirectory.getCurrentPrimaryTerm());

        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(5L);
        assertEquals(5L, distributedDirectory.getCurrentPrimaryTerm());
    }

    public void testIsUsingPrimaryTermRouting() {
        assertTrue(distributedDirectory.isUsingPrimaryTermRouting());
    }

    public void testLegacyHashBasedRouting() throws IOException {
        // Constructing without an IndexShard falls back to legacy routing.
        DistributedSegmentDirectory legacyDirectory = new DistributedSegmentDirectory(baseDirectory, tempDir);

        try {
            assertFalse(legacyDirectory.isUsingPrimaryTermRouting());
            assertEquals(-1L, legacyDirectory.getCurrentPrimaryTerm());

            // Basic operations still work in legacy mode.
            try (IndexOutput output = legacyDirectory.createOutput("_0.si", IOContext.DEFAULT)) {
                output.writeString("legacy test");
            }

            assertTrue(Arrays.asList(legacyDirectory.listAll()).contains("_0.si"));

        } finally {
            legacyDirectory.close();
        }
    }

    public void testConcurrentFileOperations() throws IOException {
        String[] filenames = { "_0.si", "_1.cfs", "_2.cfe", "_3.doc" };
        String content = "concurrent test content";

        // Sequentially simulate interleaved file creation.
        for (String filename : filenames) {
            try (IndexOutput output = distributedDirectory.createOutput(filename, IOContext.DEFAULT)) {
                output.writeString(content + "_" + filename);
            }
        }

        // Every file must exist with its own content.
        Set<String> fileSet = new HashSet<>(Arrays.asList(distributedDirectory.listAll()));

        for (String filename : filenames) {
            assertTrue("File " + filename + " should exist", fileSet.contains(filename));

            try (IndexInput input = distributedDirectory.openInput(filename, IOContext.DEFAULT)) {
                assertEquals(content + "_" + filename, input.readString());
            }
        }
    }
}
+ */ + +package org.opensearch.index.store.distributed; + +import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class IndexShardContextTests extends OpenSearchTestCase { + + private IndexShard mockIndexShard; + private IndexShardContext context; + + @Before + public void setUp() throws Exception { + super.setUp(); + mockIndexShard = mock(IndexShard.class); + } + + public void testGetPrimaryTermWithValidIndexShard() { + long expectedPrimaryTerm = 5L; + when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(expectedPrimaryTerm); + + context = new IndexShardContext(mockIndexShard); + + assertEquals(expectedPrimaryTerm, context.getPrimaryTerm()); + assertTrue(context.isAvailable()); + } + + public void testGetPrimaryTermWithNullIndexShard() { + context = new IndexShardContext(null); + + assertEquals(IndexShardContext.DEFAULT_PRIMARY_TERM, context.getPrimaryTerm()); + assertFalse(context.isAvailable()); + } + + public void testGetPrimaryTermWithExceptionFromIndexShard() { + when(mockIndexShard.getOperationPrimaryTerm()).thenThrow(new RuntimeException("Test exception")); + + context = new IndexShardContext(mockIndexShard); + + assertEquals(IndexShardContext.DEFAULT_PRIMARY_TERM, context.getPrimaryTerm()); + assertTrue(context.isAvailable()); // IndexShard is available, but throws exception + } + + public void testPrimaryTermCaching() { + long primaryTerm1 = 3L; + long primaryTerm2 = 4L; + + when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(primaryTerm1); + + context = new IndexShardContext(mockIndexShard); + + // First call should fetch from IndexShard + assertEquals(primaryTerm1, context.getPrimaryTerm()); + + // Second call within cache duration should return cached value + assertEquals(primaryTerm1, context.getPrimaryTerm()); + + // Change the mock to return different value + 
when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(primaryTerm2); + + // Should still return cached value + assertEquals(primaryTerm1, context.getPrimaryTerm()); + + // Invalidate cache and verify new value is fetched + context.invalidateCache(); + assertEquals(primaryTerm2, context.getPrimaryTerm()); + } + + public void testCacheExpiration() throws InterruptedException { + long primaryTerm1 = 7L; + long primaryTerm2 = 8L; + + when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(primaryTerm1); + + context = new IndexShardContext(mockIndexShard); + + // First call + assertEquals(primaryTerm1, context.getPrimaryTerm()); + assertTrue(context.isCacheValid()); + + // Change mock return value + when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(primaryTerm2); + + // Wait for cache to expire (cache duration is 1 second) + Thread.sleep(1100); + + // Should fetch new value after cache expiration + assertEquals(primaryTerm2, context.getPrimaryTerm()); + } + + public void testInvalidateCache() { + long primaryTerm = 10L; + when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(primaryTerm); + + context = new IndexShardContext(mockIndexShard); + + // Populate cache + assertEquals(primaryTerm, context.getPrimaryTerm()); + assertTrue(context.isCacheValid()); + + // Invalidate cache + context.invalidateCache(); + assertFalse(context.isCacheValid()); + assertEquals(-1L, context.getCachedPrimaryTerm()); + } + + public void testGetIndexShard() { + context = new IndexShardContext(mockIndexShard); + assertEquals(mockIndexShard, context.getIndexShard()); + + context = new IndexShardContext(null); + assertNull(context.getIndexShard()); + } +} \ No newline at end of file diff --git a/server/src/test/java/org/opensearch/index/store/distributed/PrimaryTermAwareDirectoryWrapperTests.java b/server/src/test/java/org/opensearch/index/store/distributed/PrimaryTermAwareDirectoryWrapperTests.java new file mode 100644 index 0000000000000..89b6536dc3089 --- /dev/null +++ 
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.store.distributed;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link PrimaryTermAwareDirectoryWrapper}: pass-through
 * behavior before primary term routing is enabled, routing behavior after
 * enabling, idempotent enablement, and failure handling when the shard throws.
 */
public class PrimaryTermAwareDirectoryWrapperTests extends OpenSearchTestCase {

    private Path tempDir;
    private Directory baseDirectory;
    private IndexShard mockIndexShard;
    private PrimaryTermAwareDirectoryWrapper wrapper;

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        tempDir = createTempDir();
        baseDirectory = FSDirectory.open(tempDir);
        mockIndexShard = mock(IndexShard.class);

        wrapper = new PrimaryTermAwareDirectoryWrapper(baseDirectory, tempDir);
    }

    @Override
    @After
    public void tearDown() throws Exception {
        if (wrapper != null) {
            wrapper.close();
        }
        super.tearDown();
    }

    public void testInitialState() {
        // Routing is off until explicitly enabled.
        assertFalse(wrapper.isPrimaryTermRoutingEnabled());
        assertEquals(tempDir, wrapper.getBasePath());

        String routingInfo = wrapper.getRoutingInfo("test.txt");
        assertTrue(routingInfo.contains("Primary term routing not enabled"));
    }

    public void testEnablePrimaryTermRouting() throws IOException {
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(1L);

        assertFalse(wrapper.isPrimaryTermRoutingEnabled());

        wrapper.enablePrimaryTermRouting(mockIndexShard);

        assertTrue(wrapper.isPrimaryTermRoutingEnabled());
    }

    public void testEnablePrimaryTermRoutingTwice() throws IOException {
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(1L);

        wrapper.enablePrimaryTermRouting(mockIndexShard);
        assertTrue(wrapper.isPrimaryTermRoutingEnabled());

        // Enabling must be idempotent.
        wrapper.enablePrimaryTermRouting(mockIndexShard);
        assertTrue(wrapper.isPrimaryTermRoutingEnabled());
    }

    public void testFileOperationsBeforePrimaryTermRouting() throws IOException {
        String filename = "test.txt";
        String content = "test content";

        // Before enabling, the wrapper delegates directly to the base directory.
        try (IndexOutput output = wrapper.createOutput(filename, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        assertTrue(Arrays.asList(wrapper.listAll()).contains(filename));
        // NOTE(review): the "+4" assumes a fixed 4-byte string-length prefix;
        // Lucene's writeString uses a VInt prefix — confirm the on-disk format.
        assertEquals(content.getBytes().length + 4, wrapper.fileLength(filename));
    }

    public void testFileOperationsAfterPrimaryTermRouting() throws IOException {
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(2L);

        wrapper.enablePrimaryTermRouting(mockIndexShard);

        String filename = "_0.si";
        String content = "primary term content";

        // After enabling, segment files are routed by primary term.
        try (IndexOutput output = wrapper.createOutput(filename, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        assertTrue(Arrays.asList(wrapper.listAll()).contains(filename));

        String routingInfo = wrapper.getRoutingInfo(filename);
        assertTrue(routingInfo.contains("primary term"));
    }

    public void testSegmentsFileRouting() throws IOException {
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(3L);

        wrapper.enablePrimaryTermRouting(mockIndexShard);

        String segmentsFile = "segments_1";
        String content = "segments content";

        try (IndexOutput output = wrapper.createOutput(segmentsFile, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        // segments_N files are excluded from routing and stay in the base directory.
        String routingInfo = wrapper.getRoutingInfo(segmentsFile);
        assertTrue(routingInfo.contains("excluded"));
        assertTrue(routingInfo.contains("base directory"));
    }

    public void testFileDeletion() throws IOException {
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(1L);

        String filename = "_0.cfs";
        String content = "deletion test";

        // Create before enabling routing, so the file lives in the base directory.
        try (IndexOutput output = wrapper.createOutput(filename, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        assertTrue(Arrays.asList(wrapper.listAll()).contains(filename));

        wrapper.enablePrimaryTermRouting(mockIndexShard);

        // Deletion must still locate the pre-routing file.
        wrapper.deleteFile(filename);
        assertFalse(Arrays.asList(wrapper.listAll()).contains(filename));
    }

    public void testSyncOperation() throws IOException {
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(1L);

        String[] filenames = { "_0.si", "_1.cfs" };

        for (String filename : filenames) {
            try (IndexOutput output = wrapper.createOutput(filename, IOContext.DEFAULT)) {
                output.writeString("sync test");
            }
        }

        wrapper.enablePrimaryTermRouting(mockIndexShard);

        // An IOException propagates and fails the test; no try/catch-fail needed.
        wrapper.sync(Arrays.asList(filenames));
    }

    public void testRenameOperation() throws IOException {
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(1L);

        String sourceFile = "_0.si";
        String destFile = "_0_renamed.si";
        String content = "rename test";

        try (IndexOutput output = wrapper.createOutput(sourceFile, IOContext.DEFAULT)) {
            output.writeString(content);
        }

        wrapper.enablePrimaryTermRouting(mockIndexShard);

        wrapper.rename(sourceFile, destFile);

        assertFalse(Arrays.asList(wrapper.listAll()).contains(sourceFile));
        assertTrue(Arrays.asList(wrapper.listAll()).contains(destFile));
    }

    public void testMixedOperationsBeforeAndAfterEnabling() throws IOException {
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(1L);

        String file1 = "before.txt";
        String file2 = "_0.si";
        String content1 = "before enabling";
        String content2 = "after enabling";

        // Created before routing is enabled.
        try (IndexOutput output = wrapper.createOutput(file1, IOContext.DEFAULT)) {
            output.writeString(content1);
        }

        wrapper.enablePrimaryTermRouting(mockIndexShard);

        // Created after routing is enabled.
        try (IndexOutput output = wrapper.createOutput(file2, IOContext.DEFAULT)) {
            output.writeString(content2);
        }

        // Both files remain visible through the unified listing.
        String[] allFiles = wrapper.listAll();
        assertTrue(Arrays.asList(allFiles).contains(file1));
        assertTrue(Arrays.asList(allFiles).contains(file2));

        // The post-enable file reports primary term routing.
        String routingInfo2 = wrapper.getRoutingInfo(file2);
        assertTrue(routingInfo2.contains("primary term"));
    }

    public void testCloseWithoutEnabling() throws IOException {
        String filename = "test.txt";

        try (IndexOutput output = wrapper.createOutput(filename, IOContext.DEFAULT)) {
            output.writeString("test");
        }

        // Must close cleanly (tearDown closes again; Directory.close is
        // expected to tolerate a second close).
        wrapper.close();
    }

    public void testCloseAfterEnabling() throws IOException {
        when(mockIndexShard.getOperationPrimaryTerm()).thenReturn(1L);

        wrapper.enablePrimaryTermRouting(mockIndexShard);

        String filename = "_0.si";
        try (IndexOutput output = wrapper.createOutput(filename, IOContext.DEFAULT)) {
            output.writeString("test");
        }

        // Must close cleanly with routing enabled.
        wrapper.close();
    }

    public void testEnablePrimaryTermRoutingWithException() throws IOException {
        // A shard that throws during enablement must surface as IOException
        // and leave routing disabled.
        when(mockIndexShard.getOperationPrimaryTerm()).thenThrow(new RuntimeException("Mock exception"));

        expectThrows(IOException.class, () -> { wrapper.enablePrimaryTermRouting(mockIndexShard); });

        assertFalse(wrapper.isPrimaryTermRoutingEnabled());
    }
}
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.store.distributed;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link PrimaryTermDirectoryManager}: lazy per-term directory
 * creation, directory naming, validation, stats, cleanup of unused term
 * directories, sync, close semantics, and rejection of use-after-close.
 */
public class PrimaryTermDirectoryManagerTests extends OpenSearchTestCase {

    private Path tempDir;
    private Directory mockBaseDirectory;
    private PrimaryTermDirectoryManager manager;

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        tempDir = createTempDir();
        mockBaseDirectory = mock(Directory.class);
        manager = new PrimaryTermDirectoryManager(mockBaseDirectory, tempDir);
    }

    @Override
    @After
    public void tearDown() throws Exception {
        if (manager != null && !manager.isClosed()) {
            manager.close();
        }
        super.tearDown();
    }

    public void testGetBaseDirectory() {
        assertEquals(mockBaseDirectory, manager.getBaseDirectory());
    }

    public void testGetBasePath() {
        assertEquals(tempDir, manager.getBasePath());
    }

    public void testGetDirectoryNameForPrimaryTerm() {
        // Naming convention: "primary_term_<term>".
        assertEquals("primary_term_0", manager.getDirectoryNameForPrimaryTerm(0L));
        assertEquals("primary_term_5", manager.getDirectoryNameForPrimaryTerm(5L));
        assertEquals("primary_term_100", manager.getDirectoryNameForPrimaryTerm(100L));
    }

    public void testCreateDirectoryForPrimaryTerm() throws IOException {
        long primaryTerm = 1L;

        assertFalse(manager.hasDirectoryForPrimaryTerm(primaryTerm));
        assertEquals(0, manager.getDirectoryCount());

        Directory directory = manager.getDirectoryForPrimaryTerm(primaryTerm);

        assertNotNull(directory);
        assertTrue(manager.hasDirectoryForPrimaryTerm(primaryTerm));
        assertEquals(1, manager.getDirectoryCount());

        // The directory must be materialized on the filesystem.
        Path expectedPath = tempDir.resolve("primary_term_1");
        assertTrue(Files.exists(expectedPath));
        assertTrue(Files.isDirectory(expectedPath));
    }

    public void testGetDirectoryForPrimaryTermReturnsExisting() throws IOException {
        long primaryTerm = 2L;

        Directory directory1 = manager.getDirectoryForPrimaryTerm(primaryTerm);
        Directory directory2 = manager.getDirectoryForPrimaryTerm(primaryTerm);

        // Repeated lookups must be cached, not re-created.
        assertSame("Should return same directory instance", directory1, directory2);
        assertEquals(1, manager.getDirectoryCount());
    }

    public void testMultiplePrimaryTermDirectories() throws IOException {
        long[] primaryTerms = { 1L, 3L, 5L };

        for (long primaryTerm : primaryTerms) {
            Directory directory = manager.getDirectoryForPrimaryTerm(primaryTerm);
            assertNotNull(directory);
            assertTrue(manager.hasDirectoryForPrimaryTerm(primaryTerm));
        }

        assertEquals(primaryTerms.length, manager.getDirectoryCount());

        Set<Long> allPrimaryTerms = manager.getAllPrimaryTerms();
        assertEquals(primaryTerms.length, allPrimaryTerms.size());
        for (long primaryTerm : primaryTerms) {
            assertTrue(allPrimaryTerms.contains(primaryTerm));
        }
    }

    public void testListAllDirectories() throws IOException {
        long[] primaryTerms = { 1L, 2L, 3L };

        for (long primaryTerm : primaryTerms) {
            manager.getDirectoryForPrimaryTerm(primaryTerm);
        }

        var directories = manager.listAllDirectories();
        assertEquals(primaryTerms.length, directories.size());

        // Each primary term gets its own distinct Directory instance.
        for (int i = 0; i < directories.size(); i++) {
            for (int j = i + 1; j < directories.size(); j++) {
                assertNotSame(directories.get(i), directories.get(j));
            }
        }
    }

    public void testValidateDirectories() throws IOException {
        manager.getDirectoryForPrimaryTerm(1L);
        manager.getDirectoryForPrimaryTerm(2L);

        // Must not throw for healthy directories; an IOException propagates
        // and fails the test (no try/catch-fail needed).
        manager.validateDirectories();
    }

    public void testValidatePrimaryTermDirectory() throws IOException {
        long primaryTerm = 1L;
        manager.getDirectoryForPrimaryTerm(primaryTerm);

        // Must not throw for an existing directory.
        manager.validatePrimaryTermDirectory(primaryTerm);
    }

    public void testValidatePrimaryTermDirectoryNotFound() {
        long primaryTerm = 999L;

        // Validating an unknown term must raise a routing exception
        // that carries the offending term.
        PrimaryTermRoutingException exception = expectThrows(
            PrimaryTermRoutingException.class,
            () -> manager.validatePrimaryTermDirectory(primaryTerm)
        );

        assertEquals(PrimaryTermRoutingException.ErrorType.DIRECTORY_VALIDATION_ERROR, exception.getErrorType());
        assertEquals(primaryTerm, exception.getPrimaryTerm());
    }

    public void testGetDirectoryStats() throws IOException {
        // Empty manager: -1 sentinels for min/max term.
        var stats = manager.getDirectoryStats();
        assertEquals(0, stats.getTotalDirectories());
        assertEquals(-1L, stats.getMinPrimaryTerm());
        assertEquals(-1L, stats.getMaxPrimaryTerm());
        assertEquals(tempDir.toString(), stats.getBasePath());

        // Populate out of order to confirm min/max are computed, not insertion-ordered.
        manager.getDirectoryForPrimaryTerm(3L);
        manager.getDirectoryForPrimaryTerm(1L);
        manager.getDirectoryForPrimaryTerm(7L);

        stats = manager.getDirectoryStats();
        assertEquals(3, stats.getTotalDirectories());
        assertEquals(1L, stats.getMinPrimaryTerm());
        assertEquals(7L, stats.getMaxPrimaryTerm());
    }

    public void testCleanupUnusedDirectories() throws IOException {
        manager.getDirectoryForPrimaryTerm(1L);
        manager.getDirectoryForPrimaryTerm(2L);
        manager.getDirectoryForPrimaryTerm(3L);

        assertEquals(3, manager.getDirectoryCount());

        // Keep only terms 1 and 3; term 2 must be cleaned up.
        Set<Long> activePrimaryTerms = Set.of(1L, 3L);
        int cleanedUp = manager.cleanupUnusedDirectories(activePrimaryTerms);

        assertEquals(1, cleanedUp);
        assertEquals(2, manager.getDirectoryCount());
        assertTrue(manager.hasDirectoryForPrimaryTerm(1L));
        assertFalse(manager.hasDirectoryForPrimaryTerm(2L));
        assertTrue(manager.hasDirectoryForPrimaryTerm(3L));
    }

    public void testHasAvailableSpace() {
        // Reasonable requests succeed.
        assertTrue(manager.hasAvailableSpace(1024L)); // 1 KB
        assertTrue(manager.hasAvailableSpace(1024L * 1024L)); // 1 MB

        // An impossible request fails.
        assertFalse(manager.hasAvailableSpace(Long.MAX_VALUE));
    }

    public void testSyncAllDirectories() throws IOException {
        manager.getDirectoryForPrimaryTerm(1L);
        manager.getDirectoryForPrimaryTerm(2L);

        // Must not throw; an IOException propagates and fails the test.
        manager.syncAllDirectories();
    }

    public void testClose() throws IOException {
        manager.getDirectoryForPrimaryTerm(1L);
        manager.getDirectoryForPrimaryTerm(2L);

        assertFalse(manager.isClosed());
        assertEquals(2, manager.getDirectoryCount());

        manager.close();

        // Close releases and forgets all managed directories.
        assertTrue(manager.isClosed());
        assertEquals(0, manager.getDirectoryCount());
    }

    public void testOperationsAfterClose() throws IOException {
        manager.close();

        // Use-after-close must fail fast with a descriptive IllegalStateException.
        IllegalStateException exception = expectThrows(
            IllegalStateException.class,
            () -> manager.getDirectoryForPrimaryTerm(1L)
        );

        assertTrue(exception.getMessage().contains("has been closed"));
    }

    public void testConcurrentDirectoryCreation() throws IOException {
        long primaryTerm = 1L;

        // Sequential repeated lookups exercise the same code path the
        // double-check locking protects; all must yield one cached instance.
        Directory dir1 = manager.getDirectoryForPrimaryTerm(primaryTerm);
        Directory dir2 = manager.getDirectoryForPrimaryTerm(primaryTerm);
        Directory dir3 = manager.getDirectoryForPrimaryTerm(primaryTerm);

        assertSame(dir1, dir2);
        assertSame(dir2, dir3);
        assertEquals(1, manager.getDirectoryCount());
    }
}
= { + "segments_1", + "pending_segments_1", + "write.lock", + "test.tmp" + }; + + for (String filename : excludedFiles) { + Directory result = router.getDirectoryForFile(filename, mockDirectoryManager); + assertEquals("File " + filename + " should use base directory", mockBaseDirectory, result); + } + } + + public void testGetDirectoryForRegularFile() throws IOException { + String filename = "_0.si"; + long primaryTerm = 5L; + + when(mockShardContext.isAvailable()).thenReturn(true); + when(mockShardContext.getPrimaryTerm()).thenReturn(primaryTerm); + when(mockDirectoryManager.getDirectoryForPrimaryTerm(primaryTerm)).thenReturn(mockPrimaryTermDirectory); + + Directory result = router.getDirectoryForFile(filename, mockDirectoryManager); + + assertEquals(mockPrimaryTermDirectory, result); + verify(mockDirectoryManager).getDirectoryForPrimaryTerm(primaryTerm); + } + + public void testGetDirectoryForFileWithUnavailableContext() throws IOException { + String filename = "_0.cfs"; + + when(mockShardContext.isAvailable()).thenReturn(false); + when(mockDirectoryManager.getDirectoryForPrimaryTerm(IndexShardContext.DEFAULT_PRIMARY_TERM)) + .thenReturn(mockBaseDirectory); + + Directory result = router.getDirectoryForFile(filename, mockDirectoryManager); + + assertEquals(mockBaseDirectory, result); + verify(mockDirectoryManager).getDirectoryForPrimaryTerm(IndexShardContext.DEFAULT_PRIMARY_TERM); + } + + public void testGetDirectoryForFileWithNullContext() throws IOException { + router = new PrimaryTermRouter(null); + String filename = "_0.cfe"; + + when(mockDirectoryManager.getDirectoryForPrimaryTerm(IndexShardContext.DEFAULT_PRIMARY_TERM)) + .thenReturn(mockBaseDirectory); + + Directory result = router.getDirectoryForFile(filename, mockDirectoryManager); + + assertEquals(mockBaseDirectory, result); + } + + public void testGetDirectoryForFileWithDirectoryManagerException() throws IOException { + String filename = "_1.si"; + long primaryTerm = 3L; + + 
when(mockShardContext.isAvailable()).thenReturn(true); + when(mockShardContext.getPrimaryTerm()).thenReturn(primaryTerm); + when(mockDirectoryManager.getDirectoryForPrimaryTerm(primaryTerm)) + .thenThrow(new IOException("Directory creation failed")); + + Directory result = router.getDirectoryForFile(filename, mockDirectoryManager); + + // Should fall back to base directory on exception + assertEquals(mockBaseDirectory, result); + } + + public void testIsExcludedFile() { + // Test excluded prefixes + assertTrue(router.isExcludedFile("segments_1")); + assertTrue(router.isExcludedFile("pending_segments_2")); + assertTrue(router.isExcludedFile("write.lock")); + + // Test temporary files + assertTrue(router.isExcludedFile("test.tmp")); + assertTrue(router.isExcludedFile("_0.si.tmp")); + + // Test null and empty + assertTrue(router.isExcludedFile(null)); + assertTrue(router.isExcludedFile("")); + + // Test regular files + assertFalse(router.isExcludedFile("_0.si")); + assertFalse(router.isExcludedFile("_0.cfs")); + assertFalse(router.isExcludedFile("_0.cfe")); + assertFalse(router.isExcludedFile("_1.doc")); + } + + public void testGetCurrentPrimaryTerm() { + long expectedPrimaryTerm = 7L; + + // Test with available context + when(mockShardContext.isAvailable()).thenReturn(true); + when(mockShardContext.getPrimaryTerm()).thenReturn(expectedPrimaryTerm); + + assertEquals(expectedPrimaryTerm, router.getCurrentPrimaryTerm()); + + // Test with unavailable context + when(mockShardContext.isAvailable()).thenReturn(false); + + assertEquals(IndexShardContext.DEFAULT_PRIMARY_TERM, router.getCurrentPrimaryTerm()); + } + + public void testGetCurrentPrimaryTermWithNullContext() { + router = new PrimaryTermRouter(null); + + assertEquals(IndexShardContext.DEFAULT_PRIMARY_TERM, router.getCurrentPrimaryTerm()); + } + + public void testGetDirectoryNameForPrimaryTerm() { + assertEquals("primary_term_0", router.getDirectoryNameForPrimaryTerm(0L)); + assertEquals("primary_term_5", 
router.getDirectoryNameForPrimaryTerm(5L)); + assertEquals("primary_term_100", router.getDirectoryNameForPrimaryTerm(100L)); + } + + public void testGetDirectoryForFileWithNullFilename() { + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { + router.getDirectoryForFile(null, mockDirectoryManager); + }); + + assertTrue(exception.getMessage().contains("Filename cannot be null or empty")); + } + + public void testGetDirectoryForFileWithEmptyFilename() { + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { + router.getDirectoryForFile("", mockDirectoryManager); + }); + + assertTrue(exception.getMessage().contains("Filename cannot be null or empty")); + } + + public void testGetExcludedPrefixes() { + var excludedPrefixes = router.getExcludedPrefixes(); + + assertTrue(excludedPrefixes.contains("segments_")); + assertTrue(excludedPrefixes.contains("pending_segments_")); + assertTrue(excludedPrefixes.contains("write.lock")); + assertEquals(3, excludedPrefixes.size()); + } + + public void testGetShardContext() { + assertEquals(mockShardContext, router.getShardContext()); + + router = new PrimaryTermRouter(null); + assertNull(router.getShardContext()); + } +} \ No newline at end of file