Skip to content

Commit c987dc4

Browse files
authored
chore: add DataSourceUpdatesSinkV2 support to DataSourceUpdatesImpl (#107)
**Requirements** - [x] I have added test coverage for new or changed functionality - [x] I have followed the repository's [pull request submission guidelines](../blob/main/CONTRIBUTING.md#submitting-pull-requests) - [x] I have validated my changes against all supported platform versions **Related issues** SDK-1608 **Describe the solution you've provided** Porting related Dotnet sdk changes to Java SDK. Finding new homes for various helper functionality. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Introduces FDv2 transactional update flow and supporting APIs. > > - Adds `TransactionalDataSourceUpdateSink` and `DataSourceUpdateSinkV2` (status access + `updateStatus`) for FDv2 data sources > - Extends `DataSourceUpdatesImpl` to implement V2 and handle `apply(ChangeSet)`: > - Transactional path for `TransactionalDataStore.apply` with dependency/event handling and status updates > - Legacy path maps full changesets to `init` and partial to ordered `upsert`s > - Generates flag change events and maintains dependency graph for both paths > - Adds `DataModelDependencies.sortChangeset` to order updates by dependencies and data kind > - Updates tests to cover sorting, transactional/legacy paths, events, and environment/selector metadata; test helpers now support V2 sink > - Minor: version file comment markers added around `SDK_VERSION` > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit f3b09f5. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent bef2500 commit c987dc4

File tree

8 files changed

+1121
-4
lines changed

8 files changed

+1121
-4
lines changed

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataModelDependencies.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import com.google.common.collect.Iterables;
77
import com.launchdarkly.sdk.LDValue;
88
import com.launchdarkly.sdk.server.DataModel.Operator;
9+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
10+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType;
911
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
1012
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
1113
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
@@ -132,6 +134,23 @@ public static FullDataSet<ItemDescriptor> sortAllCollections(FullDataSet<ItemDes
132134
return new FullDataSet<>(builder.build().entrySet());
133135
}
134136

137+
/**
138+
* Sort the data in the changeset in dependency order. If there are any duplicates, then the highest version
139+
* of the duplicate item will be retained.
140+
*
141+
* @param inSet the changeset to sort
142+
* @return a sorted copy of the changeset
143+
*/
144+
public static ChangeSet<ItemDescriptor> sortChangeset(ChangeSet<ItemDescriptor> inSet) {
145+
ImmutableSortedMap.Builder<DataKind, KeyedItems<ItemDescriptor>> builder =
146+
ImmutableSortedMap.orderedBy(dataKindPriorityOrder);
147+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> entry: inSet.getData()) {
148+
DataKind kind = entry.getKey();
149+
builder.put(kind, sortCollection(kind, entry.getValue()));
150+
}
151+
return new ChangeSet<>(inSet.getType(), inSet.getSelector(), builder.build().entrySet(), inSet.getEnvironmentId());
152+
}
153+
135154
private static KeyedItems<ItemDescriptor> sortCollection(DataKind kind, KeyedItems<ItemDescriptor> input) {
136155
if (!isDependencyOrdered(kind) || isEmpty(input.getItems())) {
137156
return input;

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java

Lines changed: 176 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.Status;
1313
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.StatusListener;
1414
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
15+
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2;
1516
import com.launchdarkly.sdk.server.subsystems.DataStore;
17+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
1618
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
1719
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
1820
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
1921
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
22+
import com.launchdarkly.sdk.server.subsystems.TransactionalDataStore;
2023
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
2124
import com.launchdarkly.sdk.server.interfaces.FlagChangeEvent;
2225
import com.launchdarkly.sdk.server.interfaces.FlagChangeListener;
@@ -48,7 +51,7 @@
4851
*
4952
* @since 4.11.0
5053
*/
51-
final class DataSourceUpdatesImpl implements DataSourceUpdateSink {
54+
final class DataSourceUpdatesImpl implements DataSourceUpdateSink, DataSourceUpdateSinkV2 {
5255
private final DataStore store;
5356
private final EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> flagChangeEventNotifier;
5457
private final EventBroadcasterImpl<StatusListener, Status> dataSourceStatusNotifier;
@@ -365,4 +368,176 @@ private void onTimeout() {
365368
private static String describeErrorCount(Map.Entry<ErrorInfo, Integer> entry) {
366369
return entry.getKey() + " (" + entry.getValue() + (entry.getValue() == 1 ? " time" : " times") + ")";
367370
}
371+
372+
@Override
373+
public boolean apply(ChangeSet<ItemDescriptor> changeSet) {
374+
if (store instanceof TransactionalDataStore) {
375+
return applyToTransactionalStore((TransactionalDataStore) store, changeSet);
376+
}
377+
378+
// Legacy update path for non-transactional stores
379+
return applyToLegacyStore(changeSet);
380+
}
381+
382+
private boolean applyToTransactionalStore(TransactionalDataStore transactionalDataStore,
383+
ChangeSet<ItemDescriptor> changeSet) {
384+
Map<DataKind, Map<String, ItemDescriptor>> oldData;
385+
// Getting the old values requires accessing the store, which can fail.
386+
// If there is a failure to read the store, then we stop treating it as a failure.
387+
try {
388+
oldData = getOldDataIfFlagChangeListeners();
389+
} catch (RuntimeException e) {
390+
reportStoreFailure(e);
391+
return false;
392+
}
393+
394+
ChangeSet<ItemDescriptor> sortedChangeSet = DataModelDependencies.sortChangeset(changeSet);
395+
396+
try {
397+
transactionalDataStore.apply(sortedChangeSet);
398+
lastStoreUpdateFailed = false;
399+
} catch (RuntimeException e) {
400+
reportStoreFailure(e);
401+
return false;
402+
}
403+
404+
// Calling Apply implies that the data source is now in a valid state.
405+
updateStatus(State.VALID, null);
406+
407+
Set<KindAndKey> changes = updateDependencyTrackerForChangesetAndDetermineChanges(oldData, sortedChangeSet);
408+
409+
// Now, if we previously queried the old data because someone is listening for flag change events, compare
410+
// the versions of all items and generate events for those (and any other items that depend on them)
411+
if (changes != null) {
412+
sendChangeEvents(changes);
413+
}
414+
415+
return true;
416+
}
417+
418+
private boolean applyToLegacyStore(ChangeSet<ItemDescriptor> sortedChangeSet) {
419+
switch (sortedChangeSet.getType()) {
420+
case Full:
421+
return applyFullChangeSetToLegacyStore(sortedChangeSet);
422+
case Partial:
423+
return applyPartialChangeSetToLegacyStore(sortedChangeSet);
424+
case None:
425+
default:
426+
return true;
427+
}
428+
}
429+
430+
private boolean applyFullChangeSetToLegacyStore(ChangeSet<ItemDescriptor> unsortedChangeset) {
431+
// Convert ChangeSet to FullDataSet for legacy init path
432+
return init(new FullDataSet<>(unsortedChangeset.getData()));
433+
}
434+
435+
private boolean applyPartialChangeSetToLegacyStore(ChangeSet<ItemDescriptor> changeSet) {
436+
// Sorting isn't strictly required here, as upsert behavior didn't traditionally have it,
437+
// but it also doesn't hurt, and there could be cases where it results in slightly
438+
// greater store consistency for persistent stores.
439+
ChangeSet<ItemDescriptor> sortedChangeset = DataModelDependencies.sortChangeset(changeSet);
440+
441+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindItemsPair: sortedChangeset.getData()) {
442+
for (Map.Entry<String, ItemDescriptor> item: kindItemsPair.getValue().getItems()) {
443+
boolean applySuccess = upsert(kindItemsPair.getKey(), item.getKey(), item.getValue());
444+
if (!applySuccess) {
445+
return false;
446+
}
447+
}
448+
}
449+
// The upsert will update the store status in the case of a store failure.
450+
// The application of the upserts does not set the store initialized.
451+
452+
// Considering the store will be the same for the duration of the application
453+
// lifecycle we will not be applying a partial update to a store that didn't
454+
// already get a full update. The non-transactional store will also not support a selector.
455+
456+
return true;
457+
}
458+
459+
private Map<DataKind, Map<String, ItemDescriptor>> getOldDataIfFlagChangeListeners() {
460+
if (hasFlagChangeEventListeners()) {
461+
// Query the existing data if any, so that after the update we can send events for
462+
// whatever was changed
463+
Map<DataKind, Map<String, ItemDescriptor>> oldData = new HashMap<>();
464+
for (DataKind kind: ALL_DATA_KINDS) {
465+
KeyedItems<ItemDescriptor> items = store.getAll(kind);
466+
oldData.put(kind, ImmutableMap.copyOf(items.getItems()));
467+
}
468+
return oldData;
469+
} else {
470+
return null;
471+
}
472+
}
473+
474+
private Map<DataKind, Map<String, ItemDescriptor>> changeSetToMap(ChangeSet<ItemDescriptor> changeSet) {
475+
Map<DataKind, Map<String, ItemDescriptor>> ret = new HashMap<>();
476+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> e: changeSet.getData()) {
477+
ret.put(e.getKey(), ImmutableMap.copyOf(e.getValue().getItems()));
478+
}
479+
return ret;
480+
}
481+
482+
private Set<KindAndKey> updateDependencyTrackerForChangesetAndDetermineChanges(
483+
Map<DataKind, Map<String, ItemDescriptor>> oldDataMap,
484+
ChangeSet<ItemDescriptor> changeSet) {
485+
switch (changeSet.getType()) {
486+
case Full:
487+
return handleFullChangeset(oldDataMap, changeSet);
488+
case Partial:
489+
return handlePartialChangeset(oldDataMap, changeSet);
490+
case None:
491+
return null;
492+
default:
493+
return null;
494+
}
495+
}
496+
497+
private Set<KindAndKey> handleFullChangeset(
498+
Map<DataKind, Map<String, ItemDescriptor>> oldDataMap,
499+
ChangeSet<ItemDescriptor> changeSet) {
500+
dependencyTracker.reset();
501+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry: changeSet.getData()) {
502+
DataKind kind = kindEntry.getKey();
503+
for (Map.Entry<String, ItemDescriptor> itemEntry: kindEntry.getValue().getItems()) {
504+
String key = itemEntry.getKey();
505+
dependencyTracker.updateDependenciesFrom(kind, key, itemEntry.getValue());
506+
}
507+
}
508+
509+
if (oldDataMap == null) {
510+
return null;
511+
}
512+
513+
Map<DataKind, Map<String, ItemDescriptor>> newDataMap = changeSetToMap(changeSet);
514+
return computeChangedItemsForFullDataSet(oldDataMap, newDataMap);
515+
}
516+
517+
private Set<KindAndKey> handlePartialChangeset(
518+
Map<DataKind, Map<String, ItemDescriptor>> oldDataMap,
519+
ChangeSet<ItemDescriptor> changeSet) {
520+
if (oldDataMap == null) {
521+
// Update dependencies but don't track changes when no listeners
522+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry: changeSet.getData()) {
523+
DataKind kind = kindEntry.getKey();
524+
for (Map.Entry<String, ItemDescriptor> itemEntry: kindEntry.getValue().getItems()) {
525+
dependencyTracker.updateDependenciesFrom(kind, itemEntry.getKey(), itemEntry.getValue());
526+
}
527+
}
528+
return null;
529+
}
530+
531+
Set<KindAndKey> affectedItems = new HashSet<>();
532+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry: changeSet.getData()) {
533+
DataKind kind = kindEntry.getKey();
534+
for (Map.Entry<String, ItemDescriptor> itemEntry: kindEntry.getValue().getItems()) {
535+
String key = itemEntry.getKey();
536+
dependencyTracker.updateDependenciesFrom(kind, key, itemEntry.getValue());
537+
dependencyTracker.addAffectedItems(affectedItems, new KindAndKey(kind, key));
538+
}
539+
}
540+
541+
return affectedItems;
542+
}
368543
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.launchdarkly.sdk.server.subsystems;
2+
3+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
4+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo;
5+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State;
6+
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
7+
8+
/**
9+
* Interfaces required by data source updates implementations in FDv2.
10+
* <p>
11+
* This interface extends {@link TransactionalDataSourceUpdateSink} to add status tracking
12+
* and status update capabilities required for FDv2 data sources.
13+
* <p>
14+
* This interface is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
15+
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
16+
*
17+
* @since 5.0.0
18+
* @see TransactionalDataSourceUpdateSink
19+
* @see DataSource
20+
*/
21+
public interface DataSourceUpdateSinkV2 extends TransactionalDataSourceUpdateSink {
22+
/**
23+
* An object that provides status tracking for the data store, if applicable.
24+
* <p>
25+
* This may be useful if the data source needs to be aware of storage problems that might require it
26+
* to take some special action: for instance, if a database outage may have caused some data to be
27+
* lost and therefore the data should be re-requested from LaunchDarkly.
28+
*
29+
* @return a {@link DataStoreStatusProvider}
30+
*/
31+
DataStoreStatusProvider getDataStoreStatusProvider();
32+
33+
/**
34+
* Informs the SDK of a change in the data source's status.
35+
* <p>
36+
* Data source implementations should use this method if they have any concept of being in a valid
37+
* state, a temporarily disconnected state, or a permanently stopped state.
38+
* <p>
39+
* If {@code newState} is different from the previous state, and/or {@code newError} is non-null, the
40+
* SDK will start returning the new status (adding a timestamp for the change) from
41+
* {@link DataSourceStatusProvider#getStatus()}, and will trigger status change events to any
42+
* registered listeners.
43+
* <p>
44+
* A special case is that if {@code newState} is {@link State#INTERRUPTED},
45+
* but the previous state was {@link State#INITIALIZING}, the state will
46+
* remain at {@link State#INITIALIZING} because {@link State#INTERRUPTED}
47+
* is only meaningful after a successful startup.
48+
*
49+
* @param newState the data source state
50+
* @param newError information about a new error, if any
51+
* @see DataSourceStatusProvider
52+
*/
53+
void updateStatus(State newState, ErrorInfo newError);
54+
}
55+
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.launchdarkly.sdk.server.subsystems;
2+
3+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
4+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
5+
6+
/**
7+
* Interface that an implementation of {@link DataSource} will use to push data into the SDK transactionally.
8+
* <p>
9+
* The data source interacts with this object, rather than manipulating the data store directly, so
10+
* that the SDK can perform any other necessary operations that must happen when data is updated. This
11+
* object also provides a mechanism to report status changes.
12+
* <p>
13+
* Component factories for {@link DataSource} implementations will receive an implementation of this
14+
* interface in the {@link ClientContext#getDataSourceUpdateSink()} property of {@link ClientContext}.
15+
* <p>
16+
* This interface is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
17+
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
18+
*
19+
* @since 5.0.0
20+
* @see DataSource
21+
* @see ClientContext
22+
*/
23+
public interface TransactionalDataSourceUpdateSink {
24+
/**
25+
* Apply the given change set to the store. This should be done atomically if possible.
26+
*
27+
* @param changeSet the changeset to apply
28+
* @return true if the update succeeded, false if it failed
29+
*/
30+
boolean apply(ChangeSet<ItemDescriptor> changeSet);
31+
}
32+

0 commit comments

Comments
 (0)