Skip to content

Commit cb01449

Browse files
authored
feat: Support stream init diagnostics events for FDv2. (#133)
<!-- CURSOR_SUMMARY --> > [!NOTE] > **Medium Risk** > Touches FDv2 streaming lifecycle/error handling and restart behavior, which could affect reconnection and initialization semantics; changes are guarded by extensive new unit tests and are largely additive (diagnostic recording). > > **Overview** > Adds FDv2 streaming *stream-init diagnostics* by plumbing a `DiagnosticStore` into `StreamingSynchronizerImpl` (via `DataSystemComponents`) and recording init timestamp/duration with a `failed` flag. > > Updates streaming error/restart paths to distinguish deliberate restarts (e.g. `goodbye`/caller-initiated) from failures, reset timing after the first successful changeset, and record failures on conversion/protocol/parsing errors and unexpected thread termination. Tests are expanded to validate diagnostics across success, HTTP/network errors, invalid data, and repeated restarts (including null-store safety). > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit ee478d9. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 6b1d9fc commit cb01449

File tree

3 files changed

+593
-35
lines changed

3 files changed

+593
-35
lines changed

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public Synchronizer build(DataSourceBuildInputs context) {
9898
context.getSelectorSource(),
9999
payloadFilter,
100100
initialReconnectDelay,
101-
context.getThreadPriority()
101+
context.getThreadPriority(),
102+
context.getDiagnosticStore()
102103
);
103104
}
104105
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.launchdarkly.logging.LDLogger;
1414
import com.launchdarkly.logging.LogValues;
1515
import com.launchdarkly.sdk.internal.collections.IterableAsyncQueue;
16+
import com.launchdarkly.sdk.internal.events.DiagnosticStore;
1617
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
1718
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler;
1819
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
@@ -62,6 +63,8 @@ class StreamingSynchronizerImpl implements Synchronizer {
6263
private final AtomicBoolean started = new AtomicBoolean(false);
6364

6465
private final int threadPriority;
66+
private final DiagnosticStore diagnosticStore;
67+
private volatile long streamStarted = 0;
6568

6669
public StreamingSynchronizerImpl(
6770
HttpProperties httpProperties,
@@ -71,7 +74,8 @@ public StreamingSynchronizerImpl(
7174
SelectorSource selectorSource,
7275
String payloadFilter,
7376
Duration initialReconnectDelaySeconds,
74-
int threadPriority
77+
int threadPriority,
78+
DiagnosticStore diagnosticStore
7579
) {
7680
this.httpProperties = httpProperties;
7781
this.selectorSource = selectorSource;
@@ -80,6 +84,7 @@ public StreamingSynchronizerImpl(
8084
this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath);
8185
this.initialReconnectDelay = initialReconnectDelaySeconds;
8286
this.threadPriority = threadPriority;
87+
this.diagnosticStore = diagnosticStore;
8388

8489
// The stream will lazily start when `next` is called.
8590
}
@@ -143,6 +148,7 @@ private void startStream() {
143148
@NotNull
144149
private Thread getRunThread() {
145150
Thread thread = new Thread(() -> {
151+
streamStarted = System.currentTimeMillis();
146152
try {
147153
for (StreamEvent event : eventSource.anyEvents()) {
148154
if (!handleEvent(event)) {
@@ -163,6 +169,8 @@ private Thread getRunThread() {
163169
logger.error("Stream thread ended with exception: {}", LogValues.exceptionSummary(e));
164170
logger.debug(LogValues.exceptionTrace(e));
165171

172+
recordStreamInit(true); // Record failed init for unexpected thread exception
173+
166174
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
167175
DataSourceStatusProvider.ErrorKind.UNKNOWN,
168176
0,
@@ -215,6 +223,13 @@ public void close() {
215223
shutdownFuture.complete(FDv2SourceResult.shutdown());
216224
}
217225

226+
private void recordStreamInit(boolean failed) {
227+
if (diagnosticStore != null && streamStarted != 0) {
228+
diagnosticStore.recordStreamInit(streamStarted,
229+
System.currentTimeMillis() - streamStarted, failed);
230+
}
231+
}
232+
218233
private boolean handleEvent(StreamEvent event) {
219234
if (event instanceof MessageEvent) {
220235
handleMessage((MessageEvent) event);
@@ -259,6 +274,9 @@ private void handleMessage(MessageEvent event) {
259274
logger,
260275
event.getHeaders().value(HeaderConstants.ENVIRONMENT_ID.getHeaderName()),
261276
true);
277+
recordStreamInit(false);
278+
// Reset to 0 after successful init to prevent duplicate recordings until next restart
279+
streamStarted = 0;
262280
result = FDv2SourceResult.changeSet(converted, getFallback(event));
263281
} catch (Exception e) {
264282
logger.error("Failed to convert FDv2 changeset: {}", LogValues.exceptionSummary(e));
@@ -270,7 +288,7 @@ private void handleMessage(MessageEvent event) {
270288
Instant.now()
271289
);
272290
result = FDv2SourceResult.interrupted(conversionError, getFallback(event));
273-
restartStream();
291+
restartStream(true);
274292
}
275293
break;
276294

@@ -286,7 +304,7 @@ private void handleMessage(MessageEvent event) {
286304
logger.info("Goodbye was received from the LaunchDarkly connection with reason: '{}'.", reason);
287305
result = FDv2SourceResult.goodbye(reason, getFallback(event));
288306
// We drop this current connection and attempt to restart the stream.
289-
restartStream();
307+
restartStream(false); // Not a failure - deliberate server-initiated restart
290308
break;
291309

292310
case INTERNAL_ERROR:
@@ -310,7 +328,7 @@ private void handleMessage(MessageEvent event) {
310328
);
311329
result = FDv2SourceResult.interrupted(internalError, getFallback(event));
312330
if(kind == DataSourceStatusProvider.ErrorKind.INVALID_DATA) {
313-
restartStream();
331+
restartStream(true);
314332
}
315333
break;
316334

@@ -333,12 +351,16 @@ private void interruptedWithException(Exception e, DataSourceStatusProvider.Erro
333351
Instant.now()
334352
);
335353
resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(event)));
336-
restartStream();
354+
restartStream(true);
337355
}
338356

339357
private boolean handleError(StreamException e) {
358+
// Check if this was a deliberate shutdown/restart rather than an actual error
359+
boolean streamFailed = !(e instanceof StreamClosedByCallerException);
360+
recordStreamInit(streamFailed);
361+
340362
if (e instanceof StreamClosedByCallerException) {
341-
// We closed it ourselves (shutdown was called)
363+
// We closed it ourselves (shutdown was called or stream was deliberately restarted)
342364
return false;
343365
}
344366

@@ -358,6 +380,7 @@ private boolean handleError(StreamException e) {
358380
} else {
359381
// Queue as INTERRUPTED to indicate temporary failure
360382
resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(e)));
383+
streamStarted = System.currentTimeMillis();
361384
return true; // allow reconnect
362385
}
363386
}
@@ -372,11 +395,14 @@ private boolean handleError(StreamException e) {
372395
Instant.now()
373396
);
374397
resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(e)));
398+
streamStarted = System.currentTimeMillis();
375399
return true; // allow reconnect
376400
}
377401

378-
private void restartStream() {
402+
private void restartStream(boolean failed) {
379403
Objects.requireNonNull(eventSource, "eventSource must not be null");
404+
recordStreamInit(failed);
405+
streamStarted = System.currentTimeMillis();
380406
eventSource.interrupt();
381407
protocolHandler.reset();
382408
}

0 commit comments

Comments
 (0)