Skip to content

Commit

Permalink
Scale to Zero
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <pgodithi@amazon.com>
  • Loading branch information
prudhvigodithi committed Dec 31, 2024
1 parent 6b41e4f commit 99e2966
Show file tree
Hide file tree
Showing 15 changed files with 961 additions and 3 deletions.
9 changes: 9 additions & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
setting 'path.repo', '["/tmp/my-repo"]'
setting 'node.attr.remote_store', 'true'
setting 'cluster.remote_store.state.enabled', 'true'
setting 'node.attr.remote_store.segment.repository', 'my-repository'
setting 'node.attr.remote_store.translog.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.type', 'fs'
setting 'node.attr.remote_store.state.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo'
}
}

Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@
import org.opensearch.action.admin.indices.resolve.ResolveIndexAction;
import org.opensearch.action.admin.indices.rollover.RolloverAction;
import org.opensearch.action.admin.indices.rollover.TransportRolloverAction;
import org.opensearch.action.admin.indices.scale.PreScaleSyncAction;
import org.opensearch.action.admin.indices.scale.TransportPreScaleSyncAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
Expand Down Expand Up @@ -420,6 +422,7 @@
import org.opensearch.rest.action.admin.indices.RestResizeHandler;
import org.opensearch.rest.action.admin.indices.RestResolveIndexAction;
import org.opensearch.rest.action.admin.indices.RestRolloverIndexAction;
import org.opensearch.rest.action.admin.indices.RestScaleAction;
import org.opensearch.rest.action.admin.indices.RestSimulateIndexTemplateAction;
import org.opensearch.rest.action.admin.indices.RestSimulateTemplateAction;
import org.opensearch.rest.action.admin.indices.RestSyncedFlushAction;
Expand Down Expand Up @@ -685,6 +688,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(AutoPutMappingAction.INSTANCE, TransportAutoPutMappingAction.class);
actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);

actions.register(PreScaleSyncAction.INSTANCE, TransportPreScaleSyncAction.class);

actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
Expand Down Expand Up @@ -907,6 +913,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestUpgradeStatusAction());
registerHandler.accept(new RestClearIndicesCacheAction());

registerHandler.accept(new RestScaleAction());

registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
registerHandler.accept(new AutoIdHandler(nodesInCluster));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.List;

