Skip to content

Commit 4b8313b

Browse files
committed
Use the DataStoreTypes.ChangeSet type for data source results.
1 parent 9a450e6 commit 4b8313b

File tree

9 files changed

+537
-39
lines changed

9 files changed

+537
-39
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.google.common.collect.ImmutableList;
4+
import com.launchdarkly.logging.LDLogger;
5+
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet;
6+
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2Change;
7+
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2ChangeType;
8+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
9+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType;
10+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
11+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
12+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
13+
14+
import java.util.AbstractMap;
15+
import java.util.ArrayList;
16+
import java.util.LinkedHashMap;
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
/**
21+
* Translates FDv2 changesets into data store formats.
22+
*/
23+
final class FDv2ChangeSetTranslator {
24+
private FDv2ChangeSetTranslator() {
25+
}
26+
27+
/**
28+
* Converts an FDv2ChangeSet to a DataStoreTypes.ChangeSet.
29+
*
30+
* @param changeset the FDv2 changeset to convert
31+
* @param logger logger for diagnostic messages
32+
* @param environmentId the environment ID to include in the changeset (may be null)
33+
* @return a DataStoreTypes.ChangeSet containing the converted data
34+
* @throws IllegalArgumentException if the changeset type is unknown
35+
*/
36+
public static DataStoreTypes.ChangeSet<ItemDescriptor> toChangeSet(
37+
FDv2ChangeSet changeset,
38+
LDLogger logger,
39+
String environmentId) {
40+
ChangeSetType changeSetType;
41+
switch (changeset.getType()) {
42+
case FULL:
43+
changeSetType = ChangeSetType.Full;
44+
break;
45+
case PARTIAL:
46+
changeSetType = ChangeSetType.Partial;
47+
break;
48+
case NONE:
49+
changeSetType = ChangeSetType.None;
50+
break;
51+
default:
52+
throw new IllegalArgumentException(
53+
"Unknown FDv2ChangeSetType: " + changeset.getType() + ". This is an implementation error.");
54+
}
55+
56+
// Use a LinkedHashMap to group items by DataKind in a single pass while preserving order
57+
Map<DataKind, List<Map.Entry<String, ItemDescriptor>>> kindToItems = new LinkedHashMap<>();
58+
59+
for (FDv2Change change : changeset.getChanges()) {
60+
DataKind dataKind = getDataKind(change.getKind());
61+
62+
if (dataKind == null) {
63+
logger.warn("Unknown data kind '{}' in changeset, skipping", change.getKind());
64+
continue;
65+
}
66+
67+
ItemDescriptor item;
68+
69+
if (change.getType() == FDv2ChangeType.PUT) {
70+
if (change.getObject() == null) {
71+
logger.warn(
72+
"Put operation for {}/{} missing object data, skipping",
73+
change.getKind(),
74+
change.getKey());
75+
continue;
76+
}
77+
item = dataKind.deserialize(change.getObject().toString());
78+
} else if (change.getType() == FDv2ChangeType.DELETE) {
79+
item = ItemDescriptor.deletedItem(change.getVersion());
80+
} else {
81+
throw new IllegalArgumentException(
82+
"Unknown FDv2ChangeType: " + change.getType() + ". This is an implementation error.");
83+
}
84+
85+
List<Map.Entry<String, ItemDescriptor>> itemsList =
86+
kindToItems.computeIfAbsent(dataKind, k -> new ArrayList<>());
87+
88+
itemsList.add(new AbstractMap.SimpleImmutableEntry<>(change.getKey(), item));
89+
}
90+
91+
ImmutableList.Builder<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> dataBuilder =
92+
ImmutableList.builder();
93+
94+
for (Map.Entry<DataKind, List<Map.Entry<String, ItemDescriptor>>> entry : kindToItems.entrySet()) {
95+
dataBuilder.add(
96+
new AbstractMap.SimpleImmutableEntry<>(
97+
entry.getKey(),
98+
new KeyedItems<>(entry.getValue())
99+
));
100+
}
101+
102+
return new DataStoreTypes.ChangeSet<>(
103+
changeSetType,
104+
changeset.getSelector(),
105+
dataBuilder.build(),
106+
environmentId);
107+
}
108+
109+
/**
110+
* Maps an FDv2 object kind to the corresponding DataKind.
111+
*
112+
* @param kind the kind string from the FDv2 change
113+
* @return the corresponding DataKind, or null if the kind is not recognized
114+
*/
115+
private static DataKind getDataKind(String kind) {
116+
switch (kind) {
117+
case "flag":
118+
return DataModel.FEATURES;
119+
case "segment":
120+
return DataModel.SEGMENTS;
121+
default:
122+
return null;
123+
}
124+
}
125+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
public class FDv2DataSource {
4+
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
import com.launchdarkly.logging.LDLogger;
44
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
5-
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet;
65
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler;
76
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
87
import com.launchdarkly.sdk.internal.http.HttpErrors;
98
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
109
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
10+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
1111
import com.launchdarkly.sdk.server.subsystems.SerializationException;
1212

1313
import java.io.IOException;
@@ -79,26 +79,46 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
7979
FDv2ProtocolHandler.IFDv2ProtocolAction res = handler.handleEvent(event);
8080
switch (res.getAction()) {
8181
case CHANGESET:
82-
return FDv2SourceResult.changeSet(((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset());
83-
case ERROR:
82+
try {
83+
84+
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> converted = FDv2ChangeSetTranslator.toChangeSet(
85+
((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset(),
86+
logger,
87+
// TODO: Implement environment ID support.
88+
null
89+
);
90+
return FDv2SourceResult.changeSet(converted);
91+
} catch (Exception e) {
92+
// TODO: Do we need to be more specific about the exception type here?
93+
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
94+
DataSourceStatusProvider.ErrorKind.INVALID_DATA,
95+
0,
96+
e.toString(),
97+
new Date().toInstant()
98+
);
99+
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
100+
}
101+
case ERROR: {
84102
FDv2ProtocolHandler.FDv2ActionError error = ((FDv2ProtocolHandler.FDv2ActionError) res);
85-
return FDv2SourceResult.terminalError(
86-
new DataSourceStatusProvider.ErrorInfo(
87-
DataSourceStatusProvider.ErrorKind.UNKNOWN,
88-
0,
89-
error.getReason(),
90-
new Date().toInstant()));
103+
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
104+
DataSourceStatusProvider.ErrorKind.UNKNOWN,
105+
0,
106+
error.getReason(),
107+
new Date().toInstant());
108+
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
109+
}
91110
case GOODBYE:
92111
return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason());
93112
case NONE:
94113
break;
95-
case INTERNAL_ERROR:
96-
return FDv2SourceResult.terminalError(
97-
new DataSourceStatusProvider.ErrorInfo(
98-
DataSourceStatusProvider.ErrorKind.UNKNOWN,
99-
0,
100-
"Internal error occurred during polling",
101-
new Date().toInstant()));
114+
case INTERNAL_ERROR: {
115+
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
116+
DataSourceStatusProvider.ErrorKind.UNKNOWN,
117+
0,
118+
"Internal error occurred during polling",
119+
new Date().toInstant());
120+
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
121+
}
102122
}
103123
}
104124
return FDv2SourceResult.terminalError(new DataSourceStatusProvider.ErrorInfo(
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.launchdarkly.sdk.server.datasources;
2+
3+
/**
4+
* This type is currently experimental and not subject to semantic versioning.
5+
* <p>
6+
* Interface used to shut down a data source.
7+
*/
8+
public interface DataSourceShutdown {
9+
/**
10+
* Shutdown the data source. The data source should emit a status event with a SHUTDOWN state as soon as possible.
11+
* If the data source has already completed, or is in the process of completing, this method should have no effect.
12+
*/
13+
void shutdown();
14+
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.launchdarkly.sdk.server.datasources;
2-
3-
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet;
42
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
3+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
54

65
/**
76
* This type is currently experimental and not subject to semantic versioning.
@@ -64,12 +63,12 @@ public Status(State state, DataSourceStatusProvider.ErrorInfo errorInfo) {
6463
}
6564
}
6665

67-
private final FDv2ChangeSet changeSet;
66+
private final DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet;
6867
private final Status status;
6968

7069
private final ResultType resultType;
7170

72-
private FDv2SourceResult(FDv2ChangeSet changeSet, Status status, ResultType resultType) {
71+
private FDv2SourceResult(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, Status status, ResultType resultType) {
7372
this.changeSet = changeSet;
7473
this.status = status;
7574
this.resultType = resultType;
@@ -87,7 +86,7 @@ public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo
8786
return new FDv2SourceResult(null, new Status(State.TERMINAL_ERROR, errorInfo), ResultType.STATUS);
8887
}
8988

90-
public static FDv2SourceResult changeSet(FDv2ChangeSet changeSet) {
89+
public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet) {
9190
return new FDv2SourceResult(changeSet, null, ResultType.CHANGE_SET);
9291
}
9392

@@ -104,7 +103,7 @@ public Status getStatus() {
104103
return status;
105104
}
106105

107-
public FDv2ChangeSet getChangeSet() {
106+
public DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> getChangeSet() {
108107
return changeSet;
109108
}
110109
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Initializer.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.launchdarkly.sdk.server.datasources;
22

3+
import java.io.Closeable;
34
import java.util.concurrent.CompletableFuture;
45

56
/**
@@ -42,16 +43,10 @@
4243
* GOODBYE --> [*]
4344
* </pre>
4445
*/
45-
public interface Initializer {
46+
public interface Initializer extends DataSourceShutdown {
4647
/**
4748
* Run the initializer to completion.
4849
* @return The result of the initializer.
4950
*/
5051
CompletableFuture<FDv2SourceResult> run();
51-
52-
/**
53-
* Shutdown the initializer. The initializer should emit a status event with a SHUTDOWN state as soon as possible.
54-
* If the initializer has already completed, or is in the process of completing, this method should have no effect.
55-
*/
56-
void shutdown();
5752
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Synchronizer.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
* INTERRUPTED --> RUNNING
4747
* </pre>
4848
*/
49-
public interface Synchronizer {
49+
public interface Synchronizer extends DataSourceShutdown {
5050
/**
5151
* Get the next result from the stream.
5252
* <p>
@@ -55,12 +55,4 @@ public interface Synchronizer {
5555
* @return a future that will complete when the next result is available
5656
*/
5757
CompletableFuture<FDv2SourceResult> next();
58-
59-
/**
60-
* Shutdown the synchronizer. The synchronizer should emit a status event with a SHUTDOWN state as soon as possible
61-
* and then stop producing further results. If the synchronizer involves a resource, such as a network connection,
62-
* then those resources should be released.
63-
* If the synchronizer has already completed, or is in the process of completing, this method should have no effect.
64-
*/
65-
void shutdown();
6658
}

0 commit comments

Comments
 (0)