Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SSE #162

Merged
merged 10 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protobuf {
}

ext {
retrofit_version = "2.9.0"
retrofit_version = "2.11.0"
jackson_version = "2.15.3"
swagger_annotations_version = "2.2.18"
lombok_version = "1.18.30"
Expand All @@ -142,6 +142,7 @@ ext {
mockito_core_version = "5.6.0"
protobuf_version = "3.24.4"
openfeature_version = "1.7.0"
eventsource_version = "4.1.1"
}

dependencies {
Expand All @@ -163,6 +164,7 @@ dependencies {
implementation("com.google.protobuf:protobuf-java:$protobuf_version")

implementation("dev.openfeature:sdk:$openfeature_version")
implementation("com.launchdarkly:okhttp-eventsource:$eventsource_version")

compileOnly("org.projectlombok:lombok:$lombok_version")

Expand Down Expand Up @@ -191,17 +193,17 @@ configurations {
task runLocalExample(type: JavaExec) {
description = "Run the local bucketing example"
classpath = sourceSets.examples.runtimeClasspath
main = 'com.devcycle.examples.LocalExample'
mainClass = 'com.devcycle.examples.LocalExample'
}

task runCloudExample(type: JavaExec) {
description = "Run the cloud bucketing example"
classpath = sourceSets.examples.runtimeClasspath
main = 'com.devcycle.examples.CloudExample'
mainClass = 'com.devcycle.examples.CloudExample'
}

task runOpenFeatureExample(type: JavaExec) {
description = "Run the OpenFeature example"
classpath = sourceSets.examples.runtimeClasspath
main = 'com.devcycle.examples.OpenFeatureExample'
mainClass = 'com.devcycle.examples.OpenFeatureExample'
}
9 changes: 7 additions & 2 deletions src/examples/java/com/devcycle/examples/LocalExample.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.devcycle.examples;

import com.devcycle.sdk.server.common.logging.SimpleDevCycleLogger;
import com.devcycle.sdk.server.common.model.DevCycleUser;
import com.devcycle.sdk.server.local.api.DevCycleLocalClient;
import com.devcycle.sdk.server.local.model.DevCycleLocalOptions;
Expand All @@ -22,8 +23,11 @@ public static void main(String[] args) throws InterruptedException {
// The default value can be of type string, boolean, number, or JSON
Boolean defaultValue = false;

DevCycleLocalOptions options = DevCycleLocalOptions.builder().configPollingIntervalMs(60000)
.disableAutomaticEventLogging(false).disableCustomEventLogging(false).build();
DevCycleLocalOptions options = DevCycleLocalOptions.builder()
.configPollingIntervalMS(60000)
.customLogger(new SimpleDevCycleLogger(SimpleDevCycleLogger.Level.DEBUG))
.enableBetaRealtimeUpdates(true)
.build();

// Initialize DevCycle Client
DevCycleLocalClient client = new DevCycleLocalClient(server_sdk_key, options);
Expand All @@ -46,5 +50,6 @@ public static void main(String[] args) throws InterruptedException {
} else {
System.out.println("feature is NOT enabled");
}
Thread.sleep(10000);
JamieSinn marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ProjectConfig {

@Schema(description = "Project Settings")
private Object project;

Expand All @@ -30,4 +31,8 @@ public class ProjectConfig {

@Schema(description = "Variable Hashes for all Variables in this Project")
private Object variableHashes;
}

@Schema(description = "SSE Configuration")
private SSE sse;
}

18 changes: 18 additions & 0 deletions src/main/java/com/devcycle/sdk/server/common/model/SSE.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.devcycle.sdk.server.common.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class SSE {
private String hostname;
private String path;
}

18 changes: 18 additions & 0 deletions src/main/java/com/devcycle/sdk/server/common/model/SSEMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.devcycle.sdk.server.common.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class SSEMessage {
private String etag;
private double lastModified;
private String type;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
import com.devcycle.sdk.server.common.api.IDevCycleApi;
import com.devcycle.sdk.server.common.exception.DevCycleException;
import com.devcycle.sdk.server.common.logging.DevCycleLogger;
import com.devcycle.sdk.server.common.model.ErrorResponse;
import com.devcycle.sdk.server.common.model.HttpResponseCode;
import com.devcycle.sdk.server.common.model.ProjectConfig;
import com.devcycle.sdk.server.common.model.*;
import com.devcycle.sdk.server.local.api.DevCycleLocalApiClient;
import com.devcycle.sdk.server.local.bucketing.LocalBucketing;
import com.devcycle.sdk.server.local.model.DevCycleLocalOptions;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.launchdarkly.eventsource.FaultEvent;
import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.eventsource.StartedEvent;
import retrofit2.Call;
import retrofit2.Response;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
Expand All @@ -26,46 +29,52 @@ public final class EnvironmentConfigManager {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int DEFAULT_POLL_INTERVAL_MS = 30000;
private static final int MIN_INTERVALS_MS = 1000;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
private ScheduledExecutorService scheduler;
private final IDevCycleApi configApiClient;
private final LocalBucketing localBucketing;
private SSEManager sseManager;
private boolean isSSEConnected = false;
private final DevCycleLocalOptions options;

private ProjectConfig config;
private String configETag = "";
private String configLastModified = "";

private final String sdkKey;
private final int pollingIntervalMS;
private static final int pollingIntervalSSEMS = 15 * 60 * 60 * 1000;
private boolean pollingEnabled = true;

public EnvironmentConfigManager(String sdkKey, LocalBucketing localBucketing, DevCycleLocalOptions options) {
this.sdkKey = sdkKey;
this.localBucketing = localBucketing;
this.options = options;

configApiClient = new DevCycleLocalApiClient(sdkKey, options).initialize();

int configPollingIntervalMS = options.getConfigPollingIntervalMS();
pollingIntervalMS = configPollingIntervalMS >= MIN_INTERVALS_MS ? configPollingIntervalMS
: DEFAULT_POLL_INTERVAL_MS;

setupScheduler();
scheduler = setupScheduler();
scheduler.scheduleAtFixedRate(getConfigRunnable, 0, this.pollingIntervalMS, TimeUnit.MILLISECONDS);
}

private void setupScheduler() {
Runnable getConfigRunnable = new Runnable() {
public void run() {
try {
if (pollingEnabled) {
getConfig();
}
} catch (DevCycleException e) {
DevCycleLogger.error("Failed to load config: " + e.getMessage());
private ScheduledExecutorService setupScheduler() {
return Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
}

private final Runnable getConfigRunnable = new Runnable() {
public void run() {
try {
if (pollingEnabled) {
getConfig();
}
} catch (DevCycleException e) {
DevCycleLogger.error("Failed to load config: " + e.getMessage());
}
};

scheduler.scheduleAtFixedRate(getConfigRunnable, 0, this.pollingIntervalMS, TimeUnit.MILLISECONDS);
}
}
};

public boolean isConfigInitialized() {
return config != null;
Expand All @@ -74,9 +83,57 @@ public boolean isConfigInitialized() {
private ProjectConfig getConfig() throws DevCycleException {
Call<ProjectConfig> config = this.configApiClient.getConfig(this.sdkKey, this.configETag, this.configLastModified);
this.config = getResponseWithRetries(config, 1);
if (this.options.isEnableBetaRealtimeUpdates()) {
try {
URI uri = new URI(this.config.getSse().getHostname() + this.config.getSse().getPath());
if (sseManager == null) {
sseManager = new SSEManager(uri);
}
sseManager.restart(uri, this::handleSSEMessage, this::handleSSEError, this::handleSSEStarted);
} catch (URISyntaxException e) {
DevCycleLogger.warning("Failed to create SSEManager: " + e.getMessage());
}
}
return this.config;
}

private Void handleSSEMessage(MessageEvent messageEvent) {
DevCycleLogger.debug("Received message: " + messageEvent.getData());
if (!isSSEConnected)
{
handleSSEStarted(null);
}

String data = messageEvent.getData();
if (data == null || data.isEmpty() || data.equals("keepalive")) {
return null;
}
try {
SSEMessage message = OBJECT_MAPPER.readValue(data, SSEMessage.class);
if (message.getType() == null || message.getType().equals("refetchConfig") || message.getType().isEmpty()) {
DevCycleLogger.debug("Received refetchConfig message, fetching new config");
getConfigRunnable.run();
}
} catch (JsonProcessingException e) {
DevCycleLogger.warning("Failed to parse SSE message: " + e.getMessage());
}
return null;
}

private Void handleSSEError(FaultEvent faultEvent) {
DevCycleLogger.warning("Received error: " + faultEvent.getCause());
return null;
}

private Void handleSSEStarted(StartedEvent startedEvent) {
isSSEConnected = true;
DevCycleLogger.debug("SSE Connected - setting polling interval to " + pollingIntervalSSEMS);
scheduler.shutdown();
scheduler = setupScheduler();
scheduler.scheduleAtFixedRate(getConfigRunnable, 0, pollingIntervalSSEMS, TimeUnit.MILLISECONDS);
return null;
}

private ProjectConfig getResponseWithRetries(Call<ProjectConfig> call, int maxRetries) throws DevCycleException {
// attempt 0 is the initial request, attempt > 0 are all retries
int attempt = 0;
Expand Down Expand Up @@ -206,6 +263,9 @@ private void stopPolling() {
}

public void cleanup() {
if (sseManager != null) {
sseManager.close();
}
stopPolling();
}
}
Loading
Loading