Skip to content

Commit 117b85c

Browse files
committed
Async iterable tests.
1 parent 201a92c commit 117b85c

File tree

3 files changed

+352
-8
lines changed

3 files changed

+352
-8
lines changed

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/IterableAsyncQueue.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ class IterableAsyncQueue<T> {
77
private final Object lock = new Object();
88
private final LinkedList<T> queue = new LinkedList<>();
99

10-
private CompletableFuture<T> nextFuture = null;
10+
private final LinkedList<CompletableFuture<T>> pendingFutures = new LinkedList<>();
1111

1212
public void put(T item) {
1313
synchronized (lock) {
14+
CompletableFuture<T> nextFuture = pendingFutures.pollFirst();
1415
if(nextFuture != null) {
1516
nextFuture.complete(item);
16-
nextFuture = null;
1717
return;
1818
}
1919
queue.addLast(item);
@@ -24,10 +24,9 @@ public CompletableFuture<T> take() {
2424
if(!queue.isEmpty()) {
2525
return CompletableFuture.completedFuture(queue.removeFirst());
2626
}
27-
if (nextFuture == null) {
28-
nextFuture = new CompletableFuture<>();
29-
}
30-
return nextFuture;
27+
CompletableFuture<T> takeFuture = new CompletableFuture<>();
28+
pendingFutures.addLast(takeFuture);
29+
return takeFuture;
3130
}
3231
}
3332
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,14 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
133133
}
134134
}
135135
}
136-
return FDv2SourceResult.terminalError(new DataSourceStatusProvider.ErrorInfo(
136+
137+
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
137138
DataSourceStatusProvider.ErrorKind.UNKNOWN,
138139
0,
139140
"Unexpected end of polling response",
140141
new Date().toInstant()
141-
));
142+
);
143+
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
142144
}));
143145
}
144146
}
Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import org.junit.Test;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.CountDownLatch;
9+
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.Executors;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
14+
import static org.junit.Assert.*;
15+
16+
@SuppressWarnings("javadoc")
17+
public class IterableAsyncQueueTest {
18+
19+
@Test
20+
public void putThenTakeReturnsImmediately() throws Exception {
21+
IterableAsyncQueue<String> queue = new IterableAsyncQueue<>();
22+
23+
queue.put("item1");
24+
25+
CompletableFuture<String> future = queue.take();
26+
assertTrue("Future should be completed immediately", future.isDone());
27+
assertEquals("item1", future.get());
28+
}
29+
30+
@Test
31+
public void takeThenPutCompletesWaitingFuture() throws Exception {
32+
IterableAsyncQueue<String> queue = new IterableAsyncQueue<>();
33+
34+
CompletableFuture<String> future = queue.take();
35+
assertFalse("Future should not be completed yet", future.isDone());
36+
37+
queue.put("item1");
38+
39+
assertTrue("Future should be completed after put", future.isDone());
40+
assertEquals("item1", future.get());
41+
}
42+
43+
@Test
44+
public void multiplePutsThenMultipleTakesPreservesOrder() throws Exception {
45+
IterableAsyncQueue<Integer> queue = new IterableAsyncQueue<>();
46+
47+
// Put multiple items
48+
queue.put(1);
49+
queue.put(2);
50+
queue.put(3);
51+
52+
// Take them in order
53+
assertEquals(Integer.valueOf(1), queue.take().get());
54+
assertEquals(Integer.valueOf(2), queue.take().get());
55+
assertEquals(Integer.valueOf(3), queue.take().get());
56+
}
57+
58+
@Test
59+
public void multipleTakesThenMultiplePutsCompletesInOrder() throws Exception {
60+
IterableAsyncQueue<Integer> queue = new IterableAsyncQueue<>();
61+
62+
// Multiple takes when queue is empty
63+
CompletableFuture<Integer> future1 = queue.take();
64+
CompletableFuture<Integer> future2 = queue.take();
65+
CompletableFuture<Integer> future3 = queue.take();
66+
67+
assertFalse(future1.isDone());
68+
assertFalse(future2.isDone());
69+
assertFalse(future3.isDone());
70+
71+
// Put items - should complete futures in FIFO order
72+
queue.put(1);
73+
assertTrue("First future should be completed", future1.isDone());
74+
assertFalse("Second future should not be completed yet", future2.isDone());
75+
assertFalse("Third future should not be completed yet", future3.isDone());
76+
assertEquals(Integer.valueOf(1), future1.get());
77+
78+
queue.put(2);
79+
assertTrue("Second future should be completed", future2.isDone());
80+
assertFalse("Third future should not be completed yet", future3.isDone());
81+
assertEquals(Integer.valueOf(2), future2.get());
82+
83+
queue.put(3);
84+
assertTrue("Third future should be completed", future3.isDone());
85+
assertEquals(Integer.valueOf(3), future3.get());
86+
}
87+
88+
@Test
89+
public void interleavedPutAndTakeOperations() throws Exception {
90+
IterableAsyncQueue<String> queue = new IterableAsyncQueue<>();
91+
92+
// Put one
93+
queue.put("a");
94+
assertEquals("a", queue.take().get());
95+
96+
// Take when empty, then put
97+
CompletableFuture<String> future = queue.take();
98+
assertFalse(future.isDone());
99+
queue.put("b");
100+
assertEquals("b", future.get());
101+
102+
// Put multiple, take one, put one more, take remaining
103+
queue.put("c");
104+
queue.put("d");
105+
assertEquals("c", queue.take().get());
106+
queue.put("e");
107+
assertEquals("d", queue.take().get());
108+
assertEquals("e", queue.take().get());
109+
}
110+
111+
@Test
112+
public void concurrentProducersAndConsumers() throws Exception {
113+
IterableAsyncQueue<Integer> queue = new IterableAsyncQueue<>();
114+
int itemCount = 1000;
115+
int producerThreads = 5;
116+
int consumerThreads = 5;
117+
118+
ExecutorService executor = Executors.newFixedThreadPool(producerThreads + consumerThreads);
119+
CountDownLatch producerLatch = new CountDownLatch(producerThreads);
120+
CountDownLatch consumerLatch = new CountDownLatch(consumerThreads);
121+
122+
List<Integer> consumedItems = new ArrayList<>();
123+
Object consumedLock = new Object();
124+
125+
// Start producers
126+
for (int t = 0; t < producerThreads; t++) {
127+
final int threadId = t;
128+
executor.submit(() -> {
129+
try {
130+
for (int i = 0; i < itemCount / producerThreads; i++) {
131+
queue.put(threadId * 1000 + i);
132+
Thread.sleep(1); // Small delay to encourage interleaving
133+
}
134+
} catch (InterruptedException e) {
135+
Thread.currentThread().interrupt();
136+
} finally {
137+
producerLatch.countDown();
138+
}
139+
});
140+
}
141+
142+
// Start consumers
143+
for (int t = 0; t < consumerThreads; t++) {
144+
executor.submit(() -> {
145+
try {
146+
for (int i = 0; i < itemCount / consumerThreads; i++) {
147+
Integer item = queue.take().get(5, TimeUnit.SECONDS);
148+
synchronized (consumedLock) {
149+
consumedItems.add(item);
150+
}
151+
}
152+
} catch (Exception e) {
153+
throw new RuntimeException(e);
154+
} finally {
155+
consumerLatch.countDown();
156+
}
157+
});
158+
}
159+
160+
// Wait for completion
161+
assertTrue("Producers should complete", producerLatch.await(10, TimeUnit.SECONDS));
162+
assertTrue("Consumers should complete", consumerLatch.await(10, TimeUnit.SECONDS));
163+
164+
executor.shutdown();
165+
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
166+
167+
// Verify all items were consumed
168+
assertEquals("All items should be consumed", itemCount, consumedItems.size());
169+
}
170+
171+
@Test
172+
public void singleProducerAndConsumer() throws Exception {
173+
IterableAsyncQueue<Integer> queue = new IterableAsyncQueue<>();
174+
int itemCount = 10000;
175+
176+
AtomicInteger producedCount = new AtomicInteger(0);
177+
AtomicInteger consumedCount = new AtomicInteger(0);
178+
179+
ExecutorService executor = Executors.newFixedThreadPool(2);
180+
181+
// Producer
182+
CompletableFuture<Void> producer = CompletableFuture.runAsync(() -> {
183+
for (int i = 0; i < itemCount; i++) {
184+
queue.put(i);
185+
producedCount.incrementAndGet();
186+
}
187+
}, executor);
188+
189+
// Consumer
190+
CompletableFuture<Void> consumer = CompletableFuture.runAsync(() -> {
191+
try {
192+
for (int i = 0; i < itemCount; i++) {
193+
Integer item = queue.take().get(5, TimeUnit.SECONDS);
194+
assertEquals(Integer.valueOf(i), item);
195+
consumedCount.incrementAndGet();
196+
}
197+
} catch (Exception e) {
198+
throw new RuntimeException(e);
199+
}
200+
}, executor);
201+
202+
// Wait for both to complete
203+
CompletableFuture.allOf(producer, consumer).get(10, TimeUnit.SECONDS);
204+
205+
executor.shutdown();
206+
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
207+
208+
assertEquals("All items should be produced", itemCount, producedCount.get());
209+
assertEquals("All items should be consumed", itemCount, consumedCount.get());
210+
}
211+
212+
@Test
213+
public void multipleProducersSingleConsumer() throws Exception {
214+
IterableAsyncQueue<String> queue = new IterableAsyncQueue<>();
215+
int producersCount = 10;
216+
int itemsPerProducer = 100;
217+
int totalItems = producersCount * itemsPerProducer;
218+
219+
ExecutorService executor = Executors.newFixedThreadPool(producersCount + 1);
220+
CountDownLatch producerLatch = new CountDownLatch(producersCount);
221+
222+
// Start multiple producers
223+
for (int p = 0; p < producersCount; p++) {
224+
final int producerId = p;
225+
executor.submit(() -> {
226+
try {
227+
for (int i = 0; i < itemsPerProducer; i++) {
228+
queue.put("producer-" + producerId + "-item-" + i);
229+
}
230+
} finally {
231+
producerLatch.countDown();
232+
}
233+
});
234+
}
235+
236+
// Single consumer
237+
List<String> consumed = new ArrayList<>();
238+
CompletableFuture<Void> consumer = CompletableFuture.runAsync(() -> {
239+
try {
240+
for (int i = 0; i < totalItems; i++) {
241+
String item = queue.take().get(5, TimeUnit.SECONDS);
242+
consumed.add(item);
243+
}
244+
} catch (Exception e) {
245+
throw new RuntimeException(e);
246+
}
247+
}, executor);
248+
249+
assertTrue("Producers should complete", producerLatch.await(10, TimeUnit.SECONDS));
250+
consumer.get(10, TimeUnit.SECONDS);
251+
252+
executor.shutdown();
253+
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
254+
255+
assertEquals("Consumer should receive all items", totalItems, consumed.size());
256+
}
257+
258+
@Test
259+
public void singleProducerMultipleConsumers() throws Exception {
260+
IterableAsyncQueue<Integer> queue = new IterableAsyncQueue<>();
261+
int consumersCount = 10;
262+
int totalItems = 1000;
263+
int itemsPerConsumer = totalItems / consumersCount;
264+
265+
ExecutorService executor = Executors.newFixedThreadPool(consumersCount + 1);
266+
CountDownLatch consumerLatch = new CountDownLatch(consumersCount);
267+
268+
List<Integer> allConsumed = new ArrayList<>();
269+
Object consumedLock = new Object();
270+
271+
// Start multiple consumers
272+
for (int c = 0; c < consumersCount; c++) {
273+
executor.submit(() -> {
274+
try {
275+
for (int i = 0; i < itemsPerConsumer; i++) {
276+
Integer item = queue.take().get(5, TimeUnit.SECONDS);
277+
synchronized (consumedLock) {
278+
allConsumed.add(item);
279+
}
280+
}
281+
} catch (Exception e) {
282+
throw new RuntimeException(e);
283+
} finally {
284+
consumerLatch.countDown();
285+
}
286+
});
287+
}
288+
289+
// Single producer
290+
CompletableFuture<Void> producer = CompletableFuture.runAsync(() -> {
291+
for (int i = 0; i < totalItems; i++) {
292+
queue.put(i);
293+
}
294+
}, executor);
295+
296+
producer.get(5, TimeUnit.SECONDS);
297+
assertTrue("Consumers should complete", consumerLatch.await(10, TimeUnit.SECONDS));
298+
299+
executor.shutdown();
300+
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
301+
302+
assertEquals("All items should be consumed", totalItems, allConsumed.size());
303+
}
304+
305+
@Test
306+
public void nullValuesAreSupported() throws Exception {
307+
IterableAsyncQueue<String> queue = new IterableAsyncQueue<>();
308+
309+
queue.put(null);
310+
queue.put("not-null");
311+
queue.put(null);
312+
313+
assertNull(queue.take().get());
314+
assertEquals("not-null", queue.take().get());
315+
assertNull(queue.take().get());
316+
}
317+
318+
@Test
319+
public void takeCompletesAsynchronously() throws Exception {
320+
IterableAsyncQueue<String> queue = new IterableAsyncQueue<>();
321+
322+
CompletableFuture<String> future = queue.take();
323+
AtomicInteger callbackInvoked = new AtomicInteger(0);
324+
325+
// Attach callback
326+
future.thenAccept(item -> {
327+
assertEquals("async-item", item);
328+
callbackInvoked.incrementAndGet();
329+
});
330+
331+
assertFalse("Future should not be completed yet", future.isDone());
332+
assertEquals(0, callbackInvoked.get());
333+
334+
// Put item should trigger callback
335+
queue.put("async-item");
336+
337+
// Give callback time to execute
338+
Thread.sleep(50);
339+
340+
assertTrue("Future should be completed", future.isDone());
341+
assertEquals("Callback should have been invoked", 1, callbackInvoked.get());
342+
}
343+
}

0 commit comments

Comments
 (0)