@@ -111,6 +111,14 @@ public void firstInitializerFailsSecondInitializerSucceedsWithSelector() throws
111111 Future <Void > startFuture = dataSource .start ();
112112 startFuture .get (2 , TimeUnit .SECONDS );
113113
114+ // In practice this intermediate status will be supressed by the data source updates sink.
115+
116+ // Should receive INTERRUPTED from the first failed initializer, then VALID from second successful initializer
117+ List <DataSourceStatusProvider .State > statuses = sink .awaitStatuses (2 , 2 , TimeUnit .SECONDS );
118+ assertEquals ("Should receive 2 status updates" , 2 , statuses .size ());
119+ assertEquals (DataSourceStatusProvider .State .INTERRUPTED , statuses .get (0 ));
120+ assertEquals (DataSourceStatusProvider .State .VALID , statuses .get (1 ));
121+
114122 assertTrue (dataSource .isInitialized ());
115123 assertEquals (1 , sink .getApplyCount ());
116124 assertEquals (DataSourceStatusProvider .State .VALID , sink .getLastState ());
@@ -157,6 +165,14 @@ public void firstInitializerFailsSecondInitializerSucceedsWithoutSelector() thro
157165 Boolean synchronizerCalled = synchronizerCalledQueue .poll (2 , TimeUnit .SECONDS );
158166 assertNotNull ("Synchronizer should be called" , synchronizerCalled );
159167
168+ // Expected status sequence:
169+ // 1. INTERRUPTED when first initializer fails
170+ // 2. VALID after synchronizer completes (second initializer has no selector, so must wait for synchronizer)
171+ List <DataSourceStatusProvider .State > statuses = sink .awaitStatuses (2 , 2 , TimeUnit .SECONDS );
172+ assertEquals ("Should receive 2 status updates" , 2 , statuses .size ());
173+ assertEquals (DataSourceStatusProvider .State .INTERRUPTED , statuses .get (0 ));
174+ assertEquals (DataSourceStatusProvider .State .VALID , statuses .get (1 ));
175+
160176 // Wait for apply to be processed
161177 sink .awaitApplyCount (2 , 2 , TimeUnit .SECONDS );
162178 assertEquals (2 , sink .getApplyCount ()); // One from initializer, one from synchronizer
@@ -274,6 +290,19 @@ public void allThreeInitializersFailWithNoSynchronizers() throws Exception {
274290 Future <Void > startFuture = dataSource .start ();
275291 startFuture .get (2 , TimeUnit .SECONDS );
276292
293+ // Should receive INTERRUPTED for each failing initializer, then OFF when all sources exhausted
294+ // Expected: 3 INTERRUPTED statuses + 1 OFF status = 4 total
295+ List <DataSourceStatusProvider .State > statuses = sink .awaitStatuses (4 , 2 , TimeUnit .SECONDS );
296+ assertEquals ("Should receive 4 status updates" , 4 , statuses .size ());
297+
298+ // First 3 should be INTERRUPTED (one per failed initializer)
299+ assertEquals (DataSourceStatusProvider .State .INTERRUPTED , statuses .get (0 ));
300+ assertEquals (DataSourceStatusProvider .State .INTERRUPTED , statuses .get (1 ));
301+ assertEquals (DataSourceStatusProvider .State .INTERRUPTED , statuses .get (2 ));
302+
303+ // Final status should be OFF (all sources exhausted, no synchronizers)
304+ assertEquals (DataSourceStatusProvider .State .OFF , statuses .get (3 ));
305+
277306 assertFalse (dataSource .isInitialized ());
278307 assertEquals (0 , sink .getApplyCount ());
279308 assertEquals (DataSourceStatusProvider .State .OFF , sink .getLastState ());
@@ -301,6 +330,11 @@ public void oneInitializerNoSynchronizerIsWellBehaved() throws Exception {
301330 Future <Void > startFuture = dataSource .start ();
302331 startFuture .get (2000 , TimeUnit .SECONDS );
303332
333+ // Expected status: VALID (initializer succeeds with selector, no need to wait for synchronizer)
334+ List <DataSourceStatusProvider .State > statuses = sink .awaitStatuses (1 , 2 , TimeUnit .SECONDS );
335+ assertEquals ("Should receive 1 status update" , 1 , statuses .size ());
336+ assertEquals (DataSourceStatusProvider .State .VALID , statuses .get (0 ));
337+
304338 assertTrue (dataSource .isInitialized ());
305339 assertEquals (1 , sink .getApplyCount ());
306340 assertEquals (DataSourceStatusProvider .State .VALID , sink .getLastState ());
@@ -2556,6 +2590,23 @@ public DataSourceStatusProvider.State awaitStatus(long timeout, TimeUnit unit) t
25562590 return statusUpdates .poll (timeout , unit );
25572591 }
25582592
2593+ public List <DataSourceStatusProvider .State > awaitStatuses (int count , long timeout , TimeUnit unit ) throws InterruptedException {
2594+ List <DataSourceStatusProvider .State > statuses = new ArrayList <>();
2595+ long deadline = System .currentTimeMillis () + unit .toMillis (timeout );
2596+ for (int i = 0 ; i < count ; i ++) {
2597+ long remaining = deadline - System .currentTimeMillis ();
2598+ if (remaining <= 0 ) {
2599+ break ;
2600+ }
2601+ DataSourceStatusProvider .State status = statusUpdates .poll (remaining , TimeUnit .MILLISECONDS );
2602+ if (status == null ) {
2603+ break ;
2604+ }
2605+ statuses .add (status );
2606+ }
2607+ return statuses ;
2608+ }
2609+
25592610 public void awaitApplyCount (int expectedCount , long timeout , TimeUnit unit ) throws InterruptedException {
25602611 long deadline = System .currentTimeMillis () + unit .toMillis (timeout );
25612612 while (applyCount .get () < expectedCount && System .currentTimeMillis () < deadline ) {
0 commit comments