Skip to content

Commit cce4e8b

Browse files
committed
Fallback and data source status.
1 parent 9c2c592 commit cce4e8b

File tree

6 files changed

+539
-128
lines changed

6 files changed

+539
-128
lines changed

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapter.java

Lines changed: 38 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import java.io.IOException;
1313
import java.time.Instant;
14+
import java.util.AbstractMap;
1415
import java.util.Collections;
1516
import java.util.Map;
1617
import java.util.concurrent.CompletableFuture;
@@ -21,9 +22,9 @@
2122
* Adapter that wraps a DataSource (FDv1 protocol) and exposes it as a Synchronizer (FDv2 protocol).
2223
* <p>
2324
* This adapter bridges the push-based DataSource interface with the pull-based Synchronizer interface
24-
* by intercepting updates through a custom DataSourceUpdateSink and queueing them as FDv2SourceResult objects.
25+
* by listening to updates through a custom DataSourceUpdateSink and queueing them as FDv2SourceResult objects.
2526
* <p>
26-
* The adapter is constructed with a factory function that receives the intercepting update sink and
27+
* The adapter is constructed with a factory function that receives the listening update sink and
2728
* creates the DataSource. This ensures the DataSource uses the adapter's internal sink without exposing it.
2829
*/
2930
class DataSourceSynchronizerAdapter implements Synchronizer {
@@ -47,11 +48,10 @@ public interface DataSourceFactory {
4748
* Creates a new adapter that wraps a DataSource.
4849
*
4950
* @param dataSourceFactory factory that creates the DataSource with the provided update sink
50-
* @param originalUpdateSink the original update sink to delegate to
5151
*/
52-
public DataSourceSynchronizerAdapter(DataSourceFactory dataSourceFactory, DataSourceUpdateSink originalUpdateSink) {
53-
InterceptingUpdateSink interceptingSink = new InterceptingUpdateSink(originalUpdateSink, resultQueue);
54-
this.dataSource = dataSourceFactory.create(interceptingSink);
52+
public DataSourceSynchronizerAdapter(DataSourceFactory dataSourceFactory) {
53+
ConvertingUpdateSink convertingSink = new ConvertingUpdateSink(resultQueue);
54+
this.dataSource = dataSourceFactory.create(convertingSink);
5555
}
5656

5757
@Override
@@ -62,7 +62,7 @@ public CompletableFuture<FDv2SourceResult> next() {
6262
startFuture = dataSource.start();
6363

6464
// Monitor the start future for errors
65-
// The data source will emit updates through the intercepting sink
65+
// The data source will emit updates through the listening sink
6666
CompletableFuture.runAsync(() -> {
6767
try {
6868
startFuture.get();
@@ -100,64 +100,57 @@ public void close() throws IOException {
100100
}
101101

102102
/**
103-
* An intercepting DataSourceUpdateSink that converts DataSource updates into FDv2SourceResult objects.
103+
* A DataSourceUpdateSink that converts DataSource updates into FDv2SourceResult objects.
104+
* This sink doesn't delegate to any other sink - it exists solely to convert FDv1 updates to FDv2 results.
104105
*/
105-
private static class InterceptingUpdateSink implements DataSourceUpdateSink {
106-
private final DataSourceUpdateSink delegate;
106+
private static class ConvertingUpdateSink implements DataSourceUpdateSink {
107107
private final IterableAsyncQueue<FDv2SourceResult> resultQueue;
108108

109-
public InterceptingUpdateSink(DataSourceUpdateSink delegate, IterableAsyncQueue<FDv2SourceResult> resultQueue) {
110-
this.delegate = delegate;
109+
public ConvertingUpdateSink(IterableAsyncQueue<FDv2SourceResult> resultQueue) {
111110
this.resultQueue = resultQueue;
112111
}
113112

114113
@Override
115114
public boolean init(DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor> allData) {
116-
boolean success = delegate.init(allData);
117-
if (success) {
118-
// Convert the full data set into a ChangeSet and emit it
119-
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
120-
new DataStoreTypes.ChangeSet<>(
121-
DataStoreTypes.ChangeSetType.Full,
122-
Selector.EMPTY,
123-
allData.getData(),
124-
null);
125-
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
126-
}
127-
return success;
115+
// Convert the full data set into a ChangeSet and emit it
116+
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
117+
new DataStoreTypes.ChangeSet<>(
118+
DataStoreTypes.ChangeSetType.Full,
119+
Selector.EMPTY,
120+
allData.getData(),
121+
null);
122+
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
123+
return true;
128124
}
129125

130126
@Override
131127
public boolean upsert(DataStoreTypes.DataKind kind, String key, DataStoreTypes.ItemDescriptor item) {
132-
boolean success = delegate.upsert(kind, key, item);
133-
if (success) {
134-
// Convert the upsert into a ChangeSet with a single item and emit it
135-
DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor> items =
136-
new DataStoreTypes.KeyedItems<>(Collections.singletonList(
137-
Map.entry(key, item)));
138-
Iterable<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>> data =
139-
Collections.singletonList(Map.entry(kind, items));
140-
141-
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
142-
new DataStoreTypes.ChangeSet<>(
143-
DataStoreTypes.ChangeSetType.Partial,
144-
Selector.EMPTY,
145-
data,
146-
null);
147-
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
148-
}
149-
return success;
128+
// Convert the upsert into a ChangeSet with a single item and emit it
129+
DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor> items =
130+
new DataStoreTypes.KeyedItems<>(Collections.<Map.Entry<String, DataStoreTypes.ItemDescriptor>>singletonList(
131+
new AbstractMap.SimpleEntry<>(key, item)));
132+
Iterable<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>> data =
133+
Collections.<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>>singletonList(
134+
new AbstractMap.SimpleEntry<>(kind, items));
135+
136+
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
137+
new DataStoreTypes.ChangeSet<>(
138+
DataStoreTypes.ChangeSetType.Partial,
139+
Selector.EMPTY,
140+
data,
141+
null);
142+
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
143+
return true;
150144
}
151145

152146
@Override
153147
public DataStoreStatusProvider getDataStoreStatusProvider() {
154-
return delegate.getDataStoreStatusProvider();
148+
// This adapter doesn't use a data store
149+
return null;
155150
}
156151

157152
@Override
158153
public void updateStatus(DataSourceStatusProvider.State newState, DataSourceStatusProvider.ErrorInfo newError) {
159-
delegate.updateStatus(newState, newError);
160-
161154
// Convert state changes to FDv2SourceResult status events
162155
switch (newState) {
163156
case INTERRUPTED:

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java

Lines changed: 80 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import com.launchdarkly.sdk.server.subsystems.DataSource;
1010
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2;
1111

12-
import java.io.IOException;
1312
import java.util.ArrayList;
1413
import java.util.Collections;
1514
import java.util.Date;
@@ -48,8 +47,6 @@ class FDv2DataSource implements DataSource {
4847

4948
private final LDLogger logger;
5049

51-
private final DataSourceFactory<Synchronizer> fdv1DataSourceFactory;
52-
5350
public interface DataSourceFactory<T> {
5451
T build();
5552
}
@@ -92,7 +89,19 @@ public FDv2DataSource(
9289
.stream()
9390
.map(SynchronizerFactoryWithState::new)
9491
.collect(Collectors.toList());
95-
this.fdv1DataSourceFactory = fdv1DataSourceFactory;
92+
93+
// If we have a fdv1 data source factory, then add that to the synchronizer factories in a blocked state.
94+
// If we receive a request to fallback, then we will unblock it and block all other synchronizers.
95+
if (fdv1DataSourceFactory != null) {
96+
SynchronizerFactoryWithState wrapped = new SynchronizerFactoryWithState(fdv1DataSourceFactory,
97+
true);
98+
wrapped.block();
99+
synchronizerFactories.add(wrapped);
100+
101+
// Currently, we only support 1 fdv1 fallback synchronizer, but that limitation is introduced by the
102+
// configuration.
103+
}
104+
96105
this.synchronizerStateManager = new SynchronizerStateManager(synchronizerFactories);
97106
this.dataSourceUpdates = dataSourceUpdates;
98107
this.threadPriority = threadPriority;
@@ -104,6 +113,11 @@ public FDv2DataSource(
104113

105114
private void run() {
106115
Thread runThread = new Thread(() -> {
116+
if (initializers.isEmpty() && synchronizerStateManager.getAvailableSynchronizerCount() == 0) {
117+
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
118+
startFuture.complete(true);
119+
return;
120+
}
107121
if (!initializers.isEmpty()) {
108122
runInitializers();
109123
}
@@ -113,7 +127,7 @@ private void run() {
113127
new DataSourceStatusProvider.ErrorInfo(
114128
DataSourceStatusProvider.ErrorKind.UNKNOWN,
115129
0,
116-
"",
130+
"All data source acquisition methods have been exhausted.",
117131
new Date().toInstant())
118132
);
119133

@@ -146,12 +160,37 @@ private void runInitializers() {
146160
}
147161
break;
148162
case STATUS:
149-
// TODO: Implement.
163+
FDv2SourceResult.Status status = result.getStatus();
164+
switch(status.getState()) {
165+
case INTERRUPTED:
166+
case TERMINAL_ERROR:
167+
// The data source updates handler will filter the state during initializing, but this
168+
// will make the error information available.
169+
dataSourceUpdates.updateStatus(
170+
// While the error was terminal to the individual initializer, it isn't terminal
171+
// to the data source as a whole.
172+
DataSourceStatusProvider.State.INTERRUPTED,
173+
status.getErrorInfo());
174+
break;
175+
case SHUTDOWN:
176+
case GOODBYE:
177+
// We don't need to inform anyone of these statuses.
178+
logger.debug("Ignoring status {} from initializer", result.getStatus().getState());
179+
break;
180+
}
150181
break;
151182
}
152183
} catch (ExecutionException | InterruptedException | CancellationException e) {
153-
// TODO: Better messaging?
154-
// TODO: Data source status?
184+
// We don't expect these conditions to happen in practice.
185+
186+
// The data source updates handler will filter the state during initializing, but this
187+
// will make the error information available.
188+
dataSourceUpdates.updateStatus(
189+
DataSourceStatusProvider.State.INTERRUPTED,
190+
new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.UNKNOWN,
191+
0,
192+
e.toString(),
193+
new Date().toInstant()));
155194
logger.warn("Error running initializer: {}", e.toString());
156195
}
157196
}
@@ -161,6 +200,8 @@ private void runInitializers() {
161200
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
162201
startFuture.complete(true);
163202
}
203+
// If no data was received, then it is possible initialization will complete from synchronizers, so we give
204+
// them an opportunity to run before reporting any issues.
164205
}
165206

166207
/**
@@ -217,11 +258,13 @@ private void runSynchronizers() {
217258
// For fallback, we will move to the next available synchronizer, which may loop.
218259
// This is the default behavior of exiting the run loop, so we don't need to take
219260
// any action.
261+
logger.debug("A synchronizer has experienced an interruption and we are falling back.");
220262
break;
221263
case RECOVERY:
222264
// For recovery, we will start at the first available synchronizer.
223265
// So we reset the source index, and finding the source will start at the beginning.
224266
synchronizerStateManager.resetSourceIndex();
267+
logger.debug("The data source is attempting to recover to a higher priority synchronizer.");
225268
break;
226269
}
227270
// A running synchronizer will only have fallback and recovery conditions that it can act on.
@@ -230,7 +273,7 @@ private void runSynchronizers() {
230273
break;
231274
}
232275

233-
if(!(res instanceof FDv2SourceResult)) {
276+
if (!(res instanceof FDv2SourceResult)) {
234277
logger.error("Unexpected result type from synchronizer: {}", res.getClass().getName());
235278
continue;
236279
}
@@ -241,6 +284,7 @@ private void runSynchronizers() {
241284
switch (result.getResultType()) {
242285
case CHANGE_SET:
243286
dataSourceUpdates.apply(result.getChangeSet());
287+
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
244288
// This could have been completed by any data source. But if it has not been completed before
245289
// now, then we complete it.
246290
startFuture.complete(true);
@@ -250,15 +294,20 @@ private void runSynchronizers() {
250294
switch (status.getState()) {
251295
case INTERRUPTED:
252296
// Handled by conditions.
253-
// TODO: Data source status.
297+
dataSourceUpdates.updateStatus(
298+
DataSourceStatusProvider.State.INTERRUPTED,
299+
status.getErrorInfo());
254300
break;
255301
case SHUTDOWN:
256302
// We should be overall shutting down.
257-
// TODO: We may need logging or to do a little more.
258-
return false;
303+
logger.debug("Synchronizer shutdown.");
304+
return;
259305
case TERMINAL_ERROR:
260306
availableSynchronizer.block();
261307
running = false;
308+
dataSourceUpdates.updateStatus(
309+
DataSourceStatusProvider.State.INTERRUPTED,
310+
status.getErrorInfo());
262311
break;
263312
case GOODBYE:
264313
// We let the synchronizer handle this internally.
@@ -268,14 +317,29 @@ private void runSynchronizers() {
268317
}
269318
// We have been requested to fall back to FDv1. We handle whatever message was associated,
270319
// close the synchronizer, and then fallback.
271-
if (result.isFdv1Fallback()) {
272-
// return true;
320+
// Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
321+
if (
322+
result.isFdv1Fallback() &&
323+
synchronizerStateManager.hasFDv1Fallback() &&
324+
// This shouldn't happen in practice, an FDv1 source shouldn't request fallback
325+
// to FDv1. But if it does, then we will discard its request.
326+
!availableSynchronizer.isFDv1Fallback()
327+
) {
328+
synchronizerStateManager.fdv1Fallback();
329+
running = false;
273330
}
274331
}
275332
}
276333
} catch (ExecutionException | InterruptedException | CancellationException e) {
277-
// TODO: Log.
278-
// Move to next synchronizer.
334+
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED,
335+
new DataSourceStatusProvider.ErrorInfo(
336+
DataSourceStatusProvider.ErrorKind.UNKNOWN,
337+
0,
338+
e.toString(),
339+
new Date().toInstant()
340+
));
341+
logger.warn("Error running synchronizer: {}, will try next synchronizer, or retry.", e.toString());
342+
// Move to the next synchronizer.
279343
}
280344
availableSynchronizer = synchronizerStateManager.getNextAvailableSynchronizer();
281345
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,21 @@ static FDv2DataSystem create(
154154
.map(synchronizer -> new FactoryWrapper<>(synchronizer, builderContext))
155155
.collect(ImmutableList.toImmutableList());
156156

157+
// Create FDv1 fallback synchronizer factory if configured
158+
FDv2DataSource.DataSourceFactory<Synchronizer> fdv1FallbackFactory = null;
159+
if (dataSystemConfiguration.getFDv1FallbackSynchronizer() != null) {
160+
fdv1FallbackFactory = () -> {
161+
// Wrap the FDv1 DataSource as a Synchronizer using the adapter
162+
return new DataSourceSynchronizerAdapter(
163+
updateSink -> dataSystemConfiguration.getFDv1FallbackSynchronizer().build(clientContext)
164+
);
165+
};
166+
}
167+
157168
DataSource dataSource = new FDv2DataSource(
158169
initializerFactories,
159170
synchronizerFactories,
171+
fdv1FallbackFactory,
160172
dataSourceUpdates,
161173
config.threadPriority,
162174
clientContext.getBaseLogger().subLogger(Loggers.DATA_SOURCE_LOGGER_NAME),

0 commit comments

Comments
 (0)