@@ -523,4 +523,269 @@ public void multipleConsumersCanCallNext() throws Exception {
523523 executor .shutdown ();
524524 }
525525 }
526+
527+ @ Test
528+ public void nonRecoverableHttpErrorStopsPolling () throws Exception {
529+ FDv2Requestor requestor = mockRequestor ();
530+ SelectorSource selectorSource = mockSelectorSource ();
531+ ScheduledExecutorService executor = Executors .newScheduledThreadPool (1 );
532+
533+ AtomicInteger callCount = new AtomicInteger (0 );
534+ when (requestor .Poll (any (Selector .class ))).thenAnswer (invocation -> {
535+ int count = callCount .incrementAndGet ();
536+ // First call returns 401 (non-recoverable)
537+ if (count == 1 ) {
538+ return failedFuture (new com .launchdarkly .sdk .internal .http .HttpErrors .HttpErrorException (401 ));
539+ } else {
540+ // Subsequent calls should not happen, but return success if they do
541+ return CompletableFuture .completedFuture (makeSuccessResponse ());
542+ }
543+ });
544+
545+ try {
546+ PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl (
547+ requestor ,
548+ testLogger ,
549+ selectorSource ,
550+ executor ,
551+ Duration .ofMillis (50 )
552+ );
553+
554+ // First result should be terminal error
555+ FDv2SourceResult result1 = synchronizer .next ().get (1 , TimeUnit .SECONDS );
556+ assertNotNull (result1 );
557+ assertEquals (FDv2SourceResult .ResultType .STATUS , result1 .getResultType ());
558+ assertEquals (FDv2SourceResult .State .TERMINAL_ERROR , result1 .getStatus ().getState ());
559+ assertEquals (DataSourceStatusProvider .ErrorKind .ERROR_RESPONSE , result1 .getStatus ().getErrorInfo ().getKind ());
560+
561+ // Wait to see if polling continues
562+ Thread .sleep (200 );
563+
564+ // Should have only called requestor once - polling stopped after terminal error
565+ assertEquals ("Polling should have stopped after terminal error" , 1 , callCount .get ());
566+
567+ // Don't call next() again after terminal error - that's incorrect usage
568+ synchronizer .close ();
569+ } finally {
570+ executor .shutdown ();
571+ }
572+ }
573+
574+ @ Test
575+ public void goodbyeEventStopsPolling () throws Exception {
576+ FDv2Requestor requestor = mockRequestor ();
577+ SelectorSource selectorSource = mockSelectorSource ();
578+ ScheduledExecutorService executor = Executors .newScheduledThreadPool (1 );
579+
580+ AtomicInteger callCount = new AtomicInteger (0 );
581+
582+ // Create a response with a goodbye event
583+ String goodbyeJson = "{\n " +
584+ " \" events\" : [\n " +
585+ " {\n " +
586+ " \" event\" : \" goodbye\" ,\n " +
587+ " \" data\" : {\n " +
588+ " \" reason\" : \" Service is shutting down\" \n " +
589+ " }\n " +
590+ " }\n " +
591+ " ]\n " +
592+ "}" ;
593+
594+ try {
595+ FDv2Requestor .FDv2PayloadResponse goodbyeResponse = new FDv2Requestor .FDv2PayloadResponse (
596+ com .launchdarkly .sdk .internal .fdv2 .payloads .FDv2Event .parseEventsArray (goodbyeJson ),
597+ okhttp3 .Headers .of ()
598+ );
599+
600+ when (requestor .Poll (any (Selector .class ))).thenAnswer (invocation -> {
601+ int count = callCount .incrementAndGet ();
602+ // First call returns goodbye
603+ if (count == 1 ) {
604+ return CompletableFuture .completedFuture (goodbyeResponse );
605+ } else {
606+ // Subsequent calls should not happen, but return success if they do
607+ return CompletableFuture .completedFuture (makeSuccessResponse ());
608+ }
609+ });
610+
611+ PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl (
612+ requestor ,
613+ testLogger ,
614+ selectorSource ,
615+ executor ,
616+ Duration .ofMillis (50 )
617+ );
618+
619+ // First result should be goodbye
620+ FDv2SourceResult result1 = synchronizer .next ().get (1 , TimeUnit .SECONDS );
621+ assertNotNull (result1 );
622+ assertEquals (FDv2SourceResult .ResultType .STATUS , result1 .getResultType ());
623+ assertEquals (FDv2SourceResult .State .GOODBYE , result1 .getStatus ().getState ());
624+
625+ // Wait to see if polling continues
626+ Thread .sleep (200 );
627+
628+ // Should have only called requestor once - polling stopped after goodbye
629+ assertEquals ("Polling should have stopped after goodbye" , 1 , callCount .get ());
630+
631+ // Don't call next() again after goodbye - that's incorrect usage
632+ synchronizer .close ();
633+ } catch (Exception e ) {
634+ throw new RuntimeException (e );
635+ } finally {
636+ executor .shutdown ();
637+ }
638+ }
639+
640+ @ Test
641+ public void recoverableHttpErrorContinuesPolling () throws Exception {
642+ FDv2Requestor requestor = mockRequestor ();
643+ SelectorSource selectorSource = mockSelectorSource ();
644+ ScheduledExecutorService executor = Executors .newScheduledThreadPool (1 );
645+
646+ AtomicInteger callCount = new AtomicInteger (0 );
647+ AtomicInteger successCount = new AtomicInteger (0 );
648+ when (requestor .Poll (any (Selector .class ))).thenAnswer (invocation -> {
649+ int count = callCount .incrementAndGet ();
650+ // First call returns 429 (recoverable - too many requests)
651+ if (count == 1 ) {
652+ return failedFuture (new com .launchdarkly .sdk .internal .http .HttpErrors .HttpErrorException (429 ));
653+ } else {
654+ // Subsequent calls succeed
655+ successCount .incrementAndGet ();
656+ return CompletableFuture .completedFuture (makeSuccessResponse ());
657+ }
658+ });
659+
660+ try {
661+ PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl (
662+ requestor ,
663+ testLogger ,
664+ selectorSource ,
665+ executor ,
666+ Duration .ofMillis (50 )
667+ );
668+
669+ // Wait for multiple polls
670+ Thread .sleep (250 );
671+
672+ // First result should be interrupted error
673+ FDv2SourceResult result1 = synchronizer .next ().get (1 , TimeUnit .SECONDS );
674+ assertNotNull (result1 );
675+ assertEquals (FDv2SourceResult .ResultType .STATUS , result1 .getResultType ());
676+ assertEquals (FDv2SourceResult .State .INTERRUPTED , result1 .getStatus ().getState ());
677+ assertEquals (DataSourceStatusProvider .ErrorKind .ERROR_RESPONSE , result1 .getStatus ().getErrorInfo ().getKind ());
678+
679+ // Second result should be success (polling continued)
680+ FDv2SourceResult result2 = synchronizer .next ().get (1 , TimeUnit .SECONDS );
681+ assertNotNull (result2 );
682+ assertEquals (FDv2SourceResult .ResultType .CHANGE_SET , result2 .getResultType ());
683+
684+ // Verify polling continued after recoverable error
685+ assertTrue ("Should have at least 2 successful polls after recoverable error" , successCount .get () >= 2 );
686+
687+ synchronizer .close ();
688+ } finally {
689+ executor .shutdown ();
690+ }
691+ }
692+
693+ @ Test
694+ public void multipleRecoverableErrorsContinuePolling () throws Exception {
695+ FDv2Requestor requestor = mockRequestor ();
696+ SelectorSource selectorSource = mockSelectorSource ();
697+ ScheduledExecutorService executor = Executors .newScheduledThreadPool (1 );
698+
699+ AtomicInteger callCount = new AtomicInteger (0 );
700+ when (requestor .Poll (any (Selector .class ))).thenAnswer (invocation -> {
701+ int count = callCount .incrementAndGet ();
702+ // Multiple recoverable errors: 408, 429, network error, success pattern
703+ if (count == 1 ) {
704+ return failedFuture (new com .launchdarkly .sdk .internal .http .HttpErrors .HttpErrorException (408 ));
705+ } else if (count == 2 ) {
706+ return failedFuture (new com .launchdarkly .sdk .internal .http .HttpErrors .HttpErrorException (429 ));
707+ } else if (count == 3 ) {
708+ return failedFuture (new IOException ("Connection timeout" ));
709+ } else {
710+ return CompletableFuture .completedFuture (makeSuccessResponse ());
711+ }
712+ });
713+
714+ try {
715+ PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl (
716+ requestor ,
717+ testLogger ,
718+ selectorSource ,
719+ executor ,
720+ Duration .ofMillis (50 )
721+ );
722+
723+ // Wait for multiple polls
724+ Thread .sleep (300 );
725+
726+ // Get first three interrupted results
727+ FDv2SourceResult result1 = synchronizer .next ().get (1 , TimeUnit .SECONDS );
728+ assertEquals (FDv2SourceResult .State .INTERRUPTED , result1 .getStatus ().getState ());
729+
730+ FDv2SourceResult result2 = synchronizer .next ().get (1 , TimeUnit .SECONDS );
731+ assertEquals (FDv2SourceResult .State .INTERRUPTED , result2 .getStatus ().getState ());
732+
733+ FDv2SourceResult result3 = synchronizer .next ().get (1 , TimeUnit .SECONDS );
734+ assertEquals (FDv2SourceResult .State .INTERRUPTED , result3 .getStatus ().getState ());
735+
736+ // Fourth result should be success
737+ FDv2SourceResult result4 = synchronizer .next ().get (1 , TimeUnit .SECONDS );
738+ assertEquals (FDv2SourceResult .ResultType .CHANGE_SET , result4 .getResultType ());
739+
740+ // Verify polling continued through multiple errors
741+ assertTrue ("Should have made at least 4 calls" , callCount .get () >= 4 );
742+
743+ synchronizer .close ();
744+ } finally {
745+ executor .shutdown ();
746+ }
747+ }
748+
749+ @ Test
750+ public void nonRecoverableThenRecoverableErrorStopsPolling () throws Exception {
751+ FDv2Requestor requestor = mockRequestor ();
752+ SelectorSource selectorSource = mockSelectorSource ();
753+ ScheduledExecutorService executor = Executors .newScheduledThreadPool (1 );
754+
755+ AtomicInteger callCount = new AtomicInteger (0 );
756+ when (requestor .Poll (any (Selector .class ))).thenAnswer (invocation -> {
757+ int count = callCount .incrementAndGet ();
758+ // First call returns 403 (non-recoverable)
759+ if (count == 1 ) {
760+ return failedFuture (new com .launchdarkly .sdk .internal .http .HttpErrors .HttpErrorException (403 ));
761+ } else {
762+ // Any subsequent calls should not happen
763+ return failedFuture (new IOException ("Network error" ));
764+ }
765+ });
766+
767+ try {
768+ PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl (
769+ requestor ,
770+ testLogger ,
771+ selectorSource ,
772+ executor ,
773+ Duration .ofMillis (50 )
774+ );
775+
776+ // First result should be terminal error
777+ FDv2SourceResult result1 = synchronizer .next ().get (1 , TimeUnit .SECONDS );
778+ assertEquals (FDv2SourceResult .State .TERMINAL_ERROR , result1 .getStatus ().getState ());
779+
780+ // Wait to ensure no more polling
781+ Thread .sleep (200 );
782+
783+ // Should have only called requestor once
784+ assertEquals ("Polling should have stopped after terminal error" , 1 , callCount .get ());
785+
786+ synchronizer .close ();
787+ } finally {
788+ executor .shutdown ();
789+ }
790+ }
526791}
0 commit comments