Skip to content

Commit b429eba

Browse files
committed
Basic streaming synchronizer.
1 parent 49c6008 commit b429eba

File tree

4 files changed

+58
-47
lines changed

4 files changed

+58
-47
lines changed

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void onResponse(@Nonnull Call call, @Nonnull Response response) {
150150
response.close();
151151
}
152152
}
153-
});
153+
});
154154

155155
} catch (Exception e) {
156156
future.completeExceptionally(e);

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,21 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
124124
case NONE:
125125
break;
126126
case INTERNAL_ERROR: {
127+
FDv2ProtocolHandler.FDv2ActionInternalError internalErrorAction = (FDv2ProtocolHandler.FDv2ActionInternalError) res;
128+
DataSourceStatusProvider.ErrorKind kind = DataSourceStatusProvider.ErrorKind.UNKNOWN;
129+
switch (internalErrorAction.getErrorType()) {
130+
131+
case MISSING_PAYLOAD:
132+
case JSON_ERROR:
133+
kind = DataSourceStatusProvider.ErrorKind.INVALID_DATA;
134+
break;
135+
case UNKNOWN_EVENT:
136+
case IMPLEMENTATION_ERROR:
137+
case PROTOCOL_ERROR:
138+
break;
139+
}
127140
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
128-
DataSourceStatusProvider.ErrorKind.UNKNOWN,
141+
kind,
129142
0,
130143
"Internal error occurred during polling",
131144
new Date().toInstant());

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public CompletableFuture<FDv2SourceResult> next() {
160160
}
161161

162162
@Override
163-
public void shutdown() {
163+
public void close() {
164164
if (shutdownRequested.getAndSet(true)) {
165165
return; // already shutdown
166166
}
@@ -198,17 +198,7 @@ private void handleMessage(MessageEvent event) {
198198
} catch (Exception e) {
199199
// Protocol handler threw exception processing the event - treat as invalid data
200200
logger.error("FDv2 protocol handler error: {}", LogValues.exceptionSummary(e));
201-
logger.debug(LogValues.exceptionTrace(e));
202-
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
203-
DataSourceStatusProvider.ErrorKind.INVALID_DATA,
204-
0,
205-
e.toString(),
206-
Instant.now()
207-
);
208-
resultQueue.put(FDv2SourceResult.interrupted(errorInfo));
209-
if (eventSource != null) {
210-
eventSource.interrupt(); // restart the stream
211-
}
201+
interruptedWithException(e, DataSourceStatusProvider.ErrorKind.INVALID_DATA);
212202
return;
213203
}
214204

@@ -253,8 +243,21 @@ private void handleMessage(MessageEvent event) {
253243
break;
254244

255245
case INTERNAL_ERROR:
246+
FDv2ProtocolHandler.FDv2ActionInternalError internalErrorAction = (FDv2ProtocolHandler.FDv2ActionInternalError) action;
247+
DataSourceStatusProvider.ErrorKind kind = DataSourceStatusProvider.ErrorKind.UNKNOWN;
248+
switch(internalErrorAction.getErrorType()) {
249+
250+
case MISSING_PAYLOAD:
251+
case JSON_ERROR:
252+
kind = DataSourceStatusProvider.ErrorKind.INVALID_DATA;
253+
break;
254+
case UNKNOWN_EVENT:
255+
case IMPLEMENTATION_ERROR:
256+
case PROTOCOL_ERROR:
257+
break;
258+
}
256259
DataSourceStatusProvider.ErrorInfo internalError = new DataSourceStatusProvider.ErrorInfo(
257-
DataSourceStatusProvider.ErrorKind.INVALID_DATA,
260+
kind,
258261
0,
259262
"Internal error during FDv2 event processing",
260263
Instant.now()
@@ -281,34 +284,30 @@ private void handleMessage(MessageEvent event) {
281284
}
282285
} catch (SerializationException e) {
283286
logger.error("Failed to parse FDv2 event: {}", LogValues.exceptionSummary(e));
284-
logger.debug(LogValues.exceptionTrace(e));
285-
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
286-
DataSourceStatusProvider.ErrorKind.INVALID_DATA,
287-
0,
288-
e.toString(),
289-
Instant.now()
290-
);
291-
// Queue as INTERRUPTED, not TERMINAL_ERROR, so we can continue processing other events
292-
resultQueue.put(FDv2SourceResult.interrupted(errorInfo));
293-
if (eventSource != null) {
294-
eventSource.interrupt(); // restart the stream
295-
}
287+
interruptedWithException(e, DataSourceStatusProvider.ErrorKind.INVALID_DATA);
296288
} catch (Exception e) {
297289
logger.error("Unexpected error handling stream message: {}", LogValues.exceptionSummary(e));
298-
logger.debug(LogValues.exceptionTrace(e));
299-
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
300-
DataSourceStatusProvider.ErrorKind.UNKNOWN,
301-
0,
302-
e.toString(),
303-
Instant.now()
304-
);
305-
resultQueue.put(FDv2SourceResult.interrupted(errorInfo));
290+
interruptedWithException(e, DataSourceStatusProvider.ErrorKind.UNKNOWN);
306291
if (eventSource != null) {
307292
eventSource.interrupt(); // restart the stream
308293
}
309294
}
310295
}
311296