public class NodePreScaleSyncRequest extends TransportRequest {
private final String index;
private final List<ShardId> shardIds;

public NodePreScaleSyncRequest(String index, List<ShardId> shardIds) {
this.index = index;
this.shardIds = shardIds;
}

public NodePreScaleSyncRequest(StreamInput in) throws IOException {
super(in);
this.index = in.readString();
this.shardIds = in.readList(ShardId::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeList(shardIds);
}

public String getIndex() {
return index;
}

public List<ShardId> getShardIds() {
return shardIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.transport.TransportResponse;

import java.io.IOException;
import java.util.List;

public class NodePreScaleSyncResponse extends TransportResponse {
private final DiscoveryNode node;
private final List<ShardPreScaleSyncResponse> shardResponses;

public NodePreScaleSyncResponse(DiscoveryNode node, List<ShardPreScaleSyncResponse> shardResponses) {
this.node = node;
this.shardResponses = shardResponses;
}

public NodePreScaleSyncResponse(StreamInput in) throws IOException {
node = new DiscoveryNode(in);
shardResponses = in.readList(ShardPreScaleSyncResponse::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeList(shardResponses);
}

public DiscoveryNode getNode() {
return node;
}

public List<ShardPreScaleSyncResponse> getShardResponses() {
return shardResponses;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

public class PreScaleSyncAction extends ActionType<AcknowledgedResponse> {
public static final PreScaleSyncAction INSTANCE = new PreScaleSyncAction();
public static final String NAME = "indices:admin/scale";

private PreScaleSyncAction() {
super(NAME, AcknowledgedResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.opensearch.action.admin.indices.scale;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.ValidateActions;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.master.AcknowledgedRequest;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

public class PreScaleSyncRequest extends AcknowledgedRequest<PreScaleSyncRequest> {
private String[] indices;
private boolean scaleDown;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();

public PreScaleSyncRequest(String index) {
this(new String[]{Objects.requireNonNull(index)}, false);
}

public PreScaleSyncRequest(String[] indices, boolean scaleDown) {
this.indices = Objects.requireNonNull(indices);
this.scaleDown = scaleDown;
}

public PreScaleSyncRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
scaleDown = in.readBoolean();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
out.writeBoolean(scaleDown);
indicesOptions.writeIndicesOptions(out);
}

public String[] indices() {
return indices;
}

public boolean isScaleDown() {
return scaleDown;
}

public IndicesOptions indicesOptions() {
return indicesOptions;
}

public PreScaleSyncRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (indices == null || indices.length == 0) {
validationException = ValidateActions.addValidationError("index/indices is missing", validationException);
} else {
for (String index : indices) {
if (index == null || index.trim().isEmpty()) {
validationException = ValidateActions.addValidationError("index/indices contains null or empty value", validationException);
break;
}
}
}
return validationException;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PreScaleSyncRequest that = (PreScaleSyncRequest) o;
return scaleDown == that.scaleDown &&
Arrays.equals(indices, that.indices) &&
Objects.equals(indicesOptions, that.indicesOptions);
}

@Override
public int hashCode() {
int result = Objects.hash(scaleDown, indicesOptions);
result = 31 * result + Arrays.hashCode(indices);
return result;
}

/**
* Sets whether this is a scale down operation
* @param scaleDown true if scaling down, false if scaling up
* @return this request
*/
public PreScaleSyncRequest scaleDown(boolean scaleDown) {
this.scaleDown = scaleDown;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale;

import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

public class PreScaleSyncResponse extends ActionResponse implements ToXContent {
private final Collection<NodePreScaleSyncResponse> nodeResponses;
private final String failureReason;
private final boolean hasFailures;

public PreScaleSyncResponse(Collection<NodePreScaleSyncResponse> responses) {
this.nodeResponses = responses;
this.hasFailures = responses.stream()
.anyMatch(r -> r.getShardResponses().stream().anyMatch(s -> s.hasUncommittedOperations() || s.needsSync()));
this.failureReason = buildFailureReason();
}

public PreScaleSyncResponse(StreamInput in) throws IOException {
this.nodeResponses = in.readList(NodePreScaleSyncResponse::new);
this.hasFailures = in.readBoolean();
this.failureReason = in.readOptionalString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(new ArrayList<>(nodeResponses)); // Convert Collection to List
out.writeBoolean(hasFailures);
out.writeOptionalString(failureReason);
}

public boolean hasFailures() {
return hasFailures;
}

public String getFailureReason() {
return failureReason;
}

private String buildFailureReason() {
if (!hasFailures) {
return null;
}
StringBuilder reason = new StringBuilder();
for (NodePreScaleSyncResponse nodeResponse : nodeResponses) {
for (ShardPreScaleSyncResponse shardResponse : nodeResponse.getShardResponses()) {
if (shardResponse.hasUncommittedOperations() || shardResponse.needsSync()) {
reason.append("Shard ")
.append(shardResponse.getShardId())
.append(" on node ")
.append(nodeResponse.getNode())
.append(": ");
if (shardResponse.hasUncommittedOperations()) {
reason.append("has uncommitted operations ");
}
if (shardResponse.needsSync()) {
reason.append("needs sync ");
}
reason.append("; ");
}
}
}
return reason.toString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field("has_failures", hasFailures);
if (failureReason != null) {
builder.field("failure_reason", failureReason);
}
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.opensearch.action.admin.indices.scale;

import org.opensearch.action.ActionRequestBuilder;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.common.annotation.PublicApi;

@PublicApi(since = "1.0.0")
public class ScaleRequestBuilder extends ActionRequestBuilder<PreScaleSyncRequest, AcknowledgedResponse> {

public ScaleRequestBuilder(OpenSearchClient client, String... indices) {
this(client, false, indices);
}

public ScaleRequestBuilder(OpenSearchClient client, boolean scaleDown, String... indices) {
super(client, PreScaleSyncAction.INSTANCE, new PreScaleSyncRequest(indices, scaleDown));
}

/**
* Sets the scale direction (up/down)
* @param scaleDown true if scaling down, false if scaling up
* @return this builder
*/
public ScaleRequestBuilder setScaleDown(boolean scaleDown) {
request.scaleDown(scaleDown);
return this;
}
}
Loading

0 comments on commit 99e2966

Please sign in to comment.