Skip to content

Commit 42b7e87

Browse files
Add recovery chunk size setting (#13997)
Signed-off-by: Shubh Sahu <shubhvs@amazon.com> (cherry picked from commit 53ea952) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 97dd4ae commit 42b7e87

File tree

10 files changed

+33
-93
lines changed

10 files changed

+33
-93
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
1515
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
1616
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
17+
- Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997))
1718
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
1819
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
1920
- Move Remote Store Migration from DocRep to GA and modify remote migration settings name ([#14100](https://github.com/opensearch-project/OpenSearch/pull/14100))

server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@
104104
import org.opensearch.indices.recovery.RecoveryState.Stage;
105105
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
106106
import org.opensearch.node.NodeClosedException;
107-
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
108107
import org.opensearch.plugins.AnalysisPlugin;
109108
import org.opensearch.plugins.Plugin;
110109
import org.opensearch.plugins.PluginsService;
@@ -156,7 +155,7 @@
156155
import static java.util.stream.Collectors.toList;
157156
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
158157
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
159-
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
158+
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
160159
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
161160
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
162161
import static org.hamcrest.Matchers.empty;
@@ -187,7 +186,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
187186
return Arrays.asList(
188187
MockTransportService.TestPlugin.class,
189188
MockFSIndexStore.TestPlugin.class,
190-
RecoverySettingsChunkSizePlugin.class,
191189
TestAnalysisPlugin.class,
192190
InternalSettingsPlugin.class,
193191
MockEngineFactoryPlugin.class
@@ -263,7 +261,7 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
263261
// one chunk per sec..
264262
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
265263
// small chunks
266-
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
264+
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
267265
)
268266
.get()
269267
.isAcknowledged()
@@ -278,7 +276,10 @@ private void restoreRecoverySpeed() {
278276
.setTransientSettings(
279277
Settings.builder()
280278
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
281-
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
279+
.put(
280+
INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(),
281+
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY)
282+
)
282283
)
283284
.get()
284285
.isAcknowledged()

server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.opensearch.index.query.QueryBuilders;
4747
import org.opensearch.indices.recovery.FileChunkRequest;
4848
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
49-
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
5049
import org.opensearch.plugins.Plugin;
5150
import org.opensearch.test.OpenSearchIntegTestCase;
5251
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
@@ -61,7 +60,7 @@
6160
import java.util.concurrent.CountDownLatch;
6261
import java.util.concurrent.atomic.AtomicBoolean;
6362

64-
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
63+
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
6564
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6665
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
6766
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -81,7 +80,7 @@ public static Collection<Object[]> parameters() {
8180

8281
@Override
8382
protected Collection<Class<? extends Plugin>> nodePlugins() {
84-
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
83+
return Arrays.asList(MockTransportService.TestPlugin.class);
8584
}
8685

8786
/**
@@ -96,7 +95,8 @@ public void testCancelRecoveryAndResume() throws Exception {
9695
.cluster()
9796
.prepareUpdateSettings()
9897
.setTransientSettings(
99-
Settings.builder().put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
98+
Settings.builder()
99+
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
100100
)
101101
.get()
102102
.isAcknowledged()

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ public void apply(Settings value, Settings current, Settings previous) {
308308
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
309309
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
310310
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
311+
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING,
311312
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
312313
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
313314
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,14 @@ public class RecoverySettings {
177177
);
178178

179179
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
180-
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);
180+
public static final Setting<ByteSizeValue> INDICES_RECOVERY_CHUNK_SIZE_SETTING = Setting.byteSizeSetting(
181+
"indices.recovery.chunk_size",
182+
new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES),
183+
new ByteSizeValue(1, ByteSizeUnit.BYTES),
184+
new ByteSizeValue(100, ByteSizeUnit.MB),
185+
Property.Dynamic,
186+
Property.NodeScope
187+
);
181188

182189
private volatile ByteSizeValue recoveryMaxBytesPerSec;
183190
private volatile ByteSizeValue replicationMaxBytesPerSec;
@@ -193,7 +200,7 @@ public class RecoverySettings {
193200
private volatile TimeValue internalActionRetryTimeout;
194201
private volatile TimeValue internalActionLongTimeout;
195202

196-
private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
203+
private volatile ByteSizeValue chunkSize;
197204
private volatile TimeValue internalRemoteUploadTimeout;
198205

199206
public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
@@ -221,6 +228,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
221228

222229
logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
223230
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
231+
this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE_SETTING.get(settings);
224232

225233
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
226234
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
@@ -239,11 +247,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
239247
);
240248
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
241249
clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout);
250+
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE_SETTING, this::setChunkSize);
242251
clusterSettings.addSettingsUpdateConsumer(
243252
INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
244253
this::setInternalActionRetryTimeout
245254
);
246-
247255
}
248256

249257
/**
@@ -296,10 +304,7 @@ public ByteSizeValue getChunkSize() {
296304
return chunkSize;
297305
}
298306

299-
public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
300-
if (chunkSize.bytesAsInt() <= 0) {
301-
throw new IllegalArgumentException("chunkSize must be > 0");
302-
}
307+
public void setChunkSize(ByteSizeValue chunkSize) {
303308
this.chunkSize = chunkSize;
304309
}
305310

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,7 +1323,6 @@ protected Node(
13231323
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
13241324
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
13251325
{
1326-
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
13271326
b.bind(PeerRecoverySourceService.class)
13281327
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
13291328
b.bind(PeerRecoveryTargetService.class)
@@ -1431,10 +1430,6 @@ protected TransportService newTransportService(
14311430
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer);
14321431
}
14331432

1434-
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
1435-
// Noop in production, overridden by tests
1436-
}
1437-
14381433
/**
14391434
* The settings that are used by this node. Contains original settings as well as additional settings provided by plugins.
14401435
*/

server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ public void testInternalLongActionTimeout() {
119119
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
120120
}
121121

122+
public void testChunkSize() {
123+
ByteSizeValue chunkSize = new ByteSizeValue(between(1, 1000), ByteSizeUnit.BYTES);
124+
clusterSettings.applySettings(
125+
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), chunkSize).build()
126+
);
127+
assertEquals(chunkSize, recoverySettings.getChunkSize());
128+
}
129+
122130
public void testInternalActionRetryTimeout() {
123131
long duration = between(1, 1000);
124132
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);

test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1148,7 +1148,7 @@ public final void recoverUnstartedReplica(
11481148
startingSeqNo
11491149
);
11501150
long fileChunkSizeInBytes = randomBoolean()
1151-
? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes()
1151+
? RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes()
11521152
: randomIntBetween(1, 10 * 1024 * 1024);
11531153
final Settings settings = Settings.builder()
11541154
.put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4)))

test/framework/src/main/java/org/opensearch/node/MockNode.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.opensearch.env.Environment;
5151
import org.opensearch.http.HttpServerTransport;
5252
import org.opensearch.indices.IndicesService;
53-
import org.opensearch.indices.recovery.RecoverySettings;
5453
import org.opensearch.plugins.Plugin;
5554
import org.opensearch.script.MockScriptService;
5655
import org.opensearch.script.ScriptContext;
@@ -232,13 +231,6 @@ protected TransportService newTransportService(
232231
}
233232
}
234233

235-
@Override
236-
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
237-
if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) {
238-
clusterSettings.addSettingsUpdateConsumer(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING, recoverySettings::setChunkSize);
239-
}
240-
}
241-
242234
@Override
243235
protected ClusterInfoService newClusterInfoService(
244236
Settings settings,

test/framework/src/main/java/org/opensearch/node/RecoverySettingsChunkSizePlugin.java

Lines changed: 0 additions & 63 deletions
This file was deleted.

0 commit comments

Comments
 (0)