297+
private void interruptedWithException(Exception e, DataSourceStatusProvider.ErrorKind kind) {
298+
logger.debug(LogValues.exceptionTrace(e));
299+
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
300+
kind,
301+
0,
302+
e.toString(),
303+
Instant.now()
304+
);
305+
resultQueue.put(FDv2SourceResult.interrupted(errorInfo));
306+
if (eventSource != null) {
307+
eventSource.interrupt(); // restart the stream
308+
}
309+
}
310+
312311
private boolean handleError(StreamException e) {
313312
if (e instanceof StreamClosedByCallerException) {
314313
// We closed it ourselves (shutdown was called)

lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import static org.mockito.Mockito.verify;
2929
import static org.mockito.Mockito.when;
3030

31-
@SuppressWarnings("javadoc")
3231
public class StreamingSynchronizerImplTest extends BaseTest {
3332

3433
private SelectorSource mockSelectorSource() {
@@ -83,7 +82,7 @@ public void receivesMultipleChangesets() throws Exception {
8382
assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result2.getResultType());
8483
assertNotNull(result2.getChangeSet());
8584

86-
synchronizer.shutdown();
85+
synchronizer.close();
8786
}
8887
}
8988

@@ -109,7 +108,7 @@ public void httpNonRecoverableError() throws Exception {
109108
assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState());
110109
assertEquals(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, result.getStatus().getErrorInfo().getKind());
111110

112-
synchronizer.shutdown();
111+
synchronizer.close();
113112
}
114113
}
115114

@@ -135,7 +134,7 @@ public void httpRecoverableError() throws Exception {
135134
assertEquals(FDv2SourceResult.State.INTERRUPTED, result.getStatus().getState());
136135
assertEquals(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, result.getStatus().getErrorInfo().getKind());
137136

138-
synchronizer.shutdown();
137+
synchronizer.close();
139138
}
140139
}
141140

@@ -161,7 +160,7 @@ public void networkError() throws Exception {
161160
assertEquals(FDv2SourceResult.State.INTERRUPTED, result.getStatus().getState());
162161
assertEquals(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, result.getStatus().getErrorInfo().getKind());
163162

164-
synchronizer.shutdown();
163+
synchronizer.close();
165164
}
166165

167166
@Test
@@ -192,7 +191,7 @@ public void invalidEventData() throws Exception {
192191
assertEquals(FDv2SourceResult.State.INTERRUPTED, result.getStatus().getState());
193192
assertEquals(DataSourceStatusProvider.ErrorKind.INVALID_DATA, result.getStatus().getErrorInfo().getKind());
194193

195-
synchronizer.shutdown();
194+
synchronizer.close();
196195
}
197196
}
198197

@@ -217,7 +216,7 @@ public void shutdownBeforeEventReceived() throws Exception {
217216

218217
// Wait a bit then shutdown
219218
Thread.sleep(100);
220-
synchronizer.shutdown();
219+
synchronizer.close();
221220

222221
FDv2SourceResult result = nextFuture.get(5, TimeUnit.SECONDS);
223222

@@ -257,7 +256,7 @@ public void shutdownAfterEventReceived() throws Exception {
257256
assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType());
258257

259258
// Shutdown after receiving event should still work
260-
synchronizer.shutdown();
259+
synchronizer.close();
261260
}
262261
}
263262

@@ -288,7 +287,7 @@ public void goodbyeEventInResponse() throws Exception {
288287
assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType());
289288
assertEquals(FDv2SourceResult.State.GOODBYE, result.getStatus().getState());
290289

291-
synchronizer.shutdown();
290+
synchronizer.close();
292291
}
293292
}
294293

@@ -324,7 +323,7 @@ public void heartbeatEvent() throws Exception {
324323
assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType());
325324
assertNotNull(result.getChangeSet());
326325

327-
synchronizer.shutdown();
326+
synchronizer.close();
328327
}
329328
}
330329

@@ -367,7 +366,7 @@ public void selectorWithVersionAndState() throws Exception {
367366
assertThat(request.getQuery(), containsString("version=50"));
368367
assertThat(request.getQuery(), containsString("state="));
369368

370-
synchronizer.shutdown();
369+
synchronizer.close();
371370
}
372371
}
373372

@@ -436,7 +435,7 @@ public void selectorRefetchedOnReconnection() throws Exception {
436435
// Verify we made at least 2 requests
437436
assertTrue("Should have made at least 2 requests", server.getRecorder().count() >= 2);
438437

439-
synchronizer.shutdown();
438+
synchronizer.close();
440439
}
441440
}
442441
}

0 commit comments

Comments
 (0)