@@ -159,38 +159,39 @@ private void runInitializers() {
159159 Initializer initializer = sourceManager .getNextInitializerAndSetActive ();
160160 while (initializer != null ) {
161161 try {
162- FDv2SourceResult result = initializer .run ().get ();
163- switch (result .getResultType ()) {
164- case CHANGE_SET :
165- dataSourceUpdates .apply (result .getChangeSet ());
166- anyDataReceived = true ;
167- if (!result .getChangeSet ().getSelector ().isEmpty ()) {
168- // We received data with a selector, so we end the initialization process.
169- dataSourceUpdates .updateStatus (DataSourceStatusProvider .State .VALID , null );
170- startFuture .complete (true );
171- return ;
172- }
173- break ;
174- case STATUS :
175- FDv2SourceResult .Status status = result .getStatus ();
176- switch (status .getState ()) {
177- case INTERRUPTED :
178- case TERMINAL_ERROR :
179- // The data source updates handler will filter the state during initializing, but this
180- // will make the error information available.
181- dataSourceUpdates .updateStatus (
182- // While the error was terminal to the individual initializer, it isn't terminal
183- // to the data source as a whole.
184- DataSourceStatusProvider .State .INTERRUPTED ,
185- status .getErrorInfo ());
186- break ;
187- case SHUTDOWN :
188- case GOODBYE :
189- // We don't need to inform anyone of these statuses.
190- logger .debug ("Ignoring status {} from initializer" , result .getStatus ().getState ());
191- break ;
192- }
193- break ;
162+ try (FDv2SourceResult result = initializer .run ().get ()) {
163+ switch (result .getResultType ()) {
164+ case CHANGE_SET :
165+ dataSourceUpdates .apply (result .getChangeSet ());
166+ anyDataReceived = true ;
167+ if (!result .getChangeSet ().getSelector ().isEmpty ()) {
168+ // We received data with a selector, so we end the initialization process.
169+ dataSourceUpdates .updateStatus (DataSourceStatusProvider .State .VALID , null );
170+ startFuture .complete (true );
171+ return ;
172+ }
173+ break ;
174+ case STATUS :
175+ FDv2SourceResult .Status status = result .getStatus ();
176+ switch (status .getState ()) {
177+ case INTERRUPTED :
178+ case TERMINAL_ERROR :
179+ // The data source updates handler will filter the state during initializing, but this
180+ // will make the error information available.
181+ dataSourceUpdates .updateStatus (
182+ // While the error was terminal to the individual initializer, it isn't terminal
183+ // to the data source as a whole.
184+ DataSourceStatusProvider .State .INTERRUPTED ,
185+ status .getErrorInfo ());
186+ break ;
187+ case SHUTDOWN :
188+ case GOODBYE :
189+ // We don't need to inform anyone of these statuses.
190+ logger .debug ("Ignoring status {} from initializer" , result .getStatus ().getState ());
191+ break ;
192+ }
193+ break ;
194+ }
194195 }
195196 } catch (ExecutionException | InterruptedException | CancellationException e ) {
196197 // We don't expect these conditions to happen in practice.
@@ -205,7 +206,7 @@ private void runInitializers() {
205206 new Date ().toInstant ()));
206207 logger .warn ("Error running initializer: {}" , e .toString ());
207208 }
208- initializer = sourceManager .getNextInitializerAndSetActive ();
209+ initializer = sourceManager .getNextInitializerAndSetActive ();
209210 }
210211 // We received data without a selector, and we have exhausted initializers, so we are going to
211212 // consider ourselves initialized.
@@ -286,55 +287,56 @@ private void runSynchronizers() {
286287 continue ;
287288 }
288289
289- FDv2SourceResult result = (FDv2SourceResult ) res ;
290- conditions .inform (result );
290+ try ( FDv2SourceResult result = (FDv2SourceResult ) res ) {
291+ conditions .inform (result );
291292
292- switch (result .getResultType ()) {
293- case CHANGE_SET :
294- dataSourceUpdates .apply (result .getChangeSet ());
295- dataSourceUpdates .updateStatus (DataSourceStatusProvider .State .VALID , null );
296- // This could have been completed by any data source. But if it has not been completed before
297- // now, then we complete it.
298- startFuture .complete (true );
299- break ;
300- case STATUS :
301- FDv2SourceResult .Status status = result .getStatus ();
302- switch (status .getState ()) {
303- case INTERRUPTED :
304- // Handled by conditions.
305- dataSourceUpdates .updateStatus (
306- DataSourceStatusProvider .State .INTERRUPTED ,
307- status .getErrorInfo ());
308- break ;
309- case SHUTDOWN :
310- // We should be overall shutting down.
311- logger .debug ("Synchronizer shutdown." );
312- return ;
313- case TERMINAL_ERROR :
314- sourceManager .blockCurrentSynchronizer ();
315- running = false ;
316- dataSourceUpdates .updateStatus (
317- DataSourceStatusProvider .State .INTERRUPTED ,
318- status .getErrorInfo ());
319- break ;
320- case GOODBYE :
321- // We let the synchronizer handle this internally.
322- break ;
323- }
324- break ;
325- }
326- // We have been requested to fall back to FDv1. We handle whatever message was associated,
327- // close the synchronizer, and then fallback.
328- // Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
329- if (
330- result .isFdv1Fallback () &&
331- sourceManager .hasFDv1Fallback () &&
332- // This shouldn't happen in practice, an FDv1 source shouldn't request fallback
333- // to FDv1. But if it does, then we will discard its request.
334- !sourceManager .isCurrentSynchronizerFDv1Fallback ()
335- ) {
336- sourceManager .fdv1Fallback ();
337- running = false ;
293+ switch (result .getResultType ()) {
294+ case CHANGE_SET :
295+ dataSourceUpdates .apply (result .getChangeSet ());
296+ dataSourceUpdates .updateStatus (DataSourceStatusProvider .State .VALID , null );
297+ // This could have been completed by any data source. But if it has not been completed before
298+ // now, then we complete it.
299+ startFuture .complete (true );
300+ break ;
301+ case STATUS :
302+ FDv2SourceResult .Status status = result .getStatus ();
303+ switch (status .getState ()) {
304+ case INTERRUPTED :
305+ // Handled by conditions.
306+ dataSourceUpdates .updateStatus (
307+ DataSourceStatusProvider .State .INTERRUPTED ,
308+ status .getErrorInfo ());
309+ break ;
310+ case SHUTDOWN :
311+ // We should be overall shutting down.
312+ logger .debug ("Synchronizer shutdown." );
313+ return ;
314+ case TERMINAL_ERROR :
315+ sourceManager .blockCurrentSynchronizer ();
316+ running = false ;
317+ dataSourceUpdates .updateStatus (
318+ DataSourceStatusProvider .State .INTERRUPTED ,
319+ status .getErrorInfo ());
320+ break ;
321+ case GOODBYE :
322+ // We let the synchronizer handle this internally.
323+ break ;
324+ }
325+ break ;
326+ }
327+ // We have been requested to fall back to FDv1. We handle whatever message was associated,
328+ // close the synchronizer, and then fallback.
329+ // Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
330+ if (
331+ result .isFdv1Fallback () &&
332+ sourceManager .hasFDv1Fallback () &&
333+ // This shouldn't happen in practice, an FDv1 source shouldn't request fallback
334+ // to FDv1. But if it does, then we will discard its request.
335+ !sourceManager .isCurrentSynchronizerFDv1Fallback ()
336+ ) {
337+ sourceManager .fdv1Fallback ();
338+ running = false ;
339+ }
338340 }
339341 }
340342 }
0 commit comments