Skip to content

Commit bef2500

Browse files
chore: Add FDv2 data source interfaces. (#100)
BEGIN_COMMIT_OVERRIDE chore: Add FDv2 data source interfaces. chore: Add FDv2 polling data source. chore: Scaffold FDv2 Data Source. END_COMMIT_OVERRIDE <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Adds an experimental FDv2 data source pipeline with polling and change propagation. > > - Implements `DefaultFDv2Requestor` for FDv2 HTTP polling with `version`/`state` query params, ETag caching, header passthrough, and robust error handling > - Introduces `FDv2SourceResult`, `Initializer`, `Synchronizer`, and `SelectorSource` interfaces and `FDv2DataSource` coordinator to run initializers then synchronizers > - Provides `PollingBase`, `PollingInitializerImpl`, and `PollingSynchronizerImpl` (scheduled) to process events, map errors, and emit `ChangeSet`/status updates > - Adds `FDv2ChangeSetTranslator` to convert FDv2 changes to `DataStoreTypes.ChangeSet` (flags/segments), with logging for unknown kinds > - Adds `IterableAsyncQueue` utility for async result delivery > - Extends `StandardEndpoints` with `FDV2_POLLING_REQUEST_PATH` and `FDV2_STREAMING_REQUEST_PATH` > - Extensive unit tests for requestor, translator, initializer/synchronizer behaviors and queue > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 117b85c. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Todd Anderson <tanderson@launchdarkly.com>
1 parent 89fab04 commit bef2500

20 files changed

+3331
-0
lines changed
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.launchdarkly.logging.LDLogger;
4+
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
5+
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
6+
import com.launchdarkly.sdk.internal.http.HttpErrors;
7+
import com.launchdarkly.sdk.internal.http.HttpHelpers;
8+
import com.launchdarkly.sdk.internal.http.HttpProperties;
9+
import com.launchdarkly.sdk.json.SerializationException;
10+
11+
import okhttp3.Call;
12+
import okhttp3.Callback;
13+
import okhttp3.Headers;
14+
import okhttp3.OkHttpClient;
15+
import okhttp3.Request;
16+
import okhttp3.Response;
17+
18+
import javax.annotation.Nonnull;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.net.SocketTimeoutException;
23+
import java.net.URI;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.concurrent.CompletableFuture;
29+
30+
/**
31+
* Implementation of FDv2Requestor for polling feature flag data via FDv2 protocol.
32+
*/
33+
public class DefaultFDv2Requestor implements FDv2Requestor, Closeable {
34+
private static final String VERSION_QUERY_PARAM = "version";
35+
private static final String STATE_QUERY_PARAM = "state";
36+
37+
private final OkHttpClient httpClient;
38+
private final URI pollingUri;
39+
private final Headers headers;
40+
private final LDLogger logger;
41+
private final Map<URI, String> etags;
42+
43+
/**
44+
* Creates a DefaultFDv2Requestor.
45+
*
46+
* @param httpProperties HTTP configuration properties
47+
* @param baseUri base URI for the FDv2 polling endpoint
48+
* @param requestPath the request path to append to the base URI (e.g., "/sdk/poll")
49+
* @param logger logger for diagnostic output
50+
*/
51+
public DefaultFDv2Requestor(HttpProperties httpProperties, URI baseUri, String requestPath, LDLogger logger) {
52+
this.logger = logger;
53+
this.pollingUri = HttpHelpers.concatenateUriPath(baseUri, requestPath);
54+
this.etags = new HashMap<>();
55+
56+
OkHttpClient.Builder httpBuilder = httpProperties.toHttpClientBuilder();
57+
this.headers = httpProperties.toHeadersBuilder().build();
58+
this.httpClient = httpBuilder.build();
59+
}
60+
61+
@Override
62+
public CompletableFuture<FDv2PayloadResponse> Poll(Selector selector) {
63+
CompletableFuture<FDv2PayloadResponse> future = new CompletableFuture<>();
64+
65+
try {
66+
// Build the request URI with query parameters
67+
URI requestUri = pollingUri;
68+
69+
if (!selector.isEmpty()) {
70+
requestUri = HttpHelpers.addQueryParam(requestUri, VERSION_QUERY_PARAM, String.valueOf(selector.getVersion()));
71+
}
72+
73+
if (selector.getState() != null && !selector.getState().isEmpty()) {
74+
requestUri = HttpHelpers.addQueryParam(requestUri, STATE_QUERY_PARAM, selector.getState());
75+
}
76+
77+
logger.debug("Making FDv2 polling request to: {}", requestUri);
78+
79+
// Build the HTTP request
80+
Request.Builder requestBuilder = new Request.Builder()
81+
.url(requestUri.toURL())
82+
.headers(headers)
83+
.get();
84+
85+
// Add ETag if we have one cached for this URI
86+
synchronized (etags) {
87+
String etag = etags.get(requestUri);
88+
if (etag != null) {
89+
requestBuilder.header("If-None-Match", etag);
90+
}
91+
}
92+
93+
Request request = requestBuilder.build();
94+
final URI finalRequestUri = requestUri;
95+
96+
// Make asynchronous HTTP call
97+
httpClient.newCall(request).enqueue(new Callback() {
98+
@Override
99+
public void onFailure(@Nonnull Call call, @Nonnull IOException e) {
100+
if (e instanceof SocketTimeoutException) {
101+
future.completeExceptionally(
102+
new IOException("FDv2 polling request timed out: " + finalRequestUri, e)
103+
);
104+
} else {
105+
future.completeExceptionally(e);
106+
}
107+
}
108+
109+
@Override
110+
public void onResponse(@Nonnull Call call, @Nonnull Response response) {
111+
try {
112+
// Handle 304 Not Modified - no new data
113+
if (response.code() == 304) {
114+
logger.debug("FDv2 polling request returned 304: not modified");
115+
future.complete(null);
116+
return;
117+
}
118+
119+
if (!response.isSuccessful()) {
120+
future.completeExceptionally(
121+
new HttpErrors.HttpErrorException(response.code())
122+
);
123+
return;
124+
}
125+
126+
// Update ETag cache
127+
String newEtag = response.header("ETag");
128+
synchronized (etags) {
129+
if (newEtag != null) {
130+
etags.put(finalRequestUri, newEtag);
131+
} else {
132+
etags.remove(finalRequestUri);
133+
}
134+
}
135+
136+
// The documentation indicates that the body will not be null for a response passed to the
137+
// onResponse callback.
138+
String responseBody = Objects.requireNonNull(response.body()).string();
139+
logger.debug("Received FDv2 polling response");
140+
141+
List<FDv2Event> events = FDv2Event.parseEventsArray(responseBody);
142+
143+
// Create and return the response
144+
FDv2PayloadResponse pollingResponse = new FDv2PayloadResponse(events, response.headers());
145+
future.complete(pollingResponse);
146+
147+
} catch (Exception e) {
148+
future.completeExceptionally(e);
149+
} finally {
150+
response.close();
151+
}
152+
}
153+
});
154+
155+
} catch (Exception e) {
156+
future.completeExceptionally(e);
157+
}
158+
159+
return future;
160+
}
161+
162+
/**
163+
* Closes the HTTP client and releases resources.
164+
*/
165+
public void close() {
166+
HttpProperties.shutdownHttpClient(httpClient);
167+
}
168+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.google.common.collect.ImmutableList;
4+
import com.launchdarkly.logging.LDLogger;
5+
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet;
6+
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2Change;
7+
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2ChangeType;
8+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
9+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType;
10+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
11+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
12+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
13+
14+
import java.util.AbstractMap;
15+
import java.util.ArrayList;
16+
import java.util.LinkedHashMap;
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
/**
21+
* Translates FDv2 changesets into data store formats.
22+
*/
23+
final class FDv2ChangeSetTranslator {
24+
private FDv2ChangeSetTranslator() {
25+
}
26+
27+
/**
28+
* Converts an FDv2ChangeSet to a DataStoreTypes.ChangeSet.
29+
*
30+
* @param changeset the FDv2 changeset to convert
31+
* @param logger logger for diagnostic messages
32+
* @param environmentId the environment ID to include in the changeset (may be null)
33+
* @return a DataStoreTypes.ChangeSet containing the converted data
34+
* @throws IllegalArgumentException if the changeset type is unknown
35+
*/
36+
public static DataStoreTypes.ChangeSet<ItemDescriptor> toChangeSet(
37+
FDv2ChangeSet changeset,
38+
LDLogger logger,
39+
String environmentId) {
40+
ChangeSetType changeSetType;
41+
switch (changeset.getType()) {
42+
case FULL:
43+
changeSetType = ChangeSetType.Full;
44+
break;
45+
case PARTIAL:
46+
changeSetType = ChangeSetType.Partial;
47+
break;
48+
case NONE:
49+
changeSetType = ChangeSetType.None;
50+
break;
51+
default:
52+
throw new IllegalArgumentException(
53+
"Unknown FDv2ChangeSetType: " + changeset.getType() + ". This is an implementation error.");
54+
}
55+
56+
// Use a LinkedHashMap to group items by DataKind in a single pass while preserving order
57+
Map<DataKind, List<Map.Entry<String, ItemDescriptor>>> kindToItems = new LinkedHashMap<>();
58+
59+
for (FDv2Change change : changeset.getChanges()) {
60+
DataKind dataKind = getDataKind(change.getKind());
61+
62+
if (dataKind == null) {
63+
logger.warn("Unknown data kind '{}' in changeset, skipping", change.getKind());
64+
continue;
65+
}
66+
67+
ItemDescriptor item;
68+
69+
if (change.getType() == FDv2ChangeType.PUT) {
70+
if (change.getObject() == null) {
71+
logger.warn(
72+
"Put operation for {}/{} missing object data, skipping",
73+
change.getKind(),
74+
change.getKey());
75+
continue;
76+
}
77+
item = dataKind.deserialize(change.getObject().toString());
78+
} else if (change.getType() == FDv2ChangeType.DELETE) {
79+
item = ItemDescriptor.deletedItem(change.getVersion());
80+
} else {
81+
throw new IllegalArgumentException(
82+
"Unknown FDv2ChangeType: " + change.getType() + ". This is an implementation error.");
83+
}
84+
85+
List<Map.Entry<String, ItemDescriptor>> itemsList =
86+
kindToItems.computeIfAbsent(dataKind, k -> new ArrayList<>());
87+
88+
itemsList.add(new AbstractMap.SimpleImmutableEntry<>(change.getKey(), item));
89+
}
90+
91+
ImmutableList.Builder<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> dataBuilder =
92+
ImmutableList.builder();
93+
94+
for (Map.Entry<DataKind, List<Map.Entry<String, ItemDescriptor>>> entry : kindToItems.entrySet()) {
95+
dataBuilder.add(
96+
new AbstractMap.SimpleImmutableEntry<>(
97+
entry.getKey(),
98+
new KeyedItems<>(entry.getValue())
99+
));
100+
}
101+
102+
return new DataStoreTypes.ChangeSet<>(
103+
changeSetType,
104+
changeset.getSelector(),
105+
dataBuilder.build(),
106+
environmentId);
107+
}
108+
109+
/**
110+
* Maps an FDv2 object kind to the corresponding DataKind.
111+
*
112+
* @param kind the kind string from the FDv2 change
113+
* @return the corresponding DataKind, or null if the kind is not recognized
114+
*/
115+
private static DataKind getDataKind(String kind) {
116+
switch (kind) {
117+
case "flag":
118+
return DataModel.FEATURES;
119+
case "segment":
120+
return DataModel.SEGMENTS;
121+
default:
122+
return null;
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)