Skip to content

Commit 8d2d922

Browse files
authored
chore: Add support for FDv2 fallback and recovery. (#113)
Adds support for FDv2 fallback and recovery. This doesn't add support for fallback to FDv1. It also does not finish data source status support. It does some refactoring to limit the complexity of the FDv2DataSource by moving some implementation details regarding synchronizer state handling and conditions out of its implementation. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Implements FDv2 fallback/recovery and refactors synchronizer orchestration. > > - Adds `FDv2DataSourceConditions` with timed `FallbackCondition` (on prolonged `INTERRUPTED`) and `RecoveryCondition` (time-based), and wires them into `FDv2DataSource` run loop > - Extracts synchronizer lifecycle into `SynchronizerStateManager` and `SynchronizerFactoryWithState` for active-source tracking, rotation, and blocking on terminal errors > - `FDv2DataSource` now supports configurable timeouts (defaults: fallback 2m, recovery 5m), applies thread priority, logs via injected `LDLogger`, and returns FDv1 fallback signal (execution still TODO) > - `FDv2DataSystem` constructs `FDv2DataSource` with `threadPriority`, `DataSource` sub-logger, and shared executor; logging refined for polling/streaming components via new names in `Loggers` > - Extensive unit tests added for conditions, data source behavior (fallback/recovery/closure), and synchronizer state manager > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit e6b032e. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 8469d3c commit 8d2d922

13 files changed

+3964
-142
lines changed

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java

Lines changed: 217 additions & 138 deletions
Large diffs are not rendered by default.
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
4+
5+
import java.io.Closeable;
6+
import java.io.IOException;
7+
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.ScheduledExecutorService;
9+
import java.util.concurrent.ScheduledFuture;
10+
import java.util.concurrent.TimeUnit;
11+
12+
/**
13+
* Container class for FDv2 data source conditions and related types.
14+
* <p>
15+
* This class is non-constructable and serves only as a namespace for condition-related types.
16+
* Package-private for internal use and testing.
17+
*/
18+
class FDv2DataSourceConditions {
19+
/**
20+
* Private constructor to prevent instantiation.
21+
*/
22+
private FDv2DataSourceConditions() {
23+
}
24+
25+
/**
26+
* Package-private for testing.
27+
*/
28+
interface Condition extends Closeable {
29+
enum ConditionType {
30+
FALLBACK,
31+
RECOVERY,
32+
}
33+
34+
CompletableFuture<Condition> execute();
35+
36+
void inform(FDv2SourceResult sourceResult);
37+
38+
void close();
39+
40+
ConditionType getType();
41+
}
42+
43+
interface ConditionFactory {
44+
Condition build();
45+
46+
Condition.ConditionType getType();
47+
}
48+
49+
static abstract class TimedCondition implements Condition {
50+
protected final CompletableFuture<Condition> resultFuture = new CompletableFuture<>();
51+
52+
protected final ScheduledExecutorService sharedExecutor;
53+
54+
/**
55+
* Future for the timeout task, if any. Will be null when no timeout is active.
56+
*/
57+
protected ScheduledFuture<Void> timerFuture;
58+
59+
/**
60+
* Timeout duration for the fallback operation.
61+
*/
62+
protected final long timeoutSeconds;
63+
64+
public TimedCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) {
65+
this.sharedExecutor = sharedExecutor;
66+
this.timeoutSeconds = timeoutSeconds;
67+
}
68+
69+
@Override
70+
public CompletableFuture<Condition> execute() {
71+
return resultFuture;
72+
}
73+
74+
@Override
75+
public void close() {
76+
if (timerFuture != null) {
77+
timerFuture.cancel(false);
78+
timerFuture = null;
79+
}
80+
}
81+
82+
static abstract class Factory implements ConditionFactory {
83+
protected final ScheduledExecutorService sharedExecutor;
84+
protected final long timeoutSeconds;
85+
86+
public Factory(ScheduledExecutorService sharedExecutor, long timeout) {
87+
this.sharedExecutor = sharedExecutor;
88+
this.timeoutSeconds = timeout;
89+
}
90+
}
91+
}
92+
93+
/**
94+
* This condition is used to determine if a fallback should be performed. It is informed of each data source result
95+
* via {@link #inform(FDv2SourceResult)}. Based on the results, it updates its internal state. When the fallback
96+
* condition is met, then the {@link java.util.concurrent.Future} returned by {@link #execute()} will complete.
97+
* <p>
98+
* This is package-private, instead of private, for ease of testing.
99+
*/
100+
static class FallbackCondition extends TimedCondition {
101+
static class Factory extends TimedCondition.Factory {
102+
public Factory(ScheduledExecutorService sharedExecutor, long timeout) {
103+
super(sharedExecutor, timeout);
104+
}
105+
106+
@Override
107+
public Condition build() {
108+
return new FallbackCondition(sharedExecutor, timeoutSeconds);
109+
}
110+
111+
@Override
112+
public ConditionType getType() {
113+
return ConditionType.FALLBACK;
114+
}
115+
}
116+
117+
public FallbackCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) {
118+
super(sharedExecutor, timeoutSeconds);
119+
}
120+
121+
@Override
122+
public void inform(FDv2SourceResult sourceResult) {
123+
if (sourceResult.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) {
124+
if (timerFuture != null) {
125+
timerFuture.cancel(false);
126+
timerFuture = null;
127+
}
128+
}
129+
if (sourceResult.getResultType() == FDv2SourceResult.ResultType.STATUS && sourceResult.getStatus().getState() == FDv2SourceResult.State.INTERRUPTED) {
130+
if (timerFuture == null) {
131+
timerFuture = sharedExecutor.schedule(() -> {
132+
resultFuture.complete(this);
133+
return null;
134+
}, timeoutSeconds, TimeUnit.SECONDS);
135+
}
136+
}
137+
}
138+
139+
@Override
140+
public ConditionType getType() {
141+
return ConditionType.FALLBACK;
142+
}
143+
}
144+
145+
static class RecoveryCondition extends TimedCondition {
146+
147+
static class Factory extends TimedCondition.Factory {
148+
public Factory(ScheduledExecutorService sharedExecutor, long timeout) {
149+
super(sharedExecutor, timeout);
150+
}
151+
152+
@Override
153+
public Condition build() {
154+
return new RecoveryCondition(sharedExecutor, timeoutSeconds);
155+
}
156+
157+
@Override
158+
public ConditionType getType() {
159+
return ConditionType.RECOVERY;
160+
}
161+
}
162+
163+
public RecoveryCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) {
164+
super(sharedExecutor, timeoutSeconds);
165+
timerFuture = sharedExecutor.schedule(() -> {
166+
resultFuture.complete(this);
167+
return null;
168+
}, timeoutSeconds, TimeUnit.SECONDS);
169+
}
170+
171+
@Override
172+
public void inform(FDv2SourceResult sourceResult) {
173+
// Time-based recovery.
174+
}
175+
176+
@Override
177+
public ConditionType getType() {
178+
return ConditionType.RECOVERY;
179+
}
180+
}
181+
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,10 @@ static FDv2DataSystem create(
157157
DataSource dataSource = new FDv2DataSource(
158158
initializerFactories,
159159
synchronizerFactories,
160-
dataSourceUpdates
160+
dataSourceUpdates,
161+
config.threadPriority,
162+
clientContext.getBaseLogger().subLogger(Loggers.DATA_SOURCE_LOGGER_NAME),
163+
clientContext.sharedExecutor
161164
);
162165
DataSourceStatusProvider dataSourceStatusProvider = new DataSourceStatusProviderImpl(
163166
dataSourceStatusBroadcaster,

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Loggers.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,7 @@ private Loggers() {}
2424
static final String EVALUATION_LOGGER_NAME = "Evaluation";
2525
static final String EVENTS_LOGGER_NAME = "Events";
2626
static final String HOOKS_LOGGER_NAME = "Hooks";
27+
static final String STREAMING_SYNCHRONIZER = "StreamingSynchronizer";
28+
static final String POLLING_SYNCHRONIZER = "PollingSynchronizer";
29+
static final String POLLING_INITIALIZER = "PollingInitializer";
2730
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class PollingInitializerImpl extends PollingBase implements Initializer {
1212
private final SelectorSource selectorSource;
1313

1414
public PollingInitializerImpl(FDv2Requestor requestor, LDLogger logger, SelectorSource selectorSource) {
15-
super(requestor, logger);
15+
super(requestor, logger.subLogger(Loggers.POLLING_INITIALIZER));
1616
this.selectorSource = selectorSource;
1717
}
1818

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public PollingSynchronizerImpl(
2323
ScheduledExecutorService sharedExecutor,
2424
Duration pollInterval
2525
) {
26-
super(requestor, logger);
26+
super(requestor, logger.subLogger(Loggers.POLLING_SYNCHRONIZER));
2727
this.selectorSource = selectorSource;
2828

2929
synchronized (this) {

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public StreamingSynchronizerImpl(
7171
) {
7272
this.httpProperties = httpProperties;
7373
this.selectorSource = selectorSource;
74-
this.logger = logger;
74+
this.logger = logger.subLogger(Loggers.STREAMING_SYNCHRONIZER);
7575
this.payloadFilter = payloadFilter;
7676
this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath);
7777
this.initialReconnectDelay = initialReconnectDelaySeconds;
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.launchdarkly.sdk.server.datasources.Synchronizer;
4+
5+
class SynchronizerFactoryWithState {
6+
public enum State {
7+
/**
8+
* This synchronizer is available to use.
9+
*/
10+
Available,
11+
12+
/**
13+
* This synchronizer is no longer available to use.
14+
*/
15+
Blocked
16+
}
17+
18+
private final FDv2DataSource.DataSourceFactory<Synchronizer> factory;
19+
20+
private State state = State.Available;
21+
22+
23+
public SynchronizerFactoryWithState(FDv2DataSource.DataSourceFactory<Synchronizer> factory) {
24+
this.factory = factory;
25+
}
26+
27+
public State getState() {
28+
return state;
29+
}
30+
31+
public void block() {
32+
state = State.Blocked;
33+
}
34+
35+
public Synchronizer build() {
36+
return factory.build();
37+
}
38+
}

0 commit comments

Comments
 (0)