54
54
import org .opensearch .common .settings .Setting ;
55
55
import org .opensearch .common .settings .Setting .Property ;
56
56
import org .opensearch .common .settings .Settings ;
57
+ import org .opensearch .common .unit .TimeValue ;
57
58
58
59
import java .util .HashMap ;
59
60
import java .util .HashSet ;
87
88
public class BalancedShardsAllocator implements ShardsAllocator {
88
89
89
90
private static final Logger logger = LogManager .getLogger (BalancedShardsAllocator .class );
91
+ public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue .timeValueSeconds (20 );
90
92
91
93
public static final Setting <Float > INDEX_BALANCE_FACTOR_SETTING = Setting .floatSetting (
92
94
"cluster.routing.allocation.balance.index" ,
@@ -169,6 +171,23 @@ public class BalancedShardsAllocator implements ShardsAllocator {
169
171
Property .NodeScope
170
172
);
171
173
174
+ public static final Setting <TimeValue > ALLOCATOR_TIMEOUT_SETTING = Setting .timeSetting (
175
+ "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout" ,
176
+ TimeValue .MINUS_ONE ,
177
+ TimeValue .MINUS_ONE ,
178
+ timeValue -> {
179
+ if (timeValue .compareTo (MIN_ALLOCATOR_TIMEOUT ) < 0 && timeValue .compareTo (TimeValue .MINUS_ONE ) != 0 ) {
180
+ throw new IllegalArgumentException (
181
+ "Setting ["
182
+ + "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout"
183
+ + "] should be more than 20s or -1ms to disable timeout"
184
+ );
185
+ }
186
+ },
187
+ Setting .Property .NodeScope ,
188
+ Setting .Property .Dynamic
189
+ );
190
+
172
191
private volatile boolean movePrimaryFirst ;
173
192
private volatile ShardMovementStrategy shardMovementStrategy ;
174
193
@@ -181,6 +200,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
181
200
private volatile float threshold ;
182
201
183
202
private volatile boolean ignoreThrottleInRestore ;
203
+ private volatile TimeValue allocatorTimeout ;
204
+ private long startTime ;
184
205
185
206
public BalancedShardsAllocator (Settings settings ) {
186
207
this (settings , new ClusterSettings (settings , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS ));
@@ -197,6 +218,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
197
218
setPreferPrimaryShardBalance (PREFER_PRIMARY_SHARD_BALANCE .get (settings ));
198
219
setPreferPrimaryShardRebalance (PREFER_PRIMARY_SHARD_REBALANCE .get (settings ));
199
220
setShardMovementStrategy (SHARD_MOVEMENT_STRATEGY_SETTING .get (settings ));
221
+ setAllocatorTimeout (ALLOCATOR_TIMEOUT_SETTING .get (settings ));
200
222
clusterSettings .addSettingsUpdateConsumer (PREFER_PRIMARY_SHARD_BALANCE , this ::setPreferPrimaryShardBalance );
201
223
clusterSettings .addSettingsUpdateConsumer (SHARD_MOVE_PRIMARY_FIRST_SETTING , this ::setMovePrimaryFirst );
202
224
clusterSettings .addSettingsUpdateConsumer (SHARD_MOVEMENT_STRATEGY_SETTING , this ::setShardMovementStrategy );
@@ -206,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
206
228
clusterSettings .addSettingsUpdateConsumer (PREFER_PRIMARY_SHARD_REBALANCE , this ::setPreferPrimaryShardRebalance );
207
229
clusterSettings .addSettingsUpdateConsumer (THRESHOLD_SETTING , this ::setThreshold );
208
230
clusterSettings .addSettingsUpdateConsumer (IGNORE_THROTTLE_FOR_REMOTE_RESTORE , this ::setIgnoreThrottleInRestore );
231
+ clusterSettings .addSettingsUpdateConsumer (ALLOCATOR_TIMEOUT_SETTING , this ::setAllocatorTimeout );
209
232
}
210
233
211
234
/**
@@ -284,6 +307,20 @@ private void setThreshold(float threshold) {
284
307
this .threshold = threshold ;
285
308
}
286
309
310
+ private void setAllocatorTimeout (TimeValue allocatorTimeout ) {
311
+ this .allocatorTimeout = allocatorTimeout ;
312
+ }
313
+
314
+ protected boolean allocatorTimedOut () {
315
+ if (allocatorTimeout .equals (TimeValue .MINUS_ONE )) {
316
+ if (logger .isTraceEnabled ()) {
317
+ logger .trace ("Allocator timeout is disabled. Will not short circuit allocator tasks" );
318
+ }
319
+ return false ;
320
+ }
321
+ return System .nanoTime () - this .startTime > allocatorTimeout .nanos ();
322
+ }
323
+
287
324
@ Override
288
325
public void allocate (RoutingAllocation allocation ) {
289
326
if (allocation .routingNodes ().size () == 0 ) {
@@ -298,8 +335,10 @@ public void allocate(RoutingAllocation allocation) {
298
335
threshold ,
299
336
preferPrimaryShardBalance ,
300
337
preferPrimaryShardRebalance ,
301
- ignoreThrottleInRestore
338
+ ignoreThrottleInRestore ,
339
+ this ::allocatorTimedOut
302
340
);
341
+ this .startTime = System .nanoTime ();
303
342
localShardsBalancer .allocateUnassigned ();
304
343
localShardsBalancer .moveShards ();
305
344
localShardsBalancer .balance ();
@@ -321,7 +360,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
321
360
threshold ,
322
361
preferPrimaryShardBalance ,
323
362
preferPrimaryShardRebalance ,
324
- ignoreThrottleInRestore
363
+ ignoreThrottleInRestore ,
364
+ () -> false // as we don't need to check if timed out or not while just understanding ShardAllocationDecision
325
365
);
326
366
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision .NOT_TAKEN ;
327
367
MoveDecision moveDecision = MoveDecision .NOT_TAKEN ;
@@ -585,7 +625,7 @@ public Balancer(
585
625
float threshold ,
586
626
boolean preferPrimaryBalance
587
627
) {
588
- super (logger , allocation , shardMovementStrategy , weight , threshold , preferPrimaryBalance , false , false );
628
+ super (logger , allocation , shardMovementStrategy , weight , threshold , preferPrimaryBalance , false , false , () -> false );
589
629
}
590
630
}
591
631
0 commit comments