Skip to content

Commit 1965b25

Browse files
authored
feat: Add IterableAsyncQueue. (#118)
BEGIN_COMMIT_OVERRIDE feat: Add IterableAsyncQueue. feat: Update minimum Java version to 1.8. END_COMMIT_OVERRIDE <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Raising `shared/common`’s Java baseline to 1.8 can break consumers pinned to Java 7; the new queue is well-tested but adds new concurrency code paths. > > **Overview** > Adds `com.launchdarkly.sdk.collections.IterableAsyncQueue`, a thread-safe unbounded FIFO queue whose `take()` returns a `CompletableFuture` that completes immediately when items exist or later when `put()` supplies one, plus package docs. > > Updates `lib/shared/common` to target Java 8 (1.8) and adds an extensive JUnit test suite covering ordering, null handling, and multi-threaded producer/consumer behavior. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit e78dcde. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 6804e26 commit 1965b25

File tree

5 files changed

+421
-2
lines changed

5 files changed

+421
-2
lines changed

lib/shared/common/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ base {
3333
java {
3434
withJavadocJar()
3535
withSourcesJar()
36-
sourceCompatibility = JavaVersion.VERSION_1_7
37-
targetCompatibility = JavaVersion.VERSION_1_7
36+
sourceCompatibility = JavaVersion.VERSION_1_8
37+
targetCompatibility = JavaVersion.VERSION_1_8
3838
}
3939

4040
// See Dependencies.kt in buildSrc for the purpose of "privateImplementation"
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.launchdarkly.sdk.collections;
2+
3+
import java.util.LinkedList;
4+
import java.util.concurrent.CompletableFuture;
5+
6+
/**
7+
* A thread-safe unbounded queue that provides asynchronous consumption via {@link CompletableFuture}.
8+
* <p>
9+
* This queue supports multiple concurrent producers and consumers. Items are delivered in FIFO order.
10+
* The {@link #take()} method returns a {@link CompletableFuture} that either completes immediately
11+
* if an item is available, or completes later when an item is added via {@link #put(Object)}.
12+
* <p>
13+
* When multiple consumers are waiting (i.e., multiple pending {@link #take()} calls), they are
14+
* satisfied in FIFO order as items become available.
15+
* <p>
16+
* Null values are supported.
17+
*
18+
* @param <T> the type of elements held in this queue
19+
*/
20+
class IterableAsyncQueue<T> {
21+
private final Object lock = new Object();
22+
private final LinkedList<T> queue = new LinkedList<>();
23+
24+
private final LinkedList<CompletableFuture<T>> pendingFutures = new LinkedList<>();
25+
26+
/**
27+
* Adds an item to the queue.
28+
* <p>
29+
* If there is a consumer is waiting (a pending {@link #take()} call), the item is delivered
30+
* directly to the oldest waiting consumer's future. Otherwise, the item is added to the
31+
* queue for later consumption.
32+
* <p>
33+
* If a future returned by this method is completed or canceled by the caller, then the item associated
34+
* with that call will not be delivered. It is recommended not to complete or cancel the future
35+
* returned by {@link #take()} unless you are finished using the queue.
36+
*
37+
* @param item the item to add (maybe null)
38+
*/
39+
public void put(T item) {
40+
CompletableFuture<T> pendingFuture = null;
41+
synchronized (lock) {
42+
CompletableFuture<T> nextFuture = pendingFutures.pollFirst();
43+
if(nextFuture != null) {
44+
pendingFuture = nextFuture;
45+
} else {
46+
queue.addLast(item);
47+
return;
48+
}
49+
}
50+
// Execute callback outside the lock.
51+
pendingFuture.complete(item);
52+
}
53+
/**
54+
* Retrieves and removes an item from the queue, returning a future that completes with the item.
55+
* <p>
56+
* If the queue contains items, returns an already-completed future with the oldest item.
57+
* If the queue is empty, returns a future that will complete when an item becomes available
58+
* via {@link #put(Object)}.
59+
*
60+
* @return a {@link CompletableFuture} that completes with the next item
61+
*/
62+
public CompletableFuture<T> take() {
63+
synchronized (lock) {
64+
if(!queue.isEmpty()) {
65+
return CompletableFuture.completedFuture(queue.removeFirst());
66+
}
67+
CompletableFuture<T> takeFuture = new CompletableFuture<>();
68+
pendingFutures.addLast(takeFuture);
69+
return takeFuture;
70+
}
71+
}
72+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Collections for use in LaunchDarkly SDKs and components.
3+
*/
4+
package com.launchdarkly.sdk.collections;

0 commit comments

Comments
 (0)