Skip to content

Commit a383fc9

Browse files
committed
Threading and tests.
1 parent 9c43cbb commit a383fc9

File tree

4 files changed

+40
-26
lines changed

4 files changed

+40
-26
lines changed

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ public PollingSynchronizerImpl(
3838
private void doPoll() {
3939
try {
4040
FDv2SourceResult res = poll(selectorSource.getSelector(), false).get();
41-
switch(res.getResultType()) {
41+
boolean shouldShutdown = false;
42+
switch (res.getResultType()) {
4243
case CHANGE_SET:
4344
break;
4445
case STATUS:
45-
switch(res.getStatus().getState()) {
46+
switch (res.getStatus().getState()) {
4647
case INTERRUPTED:
4748
break;
4849
case SHUTDOWN:
@@ -54,6 +55,7 @@ private void doPoll() {
5455
task.cancel(true);
5556
}
5657
internalShutdown();
58+
shouldShutdown = true;
5759
break;
5860
case GOODBYE:
5961
// We don't need to take any action, as the connection for the poll
@@ -63,7 +65,11 @@ private void doPoll() {
6365
}
6466
break;
6567
}
66-
resultQueue.put(res);
68+
if (shouldShutdown) {
69+
shutdownFuture.complete(res);
70+
} else {
71+
resultQueue.put(res);
72+
}
6773
} catch (InterruptedException | ExecutionException e) {
6874
// TODO: Determine if handling is needed.
6975
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ class StreamingSynchronizerImpl implements Synchronizer {
5252
private final String payloadFilter;
5353
private final IterableAsyncQueue<FDv2SourceResult> resultQueue = new IterableAsyncQueue<>();
5454
private final CompletableFuture<FDv2SourceResult> shutdownFuture = new CompletableFuture<>();
55-
private final AtomicBoolean closed = new AtomicBoolean(false);
55+
private boolean closed = false;
56+
private final Object closeLock = new Object();
5657
private final FDv2ProtocolHandler protocolHandler = new FDv2ProtocolHandler();
5758
private volatile EventSource eventSource;
5859
final Duration initialReconnectDelay;
@@ -152,8 +153,10 @@ private Thread getRunThread() {
152153
// So we have to assume something is wrong that we can't recover from at this point,
153154
// and just let the thread terminate. That's better than having the thread be killed
154155
// by an uncaught exception.
155-
if (closed.get()) {
156-
return; // ignore any exception that's just a side effect of stopping the EventSource
156+
synchronized (closeLock) {
157+
if (closed) {
158+
return;
159+
}
157160
}
158161
logger.error("Stream thread ended with exception: {}", LogValues.exceptionSummary(e));
159162
logger.debug(LogValues.exceptionTrace(e));
@@ -181,29 +184,34 @@ private Thread getRunThread() {
181184
@Override
182185
public CompletableFuture<FDv2SourceResult> next() {
183186
// If we are already closed, don't start the stream.
184-
if (!started.getAndSet(true) && !closed.get()) {
185-
startStream();
187+
synchronized (closeLock) {
188+
if (!closed) {
189+
if (!started.getAndSet(true)) {
190+
startStream();
191+
}
192+
}
186193
}
194+
187195
return CompletableFuture.anyOf(shutdownFuture, resultQueue.take())
188196
.thenApply(result -> (FDv2SourceResult) result);
189197
}
190198

191199
@Override
192200
public void close() {
193-
if (closed.getAndSet(true)) {
194-
return; // already shutdown
195-
}
196-
197-
shutdownFuture.complete(FDv2SourceResult.shutdown());
201+
synchronized (closeLock){
202+
closed = true;
198203

199-
// If the synchronizer was never started, then the event source could be null.
200-
if (eventSource != null) {
201-
try {
202-
eventSource.close();
203-
} catch (Exception e) {
204-
logger.debug("Error closing event source during shutdown: {}", LogValues.exceptionSummary(e));
204+
// If the synchronizer was never started, then the event source could be null.
205+
if (eventSource != null) {
206+
try {
207+
eventSource.close();
208+
} catch (Exception e) {
209+
logger.debug("Error closing event source during shutdown: {}", LogValues.exceptionSummary(e));
210+
}
205211
}
206212
}
213+
214+
shutdownFuture.complete(FDv2SourceResult.shutdown());
207215
}
208216

209217
private boolean handleEvent(StreamEvent event) {
@@ -338,7 +346,7 @@ private boolean handleError(StreamException e) {
338346
"will retry");
339347

340348
if (!recoverable) {
341-
resultQueue.put(FDv2SourceResult.terminalError(errorInfo));
349+
shutdownFuture.complete(FDv2SourceResult.terminalError(errorInfo));
342350
return false;
343351
} else {
344352
// Queue as INTERRUPTED to indicate temporary failure

lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingInitializerImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ public void internalErrorWithInvalidDataKind() throws Exception {
339339

340340
// Create a response with malformed payload-transferred event. `state->states`.
341341
// This will trigger JSON_ERROR internal error which maps to INVALID_DATA
342-
String malformedPutObjectJson = "{\n" +
342+
String malformedPayloadTransferred = "{\n" +
343343
" \"events\": [\n" +
344344
" {\n" +
345345
" \"event\": \"server-intent\",\n" +
@@ -367,7 +367,7 @@ public void internalErrorWithInvalidDataKind() throws Exception {
367367
"}";
368368

369369
FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse(
370-
com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(malformedPutObjectJson),
370+
com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(malformedPayloadTransferred),
371371
okhttp3.Headers.of()
372372
);
373373

lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingSynchronizerImplTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -733,8 +733,8 @@ public void internalErrorWithInvalidDataKindContinuesPolling() throws Exception
733733
when(requestor.Poll(any(Selector.class))).thenAnswer(invocation -> {
734734
int count = callCount.incrementAndGet();
735735
if (count == 1) {
736-
// First call returns response with malformed put-object which triggers INTERNAL_ERROR (INVALID_DATA)
737-
String malformedPutObjectJson = "{\n" +
736+
// First call returns response with malformed payload transfer. state->states
737+
String malformedPayloadTransferred = "{\n" +
738738
" \"events\": [\n" +
739739
" {\n" +
740740
" \"event\": \"server-intent\",\n" +
@@ -750,7 +750,7 @@ public void internalErrorWithInvalidDataKindContinuesPolling() throws Exception
750750
" {\n" +
751751
" \"event\": \"payload-transferred\",\n" +
752752
" \"data\": {\n" +
753-
" \"state\": \"(p:payload-1:100)\",\n" +
753+
" \"states\": \"(p:payload-1:100)\",\n" +
754754
" \"version\": 100\n" +
755755
" }\n" +
756756
" },\n" +
@@ -762,7 +762,7 @@ public void internalErrorWithInvalidDataKindContinuesPolling() throws Exception
762762
"}";
763763

764764
return CompletableFuture.completedFuture(new FDv2Requestor.FDv2PayloadResponse(
765-
com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(malformedPutObjectJson),
765+
com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(malformedPayloadTransferred),
766766
okhttp3.Headers.of()
767767
));
768768
} else {

0 commit comments

Comments
 (0)