88
88
import static org .opensearch .monitor .StatusInfo .Status .UNHEALTHY ;
89
89
import static org .opensearch .node .Node .NODE_NAME_SETTING ;
90
90
import static org .opensearch .transport .TransportService .HANDSHAKE_ACTION_NAME ;
91
+ import static org .opensearch .transport .TransportService .NOOP_TRANSPORT_INTERCEPTOR ;
91
92
import static org .hamcrest .Matchers .contains ;
92
93
import static org .hamcrest .Matchers .containsInAnyOrder ;
93
94
import static org .hamcrest .Matchers .empty ;
@@ -126,7 +127,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
126
127
final TransportService transportService = mockTransport .createTransportService (
127
128
settings ,
128
129
deterministicTaskQueue .getThreadPool (),
129
- TransportService . NOOP_TRANSPORT_INTERCEPTOR ,
130
+ NOOP_TRANSPORT_INTERCEPTOR ,
130
131
boundTransportAddress -> localNode ,
131
132
null ,
132
133
emptySet (),
@@ -273,6 +274,10 @@ public void testFailsNodeThatIsDisconnected() {
273
274
metricsRegistry
274
275
);
275
276
assertEquals (Integer .valueOf (2 ), metricsRegistry .getCounterStore ().get ("followers.checker.failure.count" ).getCounterValue ());
277
+ assertEquals (
278
+ Integer .valueOf (2 ),
279
+ metricsRegistry .getCounterStore ().get ("followers.checker.attempt.failure.count" ).getCounterValue ()
280
+ );
276
281
}
277
282
278
283
public void testFailsNodeThatDisconnects () {
@@ -307,7 +312,7 @@ public String toString() {
307
312
final TransportService transportService = mockTransport .createTransportService (
308
313
settings ,
309
314
deterministicTaskQueue .getThreadPool (),
310
- TransportService . NOOP_TRANSPORT_INTERCEPTOR ,
315
+ NOOP_TRANSPORT_INTERCEPTOR ,
311
316
boundTransportAddress -> localNode ,
312
317
null ,
313
318
emptySet (),
@@ -341,6 +346,10 @@ public String toString() {
341
346
assertTrue (nodeFailed .get ());
342
347
assertThat (followersChecker .getFaultyNodes (), contains (otherNode ));
343
348
assertEquals (Integer .valueOf (1 ), metricsRegistry .getCounterStore ().get ("followers.checker.failure.count" ).getCounterValue ());
349
+ assertEquals (
350
+ Integer .valueOf (1 ),
351
+ metricsRegistry .getCounterStore ().get ("followers.checker.attempt.failure.count" ).getCounterValue ()
352
+ );
344
353
}
345
354
346
355
public void testFailsNodeThatIsUnhealthy () {
@@ -403,7 +412,7 @@ public String toString() {
403
412
final TransportService transportService = mockTransport .createTransportService (
404
413
settings ,
405
414
deterministicTaskQueue .getThreadPool (),
406
- TransportService . NOOP_TRANSPORT_INTERCEPTOR ,
415
+ NOOP_TRANSPORT_INTERCEPTOR ,
407
416
boundTransportAddress -> localNode ,
408
417
null ,
409
418
emptySet (),
@@ -510,7 +519,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
510
519
final TransportService transportService = mockTransport .createTransportService (
511
520
settings ,
512
521
deterministicTaskQueue .getThreadPool (),
513
- TransportService . NOOP_TRANSPORT_INTERCEPTOR ,
522
+ NOOP_TRANSPORT_INTERCEPTOR ,
514
523
boundTransportAddress -> follower ,
515
524
null ,
516
525
emptySet (),
@@ -587,7 +596,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
587
596
final TransportService transportService = mockTransport .createTransportService (
588
597
settings ,
589
598
deterministicTaskQueue .getThreadPool (),
590
- TransportService . NOOP_TRANSPORT_INTERCEPTOR ,
599
+ NOOP_TRANSPORT_INTERCEPTOR ,
591
600
boundTransportAddress -> follower ,
592
601
null ,
593
602
emptySet (),
@@ -748,7 +757,7 @@ public void testPreferClusterManagerNodes() {
748
757
TransportService transportService = capturingTransport .createTransportService (
749
758
Settings .EMPTY ,
750
759
deterministicTaskQueue .getThreadPool (),
751
- TransportService . NOOP_TRANSPORT_INTERCEPTOR ,
760
+ NOOP_TRANSPORT_INTERCEPTOR ,
752
761
x -> nodes .get (0 ),
753
762
null ,
754
763
emptySet (),
@@ -770,6 +779,83 @@ public void testPreferClusterManagerNodes() {
770
779
assertEquals (sortedFollowerTargets , followerTargets );
771
780
}
772
781
782
+ public void testFollowerCheckerAttemptFailureCountMetric () {
783
+ final DiscoveryNode localNode = new DiscoveryNode ("local-node" , buildNewFakeTransportAddress (), Version .CURRENT );
784
+ final DiscoveryNode follower = new DiscoveryNode ("follower" , buildNewFakeTransportAddress (), Version .CURRENT );
785
+
786
+ final Settings settings = Settings .builder ().put (NODE_NAME_SETTING .getKey (), localNode .getId ()).build ();
787
+ final ClusterSettings clusterSettings = new ClusterSettings (settings , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS );
788
+ final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue (settings , random ());
789
+ AtomicBoolean followerCheckerAttemptFailedOnce = new AtomicBoolean ();
790
+ final MockTransport mockTransport = new MockTransport () {
791
+ @ Override
792
+ protected void onSendRequest (long requestId , String action , TransportRequest request , DiscoveryNode node ) {
793
+ if (action .equals (HANDSHAKE_ACTION_NAME )) {
794
+ handleResponse (requestId , new TransportService .HandshakeResponse (node , ClusterName .DEFAULT , Version .CURRENT ));
795
+ return ;
796
+ }
797
+ assertThat (action , equalTo (FOLLOWER_CHECK_ACTION_NAME ));
798
+ assertEquals (node , follower );
799
+
800
+ deterministicTaskQueue .scheduleNow (new Runnable () {
801
+ @ Override
802
+ public void run () {
803
+ if (followerCheckerAttemptFailedOnce .compareAndSet (false , true )) {
804
+ handleRemoteError (requestId , new OpenSearchException ("simulated error" ));
805
+ } else {
806
+ handleResponse (requestId , Empty .INSTANCE );
807
+ }
808
+ }
809
+
810
+ @ Override
811
+ public String toString () {
812
+ return "response to request " + requestId ;
813
+ }
814
+ });
815
+ }
816
+ };
817
+
818
+ final TransportService transportService = mockTransport .createTransportService (
819
+ settings ,
820
+ deterministicTaskQueue .getThreadPool (),
821
+ NOOP_TRANSPORT_INTERCEPTOR ,
822
+ boundTransportAddress -> localNode ,
823
+ null ,
824
+ emptySet (),
825
+ NoopTracer .INSTANCE
826
+ );
827
+ transportService .start ();
828
+ transportService .acceptIncomingRequests ();
829
+
830
+ final AtomicBoolean followerFailed = new AtomicBoolean ();
831
+ TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry ();
832
+ final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics (metricsRegistry );
833
+ final FollowersChecker followersChecker = new FollowersChecker (
834
+ settings ,
835
+ clusterSettings ,
836
+ transportService ,
837
+ fcr -> { assert false : fcr ; },
838
+ (node , reason ) -> {
839
+ assertThat (reason , equalTo ("disconnected" ));
840
+ assertTrue (followerFailed .compareAndSet (false , true ));
841
+ },
842
+ () -> new StatusInfo (StatusInfo .Status .HEALTHY , "healthy-info" ),
843
+ clusterManagerMetrics
844
+ );
845
+
846
+ followersChecker .setCurrentNodes (DiscoveryNodes .builder ().add (localNode ).add (follower ).localNodeId (localNode .getId ()).build ());
847
+
848
+ deterministicTaskQueue .advanceTime ();
849
+ deterministicTaskQueue .runAllRunnableTasks ();
850
+
851
+ assertFalse (followerFailed .get ());
852
+ assertEquals (Integer .valueOf (0 ), metricsRegistry .getCounterStore ().get ("followers.checker.failure.count" ).getCounterValue ());
853
+ assertEquals (
854
+ Integer .valueOf (1 ),
855
+ metricsRegistry .getCounterStore ().get ("followers.checker.attempt.failure.count" ).getCounterValue ()
856
+ );
857
+ }
858
+
773
859
private static List <DiscoveryNode > randomNodes (final int numNodes ) {
774
860
List <DiscoveryNode > nodesList = new ArrayList <>();
775
861
for (int i = 0 ; i < numNodes ; i ++) {
0 commit comments