Skip to content

Commit 22e1550

Browse files
chore: adds TestDataV2 to implement Synchronizer (#124)
**Requirements** - [x] I have added test coverage for new or changed functionality - [x] I have followed the repository's [pull request submission guidelines](../blob/main/CONTRIBUTING.md#submitting-pull-requests) - [x] I have validated my changes against all supported platform versions <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches FDv2 core result handling (`FDv2SourceResult` lifecycle and `FDv2DataSource` processing), so incorrect closing/callback behavior could affect all FDv2 synchronizers; changes are mostly additive and covered by new tests. > > **Overview** > Adds `TestDataV2`, a new test-only FDv2 `Synchronizer` data source that can emit an initial full `ChangeSet` and subsequent partial updates/tombstones via `update()`, `delete()`, `updateStatus()`, and optional persistence control (`shouldPersist`). > > To support backpressure/ack semantics, `FDv2SourceResult` is now `Closeable` and can carry/combine completion callbacks via `withCompletion()`, and `FDv2DataSource` is updated to always `try-with-resources` close results from initializers/synchronizers. Tests are updated to assert correct versioning, and new unit/integration tests cover `TestDataV2` behavior and end-to-end usage with `LDClient`. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 4f26ccf. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com>
1 parent b50f684 commit 22e1550

File tree

7 files changed

+1026
-106
lines changed

7 files changed

+1026
-106
lines changed

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java

Lines changed: 83 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -159,38 +159,39 @@ private void runInitializers() {
159159
Initializer initializer = sourceManager.getNextInitializerAndSetActive();
160160
while(initializer != null) {
161161
try {
162-
FDv2SourceResult result = initializer.run().get();
163-
switch (result.getResultType()) {
164-
case CHANGE_SET:
165-
dataSourceUpdates.apply(result.getChangeSet());
166-
anyDataReceived = true;
167-
if (!result.getChangeSet().getSelector().isEmpty()) {
168-
// We received data with a selector, so we end the initialization process.
169-
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
170-
startFuture.complete(true);
171-
return;
172-
}
173-
break;
174-
case STATUS:
175-
FDv2SourceResult.Status status = result.getStatus();
176-
switch(status.getState()) {
177-
case INTERRUPTED:
178-
case TERMINAL_ERROR:
179-
// The data source updates handler will filter the state during initializing, but this
180-
// will make the error information available.
181-
dataSourceUpdates.updateStatus(
182-
// While the error was terminal to the individual initializer, it isn't terminal
183-
// to the data source as a whole.
184-
DataSourceStatusProvider.State.INTERRUPTED,
185-
status.getErrorInfo());
186-
break;
187-
case SHUTDOWN:
188-
case GOODBYE:
189-
// We don't need to inform anyone of these statuses.
190-
logger.debug("Ignoring status {} from initializer", result.getStatus().getState());
191-
break;
192-
}
193-
break;
162+
try(FDv2SourceResult result = initializer.run().get()) {
163+
switch (result.getResultType()) {
164+
case CHANGE_SET:
165+
dataSourceUpdates.apply(result.getChangeSet());
166+
anyDataReceived = true;
167+
if (!result.getChangeSet().getSelector().isEmpty()) {
168+
// We received data with a selector, so we end the initialization process.
169+
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
170+
startFuture.complete(true);
171+
return;
172+
}
173+
break;
174+
case STATUS:
175+
FDv2SourceResult.Status status = result.getStatus();
176+
switch (status.getState()) {
177+
case INTERRUPTED:
178+
case TERMINAL_ERROR:
179+
// The data source updates handler will filter the state during initializing, but this
180+
// will make the error information available.
181+
dataSourceUpdates.updateStatus(
182+
// While the error was terminal to the individual initializer, it isn't terminal
183+
// to the data source as a whole.
184+
DataSourceStatusProvider.State.INTERRUPTED,
185+
status.getErrorInfo());
186+
break;
187+
case SHUTDOWN:
188+
case GOODBYE:
189+
// We don't need to inform anyone of these statuses.
190+
logger.debug("Ignoring status {} from initializer", result.getStatus().getState());
191+
break;
192+
}
193+
break;
194+
}
194195
}
195196
} catch (ExecutionException | InterruptedException | CancellationException e) {
196197
// We don't expect these conditions to happen in practice.
@@ -205,7 +206,7 @@ private void runInitializers() {
205206
new Date().toInstant()));
206207
logger.warn("Error running initializer: {}", e.toString());
207208
}
208-
initializer = sourceManager.getNextInitializerAndSetActive();
209+
initializer = sourceManager.getNextInitializerAndSetActive();
209210
}
210211
// We received data without a selector, and we have exhausted initializers, so we are going to
211212
// consider ourselves initialized.
@@ -286,55 +287,56 @@ private void runSynchronizers() {
286287
continue;
287288
}
288289

289-
FDv2SourceResult result = (FDv2SourceResult) res;
290-
conditions.inform(result);
290+
try (FDv2SourceResult result = (FDv2SourceResult) res) {
291+
conditions.inform(result);
291292

292-
switch (result.getResultType()) {
293-
case CHANGE_SET:
294-
dataSourceUpdates.apply(result.getChangeSet());
295-
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
296-
// This could have been completed by any data source. But if it has not been completed before
297-
// now, then we complete it.
298-
startFuture.complete(true);
299-
break;
300-
case STATUS:
301-
FDv2SourceResult.Status status = result.getStatus();
302-
switch (status.getState()) {
303-
case INTERRUPTED:
304-
// Handled by conditions.
305-
dataSourceUpdates.updateStatus(
306-
DataSourceStatusProvider.State.INTERRUPTED,
307-
status.getErrorInfo());
308-
break;
309-
case SHUTDOWN:
310-
// We should be overall shutting down.
311-
logger.debug("Synchronizer shutdown.");
312-
return;
313-
case TERMINAL_ERROR:
314-
sourceManager.blockCurrentSynchronizer();
315-
running = false;
316-
dataSourceUpdates.updateStatus(
317-
DataSourceStatusProvider.State.INTERRUPTED,
318-
status.getErrorInfo());
319-
break;
320-
case GOODBYE:
321-
// We let the synchronizer handle this internally.
322-
break;
323-
}
324-
break;
325-
}
326-
// We have been requested to fall back to FDv1. We handle whatever message was associated,
327-
// close the synchronizer, and then fallback.
328-
// Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
329-
if (
330-
result.isFdv1Fallback() &&
331-
sourceManager.hasFDv1Fallback() &&
332-
// This shouldn't happen in practice, an FDv1 source shouldn't request fallback
333-
// to FDv1. But if it does, then we will discard its request.
334-
!sourceManager.isCurrentSynchronizerFDv1Fallback()
335-
) {
336-
sourceManager.fdv1Fallback();
337-
running = false;
293+
switch (result.getResultType()) {
294+
case CHANGE_SET:
295+
dataSourceUpdates.apply(result.getChangeSet());
296+
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
297+
// This could have been completed by any data source. But if it has not been completed before
298+
// now, then we complete it.
299+
startFuture.complete(true);
300+
break;
301+
case STATUS:
302+
FDv2SourceResult.Status status = result.getStatus();
303+
switch (status.getState()) {
304+
case INTERRUPTED:
305+
// Handled by conditions.
306+
dataSourceUpdates.updateStatus(
307+
DataSourceStatusProvider.State.INTERRUPTED,
308+
status.getErrorInfo());
309+
break;
310+
case SHUTDOWN:
311+
// We should be overall shutting down.
312+
logger.debug("Synchronizer shutdown.");
313+
return;
314+
case TERMINAL_ERROR:
315+
sourceManager.blockCurrentSynchronizer();
316+
running = false;
317+
dataSourceUpdates.updateStatus(
318+
DataSourceStatusProvider.State.INTERRUPTED,
319+
status.getErrorInfo());
320+
break;
321+
case GOODBYE:
322+
// We let the synchronizer handle this internally.
323+
break;
324+
}
325+
break;
326+
}
327+
// We have been requested to fall back to FDv1. We handle whatever message was associated,
328+
// close the synchronizer, and then fallback.
329+
// Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
330+
if (
331+
result.isFdv1Fallback() &&
332+
sourceManager.hasFDv1Fallback() &&
333+
// This shouldn't happen in practice, an FDv1 source shouldn't request fallback
334+
// to FDv1. But if it does, then we will discard its request.
335+
!sourceManager.isCurrentSynchronizerFDv1Fallback()
336+
) {
337+
sourceManager.fdv1Fallback();
338+
running = false;
339+
}
338340
}
339341
}
340342
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java

Lines changed: 87 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
package com.launchdarkly.sdk.server.datasources;
2+
23
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
34
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
45

6+
import java.io.Closeable;
7+
import java.util.function.Function;
8+
59
/**
610
* This type is currently experimental and not subject to semantic versioning.
711
* <p>
812
* The result type for FDv2 initializers and synchronizers. An FDv2 initializer produces a single result, while
913
* an FDv2 synchronizer produces a stream of results.
1014
*/
11-
public class FDv2SourceResult {
15+
public class FDv2SourceResult implements Closeable {
16+
1217
public enum State {
1318
/**
1419
* The data source has encountered an interruption and will attempt to reconnect. This isn't intended to be used
@@ -67,49 +72,86 @@ public Status(State state, DataSourceStatusProvider.ErrorInfo errorInfo) {
6772
private final Status status;
6873

6974
private final ResultType resultType;
70-
75+
7176
private final boolean fdv1Fallback;
7277

73-
private FDv2SourceResult(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, Status status, ResultType resultType, boolean fdv1Fallback) {
78+
private final Function<Void, Void> completionCallback;
79+
80+
private FDv2SourceResult(
81+
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet,
82+
Status status, ResultType resultType,
83+
boolean fdv1Fallback,
84+
Function<Void, Void> completionCallback
85+
) {
7486
this.changeSet = changeSet;
7587
this.status = status;
7688
this.resultType = resultType;
7789
this.fdv1Fallback = fdv1Fallback;
90+
this.completionCallback = completionCallback;
7891
}
7992

8093
public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) {
94+
return interrupted(errorInfo, fdv1Fallback, null);
95+
}
96+
97+
public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
8198
return new FDv2SourceResult(
82-
null,
83-
new Status(State.INTERRUPTED, errorInfo),
84-
ResultType.STATUS,
85-
fdv1Fallback);
99+
null,
100+
new Status(State.INTERRUPTED, errorInfo),
101+
ResultType.STATUS,
102+
fdv1Fallback,
103+
completionCallback);
86104
}
87105

88106
public static FDv2SourceResult shutdown() {
107+
return shutdown(null);
108+
}
109+
110+
public static FDv2SourceResult shutdown(Function<Void, Void> completionCallback) {
89111
return new FDv2SourceResult(null,
90-
new Status(State.SHUTDOWN, null),
91-
ResultType.STATUS,
92-
false);
112+
new Status(State.SHUTDOWN, null),
113+
ResultType.STATUS,
114+
false,
115+
completionCallback);
93116
}
94117

95118
public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) {
119+
return terminalError(errorInfo, fdv1Fallback, null);
120+
}
121+
122+
public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
96123
return new FDv2SourceResult(null,
97-
new Status(State.TERMINAL_ERROR, errorInfo),
98-
ResultType.STATUS,
99-
fdv1Fallback);
124+
new Status(State.TERMINAL_ERROR, errorInfo),
125+
ResultType.STATUS,
126+
fdv1Fallback,
127+
completionCallback);
100128
}
101129

102130
public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, boolean fdv1Fallback) {
103-
return new FDv2SourceResult(changeSet, null, ResultType.CHANGE_SET, fdv1Fallback);
131+
return changeSet(changeSet, fdv1Fallback, null);
132+
}
133+
134+
public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
135+
return new FDv2SourceResult(
136+
changeSet,
137+
null,
138+
ResultType.CHANGE_SET,
139+
fdv1Fallback,
140+
completionCallback);
104141
}
105142

106143
public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback) {
144+
return goodbye(reason, fdv1Fallback, null);
145+
}
146+
147+
public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
107148
// TODO: Goodbye reason.
108149
return new FDv2SourceResult(
109-
null,
110-
new Status(State.GOODBYE, null),
111-
ResultType.STATUS,
112-
fdv1Fallback);
150+
null,
151+
new Status(State.GOODBYE, null),
152+
ResultType.STATUS,
153+
fdv1Fallback,
154+
completionCallback);
113155
}
114156

115157
public ResultType getResultType() {
@@ -127,4 +169,31 @@ public DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> getChangeSet() {
127169
public boolean isFdv1Fallback() {
128170
return fdv1Fallback;
129171
}
172+
173+
/**
174+
* Creates a new result wrapping this one with an additional completion callback.
175+
* <p>
176+
* The new completion callback will be invoked when the result is closed, followed by
177+
* the original completion callback (if any).
178+
*
179+
* @param newCallback the completion callback to add
180+
* @return a new FDv2SourceResult with the added completion callback
181+
*/
182+
public FDv2SourceResult withCompletion(Function<Void, Void> newCallback) {
183+
Function<Void, Void> combinedCallback = v -> {
184+
newCallback.apply(null);
185+
if (completionCallback != null) {
186+
completionCallback.apply(null);
187+
}
188+
return null;
189+
};
190+
return new FDv2SourceResult(changeSet, status, resultType, fdv1Fallback, combinedCallback);
191+
}
192+
193+
@Override
194+
public void close() {
195+
if(completionCallback != null) {
196+
completionCallback.apply(null);
197+
}
198+
}
130199
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestData.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,8 @@ private void closedInstance(DataSourceImpl instance) {
249249
}
250250

251251
/**
252-
* A builder for feature flag configurations to be used with {@link TestData}.
253-
*
252+
* A builder for feature flag configurations to be used with {@link TestData} and {@link TestDataV2}.
253+
*
254254
* @see TestData#flag(String)
255255
* @see TestData#update(FlagBuilder)
256256
*/
@@ -269,18 +269,20 @@ public static final class FlagBuilder {
269269
final Map<ContextKind, Map<Integer, ImmutableSet<String>>> targets = new TreeMap<>(); // TreeMap enforces ordering for test determinacy
270270
final List<FlagRuleBuilder> rules = new ArrayList<>();
271271

272-
private FlagBuilder(String key) {
272+
FlagBuilder(String key) {
273273
this.key = key;
274274
this.on = true;
275275
this.variations = new CopyOnWriteArrayList<>();
276276
}
277277

278-
private FlagBuilder(FlagBuilder from) {
278+
FlagBuilder(FlagBuilder from) {
279279
this.key = from.key;
280280
this.offVariation = from.offVariation;
281281
this.on = from.on;
282282
this.fallthroughVariation = from.fallthroughVariation;
283283
this.variations = new CopyOnWriteArrayList<>(from.variations);
284+
this.samplingRatio = from.samplingRatio;
285+
this.migrationCheckRatio = from.migrationCheckRatio;
284286
for (ContextKind contextKind: from.targets.keySet()) {
285287
this.targets.put(contextKind, new TreeMap<>(from.targets.get(contextKind)));
286288
}
@@ -811,6 +813,8 @@ private static int variationForBoolean(boolean value) {
811813
* {@link #thenReturn(int)} to finish defining the rule.
812814
*/
813815
public final class FlagRuleBuilder {
816+
// TODO: Move FlagRuleBuilder to TestDataV2 when TestData is deprecated
817+
814818
final List<Clause> clauses = new ArrayList<>();
815819
int variation;
816820

0 commit comments

Comments
 (0)