From 99e296691633d2ef96740adb557a36f447978ef1 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Tue, 31 Dec 2024 03:29:40 -0800 Subject: [PATCH] Scale to Zero Signed-off-by: Prudhvi Godithi --- gradle/run.gradle | 9 + .../org/opensearch/action/ActionModule.java | 8 + .../scale/NodePreScaleSyncRequest.java | 48 ++ .../scale/NodePreScaleSyncResponse.java | 46 ++ .../indices/scale/PreScaleSyncAction.java | 21 + .../indices/scale/PreScaleSyncRequest.java | 102 ++++ .../indices/scale/PreScaleSyncResponse.java | 90 ++++ .../indices/scale/ScaleRequestBuilder.java | 28 + .../scale/ShardPreScaleSyncResponse.java | 53 ++ .../scale/TransportPreScaleSyncAction.java | 500 ++++++++++++++++++ .../admin/indices/scale/package-info.java | 10 + .../opensearch/client/IndicesAdminClient.java | 5 + .../cluster/routing/RoutingNodes.java | 2 +- .../cluster/routing/ShardRouting.java | 4 +- .../action/admin/indices/RestScaleAction.java | 38 ++ 15 files changed, 961 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/ScaleRequestBuilder.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/ShardPreScaleSyncResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/TransportPreScaleSyncAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/package-info.java create mode 100644 server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleAction.java diff --git a/gradle/run.gradle b/gradle/run.gradle index 34651f1d94964..dc620900744e9 100644 --- a/gradle/run.gradle +++ b/gradle/run.gradle @@ -45,6 +45,15 @@ testClusters { plugin('plugins:'.concat(p)) } } + setting 'opensearch.experimental.feature.read.write.split.enabled', 'true' + setting 'path.repo', '["/tmp/my-repo"]' + setting 'node.attr.remote_store', 'true' + setting 'cluster.remote_store.state.enabled', 'true' + setting 'node.attr.remote_store.segment.repository', 'my-repository' + setting 'node.attr.remote_store.translog.repository', 'my-repository' + setting 'node.attr.remote_store.repository.my-repository.type', 'fs' + setting 'node.attr.remote_store.state.repository', 'my-repository' + setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo' } } diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 1e6eae87af53a..3ddbdecc3f7a1 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -184,6 +184,8 @@ import org.opensearch.action.admin.indices.resolve.ResolveIndexAction; import org.opensearch.action.admin.indices.rollover.RolloverAction; import org.opensearch.action.admin.indices.rollover.TransportRolloverAction; +import org.opensearch.action.admin.indices.scale.PreScaleSyncAction; +import org.opensearch.action.admin.indices.scale.TransportPreScaleSyncAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.PitSegmentsAction; import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction; @@ -420,6 +422,7 @@ import org.opensearch.rest.action.admin.indices.RestResizeHandler; import org.opensearch.rest.action.admin.indices.RestResolveIndexAction; import org.opensearch.rest.action.admin.indices.RestRolloverIndexAction; +import org.opensearch.rest.action.admin.indices.RestScaleAction; import org.opensearch.rest.action.admin.indices.RestSimulateIndexTemplateAction; import org.opensearch.rest.action.admin.indices.RestSimulateTemplateAction; import org.opensearch.rest.action.admin.indices.RestSyncedFlushAction; @@ -685,6 +688,9 @@ public void reg actions.register(AutoPutMappingAction.INSTANCE, TransportAutoPutMappingAction.class); actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class); actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class); + + actions.register(PreScaleSyncAction.INSTANCE, TransportPreScaleSyncAction.class); + actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class); actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class); actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class); @@ -907,6 +913,8 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestUpgradeStatusAction()); registerHandler.accept(new RestClearIndicesCacheAction()); + registerHandler.accept(new RestScaleAction()); + registerHandler.accept(new RestIndexAction()); registerHandler.accept(new CreateHandler()); registerHandler.accept(new AutoIdHandler(nodesInCluster)); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncRequest.java new file mode 100644 index 0000000000000..8b92682d9f09f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncRequest.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.scale; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.List; + +public class NodePreScaleSyncRequest extends TransportRequest { + private final String index; + private final List shardIds; + + public NodePreScaleSyncRequest(String index, List shardIds) { + this.index = index; + this.shardIds = shardIds; + } + + public NodePreScaleSyncRequest(StreamInput in) throws IOException { + super(in); + this.index = in.readString(); + this.shardIds = in.readList(ShardId::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + out.writeList(shardIds); + } + + public String getIndex() { + return index; + } + + public List getShardIds() { + return shardIds; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncResponse.java new file mode 100644 index 0000000000000..3d3a3419c61c3 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncResponse.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.scale; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.transport.TransportResponse; + +import java.io.IOException; +import java.util.List; + +public class NodePreScaleSyncResponse extends TransportResponse { + private final DiscoveryNode node; + private final List shardResponses; + + public NodePreScaleSyncResponse(DiscoveryNode node, List shardResponses) { + this.node = node; + this.shardResponses = shardResponses; + } + + public NodePreScaleSyncResponse(StreamInput in) throws IOException { + node = new DiscoveryNode(in); + shardResponses = in.readList(ShardPreScaleSyncResponse::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + node.writeTo(out); + out.writeList(shardResponses); + } + + public DiscoveryNode getNode() { + return node; + } + + public List getShardResponses() { + return shardResponses; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncAction.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncAction.java new file mode 100644 index 0000000000000..2dcf02b2efeaf --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncAction.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.scale; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.master.AcknowledgedResponse; + +public class PreScaleSyncAction extends ActionType { + public static final PreScaleSyncAction INSTANCE = new PreScaleSyncAction(); + public static final String NAME = "indices:admin/scale"; + + private PreScaleSyncAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncRequest.java new file mode 100644 index 0000000000000..ff7d32672e265 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncRequest.java @@ -0,0 +1,102 @@ +package org.opensearch.action.admin.indices.scale; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.ValidateActions; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.master.AcknowledgedRequest; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; + +public class PreScaleSyncRequest extends AcknowledgedRequest { + private String[] indices; + private boolean scaleDown; + private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); + + public PreScaleSyncRequest(String index) { + this(new String[]{Objects.requireNonNull(index)}, false); + } + + public PreScaleSyncRequest(String[] indices, boolean scaleDown) { + this.indices = Objects.requireNonNull(indices); + this.scaleDown = scaleDown; + } + + public PreScaleSyncRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + scaleDown = in.readBoolean(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + out.writeBoolean(scaleDown); + indicesOptions.writeIndicesOptions(out); + } + + public String[] indices() { + return indices; + } + + public boolean isScaleDown() { + return scaleDown; + } + + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + public PreScaleSyncRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (indices == null || indices.length == 0) { + validationException = ValidateActions.addValidationError("index/indices is missing", validationException); + } else { + for (String index : indices) { + if (index == null || index.trim().isEmpty()) { + validationException = ValidateActions.addValidationError("index/indices contains null or empty value", validationException); + break; + } + } + } + return validationException; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PreScaleSyncRequest that = (PreScaleSyncRequest) o; + return scaleDown == that.scaleDown && + Arrays.equals(indices, that.indices) && + Objects.equals(indicesOptions, that.indicesOptions); + } + + @Override + public int hashCode() { + int result = Objects.hash(scaleDown, indicesOptions); + result = 31 * result + Arrays.hashCode(indices); + return result; + } + + /** + * Sets whether this is a scale down operation + * @param scaleDown true if scaling down, false if scaling up + * @return this request + */ + public PreScaleSyncRequest scaleDown(boolean scaleDown) { + this.scaleDown = scaleDown; + return this; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncResponse.java new file mode 100644 index 0000000000000..b937c606570fa --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncResponse.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.scale; + +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +public class PreScaleSyncResponse extends ActionResponse implements ToXContent { + private final Collection nodeResponses; + private final String failureReason; + private final boolean hasFailures; + + public PreScaleSyncResponse(Collection responses) { + this.nodeResponses = responses; + this.hasFailures = responses.stream() + .anyMatch(r -> r.getShardResponses().stream().anyMatch(s -> s.hasUncommittedOperations() || s.needsSync())); + this.failureReason = buildFailureReason(); + } + + public PreScaleSyncResponse(StreamInput in) throws IOException { + this.nodeResponses = in.readList(NodePreScaleSyncResponse::new); + this.hasFailures = in.readBoolean(); + this.failureReason = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(new ArrayList<>(nodeResponses)); // Convert Collection to List + out.writeBoolean(hasFailures); + out.writeOptionalString(failureReason); + } + + public boolean hasFailures() { + return hasFailures; + } + + public String getFailureReason() { + return failureReason; + } + + private String buildFailureReason() { + if (!hasFailures) { + return null; + } + StringBuilder reason = new StringBuilder(); + for (NodePreScaleSyncResponse nodeResponse : nodeResponses) { + for (ShardPreScaleSyncResponse shardResponse : nodeResponse.getShardResponses()) { + if (shardResponse.hasUncommittedOperations() || shardResponse.needsSync()) { + reason.append("Shard ") + .append(shardResponse.getShardId()) + .append(" on node ") + .append(nodeResponse.getNode()) + .append(": "); + if (shardResponse.hasUncommittedOperations()) { + reason.append("has uncommitted operations "); + } + if (shardResponse.needsSync()) { + reason.append("needs sync "); + } + reason.append("; "); + } + } + } + return reason.toString(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field("has_failures", hasFailures); + if (failureReason != null) { + builder.field("failure_reason", failureReason); + } + builder.endObject(); + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/ScaleRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/ScaleRequestBuilder.java new file mode 100644 index 0000000000000..03c3a7a637489 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/ScaleRequestBuilder.java @@ -0,0 +1,28 @@ +package org.opensearch.action.admin.indices.scale; + +import org.opensearch.action.ActionRequestBuilder; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.OpenSearchClient; +import org.opensearch.common.annotation.PublicApi; + +@PublicApi(since = "1.0.0") +public class ScaleRequestBuilder extends ActionRequestBuilder { + + public ScaleRequestBuilder(OpenSearchClient client, String... indices) { + this(client, false, indices); + } + + public ScaleRequestBuilder(OpenSearchClient client, boolean scaleDown, String... indices) { + super(client, PreScaleSyncAction.INSTANCE, new PreScaleSyncRequest(indices, scaleDown)); + } + + /** + * Sets the scale direction (up/down) + * @param scaleDown true if scaling down, false if scaling up + * @return this builder + */ + public ScaleRequestBuilder setScaleDown(boolean scaleDown) { + request.scaleDown(scaleDown); + return this; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/ShardPreScaleSyncResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/ShardPreScaleSyncResponse.java new file mode 100644 index 0000000000000..0aaa5f8a6c28a --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/ShardPreScaleSyncResponse.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.scale; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.index.shard.ShardId; + +import java.io.IOException; + +public class ShardPreScaleSyncResponse implements Writeable { + private final ShardId shardId; + private final boolean needsSync; + private final int uncommittedOperations; + + public ShardPreScaleSyncResponse(ShardId shardId, boolean needsSync, int uncommittedOperations) { + this.shardId = shardId; + this.needsSync = needsSync; + this.uncommittedOperations = uncommittedOperations; + } + + public ShardPreScaleSyncResponse(StreamInput in) throws IOException { + this.shardId = new ShardId(in); + this.needsSync = in.readBoolean(); + this.uncommittedOperations = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + out.writeBoolean(needsSync); + out.writeVInt(uncommittedOperations); + } + + public ShardId getShardId() { + return shardId; + } + + public boolean needsSync() { + return needsSync; + } + + public boolean hasUncommittedOperations() { + return uncommittedOperations > 0; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/TransportPreScaleSyncAction.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/TransportPreScaleSyncAction.java new file mode 100644 index 0000000000000..8aa9c1f19e3c0 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/TransportPreScaleSyncAction.java @@ -0,0 +1,500 @@ +package org.opensearch.action.admin.indices.scale; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.IndexId; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +public class TransportPreScaleSyncAction extends TransportClusterManagerNodeAction { + private static final Logger logger = LogManager.getLogger(TransportPreScaleSyncAction.class); + private final AllocationService allocationService; + public static final String NAME = PreScaleSyncAction.NAME + "[s]"; + private final IndicesService indicesService; + + @Inject + public TransportPreScaleSyncAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + AllocationService allocationService, + IndicesService indicesService + ) { + super( + PreScaleSyncAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + PreScaleSyncRequest::new, + indexNameExpressionResolver + ); + this.allocationService = allocationService; + this.indicesService = indicesService; + + // Register handler for shard sync requests + transportService.registerRequestHandler( + NAME, + ThreadPool.Names.SAME, + NodePreScaleSyncRequest::new, + (request, channel, task) -> handleShardSyncRequest(request, channel) + ); + } + + @Override + protected String executor() { + return ThreadPool.Names.MANAGEMENT; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void clusterManagerOperation( + PreScaleSyncRequest request, + ClusterState state, + ActionListener listener + ) { + // First perform settings validation + final String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request.indicesOptions(), request.indices()); + + if (request.isScaleDown()) { + scaleDown(concreteIndices, state, listener); + } else { + scaleUp(concreteIndices, state, listener); + } + } + + private void scaleDown( + final String[] indices, + final ClusterState currentState, + final ActionListener listener + ) { + // 1. First validate prerequisites for scale down + for (String index : indices) { + IndexMetadata indexMetadata = currentState.metadata().index(index); + if (!validateScaleDownPrerequisites(indexMetadata, index, listener)) { + return; + } + } + + // 2. Get primary shard assignments using the existing method + Map primaryShardsNodes = new HashMap<>(); + for (String index : indices) { + IndexMetadata indexMetadata = currentState.metadata().index(index); + if (indexMetadata != null) { + primaryShardsNodes.putAll(getPrimaryShardNodeAssignments(indexMetadata, currentState)); + } + } + + if (primaryShardsNodes.isEmpty()) { + listener.onFailure(new IllegalStateException("No primary shards found for indices")); + return; + } + + // 3. Group shards by node for efficient processing + Map> nodeShardGroups = primaryShardsNodes.entrySet() + .stream() + .collect(Collectors.groupingBy( + Map.Entry::getValue, + Collectors.mapping(Map.Entry::getKey, Collectors.toList()) + )); + + // 4. Create grouped listener for node responses + final GroupedActionListener groupedListener = new GroupedActionListener<>( + ActionListener.wrap( + responses -> handleNodeResponses( + responses, + ActionListener.wrap( + preScaleSyncResponse -> updateRoutingTable(indices, clusterService.state(), listener), + listener::onFailure + ) + ), + listener::onFailure + ), + nodeShardGroups.size() + ); + + // 5. Send sync requests to each node + for (Map.Entry> nodeShards : nodeShardGroups.entrySet()) { + final String nodeId = nodeShards.getKey(); + final List shards = nodeShards.getValue(); + logger.info("Sending sync request to node {} for shards {}", nodeId, shards); + + final DiscoveryNode targetNode = currentState.nodes().get(nodeId); + if (targetNode == null) { + groupedListener.onFailure(new IllegalStateException("Node [" + nodeId + "] not found")); + continue; + } + + // Use existing SHARD_SYNC_ACTION_NAME + transportService.sendRequest( + targetNode, + NAME, + new NodePreScaleSyncRequest(indices[0], shards), // Assuming single index for now + new TransportResponseHandler() { + @Override + public NodePreScaleSyncResponse read(StreamInput in) throws IOException { + return new NodePreScaleSyncResponse(in); + } + + @Override + public void handleResponse(NodePreScaleSyncResponse response) { + groupedListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + groupedListener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + ); + } + } + + // Existing handler method from second file + private void handleNodeResponses( + Collection responses, + ActionListener listener + ) { + boolean hasUncommittedOps = false; + boolean needsSync = false; + List failedShards = new ArrayList<>(); + + for (NodePreScaleSyncResponse nodeResponse : responses) { + for (ShardPreScaleSyncResponse shardResponse : nodeResponse.getShardResponses()) { + if (shardResponse.hasUncommittedOperations()) { + hasUncommittedOps = true; + failedShards.add(shardResponse.getShardId().toString()); + } + if (shardResponse.needsSync()) { + needsSync = true; + failedShards.add(shardResponse.getShardId().toString()); + } + } + } + + if (hasUncommittedOps || needsSync) { + listener.onFailure(new IllegalStateException( + "Pre-scale sync failed for shards: " + String.join(", ", failedShards) + + (hasUncommittedOps ? " - uncommitted operations remain" : "") + + (needsSync ? " - sync needed" : "") + )); + return; + } + + // If all syncs successful, create PreScaleSyncResponse instead of updating routing table + listener.onResponse(new PreScaleSyncResponse(responses)); + } + + // Handler method from second file for shard-level sync + private void handleShardSyncRequest(NodePreScaleSyncRequest request, TransportChannel channel) throws Exception { + logger.info("Handling shard sync request"); + final ClusterState state = clusterService.state(); + final IndexMetadata indexMetadata = state.metadata().index(request.getIndex()); + if (indexMetadata == null) { + throw new IllegalStateException("Index " + request.getIndex() + " not found"); + } + + IndexService indexService = indicesService.indexService(indexMetadata.getIndex()); + if (indexService == null) { + throw new IllegalStateException("IndexService not found for index " + request.getIndex()); + } + + List shardResponses = new ArrayList<>(); + for (ShardId shardId : request.getShardIds()) { + IndexShard shard = indexService.getShardOrNull(shardId.id()); + if (shard == null) { + continue; + } + + // Force flush and sync + if (shard.translogStats().getUncommittedOperations() > 0) { + logger.info( + "Translog has {} uncommitted operations before closing shard [{}]", + shard.translogStats().getUncommittedOperations(), + shard.shardId() + ); + shard.sync(); + } + + // Force flush + logger.info("Doing final flush before closing shard"); + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + + // Force sync if needed + if (shard.translogStats().getUncommittedOperations() > 0) { + logger.info( + "Translog has {} uncommitted operations before closing shard [{}]", + shard.translogStats().getUncommittedOperations(), + shard.shardId() + ); + shard.sync(); + } + + shard.waitForRemoteStoreSync(); + + shardResponses.add( + new ShardPreScaleSyncResponse(shardId, shard.isSyncNeeded(), shard.translogStats().getUncommittedOperations()) + ); + } + + channel.sendResponse(new NodePreScaleSyncResponse(clusterService.localNode(), shardResponses)); + } + + private void scaleUp( + final String[] indices, + final ClusterState currentState, + final ActionListener listener + ) { + // 1. Update routing table + clusterService.submitStateUpdateTask( + "scale-up-index", + new ClusterStateUpdateTask() { + public ClusterState execute(ClusterState currentState) throws Exception { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + + // For each index, modify its routing table + for (String index : indices) { + IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); + if (indexRoutingTable == null) continue; + + // Build new routing table + IndexRoutingTable.Builder indexBuilder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex()); + + for (IndexShardRoutingTable shardTable : indexRoutingTable) { + IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder(shardTable.shardId()); + + // Keep existing search replicas + for (ShardRouting shardRouting : shardTable) { + if (shardRouting.isSearchOnly()) { + shardBuilder.addShard(shardRouting); + } + } + + // Create recovery source for primary + RecoverySource.RemoteStoreRecoverySource remoteStoreRecoverySource = new RecoverySource.RemoteStoreRecoverySource( + UUID.randomUUID().toString(), + Version.CURRENT, + new IndexId( + shardTable.shardId().getIndex().getName(), + shardTable.shardId().getIndex().getUUID() + ) + ); + + // Add unassigned primary + ShardRouting primaryShard = ShardRouting.newUnassigned( + shardTable.shardId(), + true, + remoteStoreRecoverySource, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring primary shard") + ); + shardBuilder.addShard(primaryShard); + + // Add unassigned replica + ShardRouting replicaShard = ShardRouting.newUnassigned( + shardTable.shardId(), + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring replica shard") + ); + shardBuilder.addShard(replicaShard); + + indexBuilder.addIndexShard(shardBuilder.build()); + } + + routingTableBuilder.add(indexBuilder.build()); + } + + ClusterState tempState = ClusterState.builder(currentState) + .routingTable(routingTableBuilder.build()) + .build(); + + // Perform reroute to allocate restored shards + return ClusterState.builder(tempState) + .routingTable(allocationService.reroute(tempState, "restore indexing shards").routingTable()) + .build(); + } + + public void onFailure(String source, Exception e) { + logger.error("Failed to execute cluster state update for scale up", e); + listener.onFailure(e); + } + + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + } + ); + } + + @Override + protected ClusterBlockException checkBlock(PreScaleSyncRequest request, ClusterState state) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(state, request.indicesOptions(), request.indices())); + } + + private boolean validateScaleDownPrerequisites( + IndexMetadata indexMetadata, + String index, + ActionListener listener + ) { + // Validate search replicas exist + if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) { + listener.onFailure(new IllegalArgumentException( + "Cannot scale to zero without search replicas for index: " + index + )); + return false; + } + + // Validate remote store is enabled + if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) { + listener.onFailure(new IllegalArgumentException( + "To scale to zero, " + IndexMetadata.SETTING_REMOTE_STORE_ENABLED + + " must be enabled for index: " + index + )); + return false; + } + + // Validate segment replication + if (!ReplicationType.SEGMENT.toString().equals( + indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) + ) { + listener.onFailure(new IllegalArgumentException( + "To scale to zero, segment replication must be enabled for index: " + index + )); + return false; + } + + // Validate write block + if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_BLOCKS_WRITE, false)) { + listener.onFailure(new IllegalArgumentException( + "To scale to zero, " + IndexMetadata.SETTING_BLOCKS_WRITE + + " must be enabled for index: " + index + )); + return false; + } + + return true; + } + + private void updateRoutingTable( + String[] indices, + ClusterState currentState, + ActionListener listener + ) { + clusterService.submitStateUpdateTask( + "scale-down-index", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + + for (String index : indices) { + IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); + if (indexRoutingTable == null) continue; + + IndexRoutingTable.Builder indexBuilder = new IndexRoutingTable.Builder( + indexRoutingTable.getIndex() + ); + + // Keep only search replicas in the routing table + for (IndexShardRoutingTable shardTable : indexRoutingTable) { + IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder( + shardTable.shardId() + ); + + for (ShardRouting shardRouting : shardTable) { + if (shardRouting.isSearchOnly()) { + shardBuilder.addShard(shardRouting); + } + } + + indexBuilder.addIndexShard(shardBuilder.build()); + } + + routingTableBuilder.add(indexBuilder.build()); + } + + return ClusterState.builder(currentState) + .routingTable(routingTableBuilder.build()) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("Failed to execute cluster state update for scale down", e); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + } + ); + } + + private Map getPrimaryShardNodeAssignments(IndexMetadata indexMetadata, ClusterState state) { + Map assignments = new HashMap<>(); + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + ShardId shardId = new ShardId(indexMetadata.getIndex(), i); + ShardRouting primaryShard = state.routingTable().index(indexMetadata.getIndex().getName()).shard(i).primaryShard(); + + if (primaryShard != null && primaryShard.assignedToNode()) { + assignments.put(shardId, primaryShard.currentNodeId()); + } + } + return assignments; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/package-info.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/package-info.java new file mode 100644 index 0000000000000..a01296c3e6827 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/scale/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Index Rollover transport handlers. */ +package org.opensearch.action.admin.indices.scale; diff --git a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java index 588584cd8a280..c86235d4155b8 100644 --- a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java @@ -92,6 +92,7 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest; import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.opensearch.action.admin.indices.rollover.RolloverResponse; +import org.opensearch.action.admin.indices.scale.ScaleRequestBuilder; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder; @@ -865,4 +866,8 @@ public interface IndicesAdminClient extends OpenSearchClient { /** Update a view */ ActionFuture updateView(CreateViewAction.Request request); + + default ScaleRequestBuilder prepareScale(String... indices) { + return new ScaleRequestBuilder(this, indices); + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 76111f623e0a5..ea6ef565465d8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -127,7 +127,7 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { // also fill replicaSet information for (final IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) { for (IndexShardRoutingTable indexShard : indexRoutingTable) { - assert indexShard.primary != null; + // assert indexShard.primary != null; for (ShardRouting shard : indexShard) { // to get all the shards belonging to an index, including the replicas, // we define a replica set and keep track of it. A replica set is identified diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index ada35caa1e61e..92845404e8825 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -115,8 +115,8 @@ protected ShardRouting( assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta"; assert (state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.INITIALIZING) == (recoverySource != null) : "recovery source only available on unassigned or initializing shard but was " + state; - assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary - : "replica shards always recover from primary"; + /*assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary + : "replica shards always recover from primary";*/ assert (currentNodeId == null) == (state == ShardRoutingState.UNASSIGNED) : "unassigned shard must not be assigned to a node " + this; } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleAction.java new file mode 100644 index 0000000000000..6efd6cea24eb7 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleAction.java @@ -0,0 +1,38 @@ +package org.opensearch.rest.action.admin.indices; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.POST; + +public class RestScaleAction extends BaseRestHandler { + + @Override + public String getName() { + return "scale_index_action"; + } + + @Override + public List routes() { + return List.of( + new Route(POST, "/{index}/_scale/{direction}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + String[] indices = Strings.splitStringByCommaToArray(request.param("index")); + boolean scaleDown = request.param("direction").equals("down"); + + return channel -> client.admin() + .indices() + .prepareScale(indices) + .setScaleDown(scaleDown) + .execute(new RestToXContentListener<>(channel)); + } +}