@@ -33,8 +33,7 @@ class FDv2DataSource implements DataSource {
3333 */
3434 private static final long defaultRecoveryTimeout = 5 * 60 ;
3535
36- private final List <DataSourceFactory <Initializer >> initializers ;
37- private final SynchronizerStateManager synchronizerStateManager ;
36+ private final SourceStateManager sourceStateManager ;
3837
3938 private final List <ConditionFactory > conditionFactories ;
4039
@@ -47,6 +46,8 @@ class FDv2DataSource implements DataSource {
4746
4847 private final LDLogger logger ;
4948
49+ private volatile boolean closed = false ;
50+
5051 public interface DataSourceFactory <T > {
5152 T build ();
5253 }
@@ -84,7 +85,6 @@ public FDv2DataSource(
8485 long fallbackTimeout ,
8586 long recoveryTimeout
8687 ) {
87- this .initializers = initializers ;
8888 List <SynchronizerFactoryWithState > synchronizerFactories = synchronizers
8989 .stream ()
9090 .map (SynchronizerFactoryWithState ::new )
@@ -102,7 +102,7 @@ public FDv2DataSource(
102102 // configuration.
103103 }
104104
105- this .synchronizerStateManager = new SynchronizerStateManager (synchronizerFactories );
105+ this .sourceStateManager = new SourceStateManager (synchronizerFactories , initializers );
106106 this .dataSourceUpdates = dataSourceUpdates ;
107107 this .threadPriority = threadPriority ;
108108 this .logger = logger ;
@@ -113,30 +113,33 @@ public FDv2DataSource(
113113
114114 private void run () {
115115 Thread runThread = new Thread (() -> {
116- if (initializers . isEmpty () && synchronizerStateManager . getAvailableSynchronizerCount () == 0 ) {
116+ if (! sourceStateManager . hasAvailableSources () ) {
117117 // There are not any initializer or synchronizers, so we are at the best state that
118118 // can be achieved.
119119 dataSourceUpdates .updateStatus (DataSourceStatusProvider .State .VALID , null );
120120 startFuture .complete (true );
121121 return ;
122122 }
123- if (! initializers . isEmpty ()) {
123+ if (sourceStateManager . hasInitializers ()) {
124124 runInitializers ();
125125 }
126- boolean synchronizersAvailable = synchronizerStateManager . getAvailableSynchronizerCount () != 0 ;
126+ boolean synchronizersAvailable = sourceStateManager . hasAvailableSynchronizers () ;
127127 if (!synchronizersAvailable ) {
128128 // If already completed by the initializers, then this will have no effect.
129- startFuture . complete ( false );
130- if (! isInitialized ()) {
129+ if (! isInitialized () && ! closed ) {
130+ // If we were closed, then closing would have handled our terminal update.
131131 dataSourceUpdates .updateStatus (
132132 DataSourceStatusProvider .State .OFF ,
133+ // If we were shutdown during initialization, then we don't need to include an error.
133134 new DataSourceStatusProvider .ErrorInfo (
134135 DataSourceStatusProvider .ErrorKind .UNKNOWN ,
135136 0 ,
136137 "Initializers exhausted and there are no synchronizers" ,
137138 new Date ().toInstant ())
138139 );
139140 }
141+ // If already completed has no effect.
142+ startFuture .complete (false );
140143 return ;
141144 }
142145
@@ -145,7 +148,9 @@ private void run() {
145148
146149 dataSourceUpdates .updateStatus (
147150 DataSourceStatusProvider .State .OFF ,
148- new DataSourceStatusProvider .ErrorInfo (
151+ // If the data source was closed, then we just report we are OFF without an
152+ // associated error.
153+ closed ? null : new DataSourceStatusProvider .ErrorInfo (
149154 DataSourceStatusProvider .ErrorKind .UNKNOWN ,
150155 0 ,
151156 "All data source acquisition methods have been exhausted." ,
@@ -164,10 +169,11 @@ private void run() {
164169
165170 private void runInitializers () {
166171 boolean anyDataReceived = false ;
167- for (DataSourceFactory <Initializer > factory : initializers ) {
172+ DataSourceFactory <Initializer > factory = sourceStateManager .getNextInitializer ();
173+ while (factory != null ) {
168174 try {
169175 Initializer initializer = factory .build ();
170- if (synchronizerStateManager .setActiveSource (initializer )) return ;
176+ if (sourceStateManager .setActiveSource (initializer )) return ;
171177 FDv2SourceResult result = initializer .run ().get ();
172178 switch (result .getResultType ()) {
173179 case CHANGE_SET :
@@ -214,6 +220,7 @@ private void runInitializers() {
214220 new Date ().toInstant ()));
215221 logger .warn ("Error running initializer: {}" , e .toString ());
216222 }
223+ factory = sourceStateManager .getNextInitializer ();
217224 }
218225 // We received data without a selector, and we have exhausted initializers, so we are going to
219226 // consider ourselves initialized.
@@ -232,8 +239,8 @@ private void runInitializers() {
232239 * @return a list of conditions to apply to the synchronizer
233240 */
234241 private List <Condition > getConditions () {
235- int availableSynchronizers = synchronizerStateManager .getAvailableSynchronizerCount ();
236- boolean isPrimeSynchronizer = synchronizerStateManager .isPrimeSynchronizer ();
242+ int availableSynchronizers = sourceStateManager .getAvailableSynchronizerCount ();
243+ boolean isPrimeSynchronizer = sourceStateManager .isPrimeSynchronizer ();
237244
238245 if (availableSynchronizers == 1 ) {
239246 // If there is only 1 synchronizer, then we cannot fall back or recover, so we don't need any conditions.
@@ -252,14 +259,14 @@ private List<Condition> getConditions() {
252259 private void runSynchronizers () {
253260 // When runSynchronizers exists, no matter how it exits, the synchronizerStateManager will be closed.
254261 try {
255- SynchronizerFactoryWithState availableSynchronizer = synchronizerStateManager .getNextAvailableSynchronizer ();
262+ SynchronizerFactoryWithState availableSynchronizer = sourceStateManager .getNextAvailableSynchronizer ();
256263
257264 // We want to continue running synchronizers for as long as any are available.
258265 while (availableSynchronizer != null ) {
259266 Synchronizer synchronizer = availableSynchronizer .build ();
260267
261268 // Returns true if shutdown.
262- if (synchronizerStateManager .setActiveSource (synchronizer )) return ;
269+ if (sourceStateManager .setActiveSource (synchronizer )) return ;
263270
264271 try {
265272 boolean running = true ;
@@ -284,7 +291,7 @@ private void runSynchronizers() {
284291 case RECOVERY :
285292 // For recovery, we will start at the first available synchronizer.
286293 // So we reset the source index, and finding the source will start at the beginning.
287- synchronizerStateManager .resetSourceIndex ();
294+ sourceStateManager .resetSourceIndex ();
288295 logger .debug ("The data source is attempting to recover to a higher priority synchronizer." );
289296 break ;
290297 }
@@ -341,12 +348,12 @@ private void runSynchronizers() {
341348 // Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
342349 if (
343350 result .isFdv1Fallback () &&
344- synchronizerStateManager .hasFDv1Fallback () &&
351+ sourceStateManager .hasFDv1Fallback () &&
345352 // This shouldn't happen in practice, an FDv1 source shouldn't request fallback
346353 // to FDv1. But if it does, then we will discard its request.
347354 !availableSynchronizer .isFDv1Fallback ()
348355 ) {
349- synchronizerStateManager .fdv1Fallback ();
356+ sourceStateManager .fdv1Fallback ();
350357 running = false ;
351358 }
352359 }
@@ -362,10 +369,12 @@ private void runSynchronizers() {
362369 logger .warn ("Error running synchronizer: {}, will try next synchronizer, or retry." , e .toString ());
363370 // Move to the next synchronizer.
364371 }
365- availableSynchronizer = synchronizerStateManager .getNextAvailableSynchronizer ();
372+ availableSynchronizer = sourceStateManager .getNextAvailableSynchronizer ();
366373 }
367- } finally {
368- synchronizerStateManager .close ();
374+ } catch (Exception e ) {
375+ logger .error ("Unexpected error in DataSource: {}" , e .toString ());
376+ }finally {
377+ sourceStateManager .close ();
369378 }
370379 }
371380
@@ -388,11 +397,14 @@ public boolean isInitialized() {
388397
389398 @ Override
390399 public void close () {
400+ closed = true ;
391401 // If there is an active source, we will shut it down, and that will result in the loop handling that source
392402 // exiting.
393403 // If we do not have an active source, then the loop will check isShutdown when attempting to set one. When
394404 // it detects shutdown, it will exit the loop.
395- synchronizerStateManager .close ();
405+ sourceStateManager .close ();
406+
407+ dataSourceUpdates .updateStatus (DataSourceStatusProvider .State .OFF , null );
396408
397409 // If this is already set, then this has no impact.
398410 startFuture .complete (false );
0 commit comments