Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public final class Config {
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
static final int DEFAULT_OFFLINE_POLL_MS = 5000;
static final long DEFAULT_KEEP_ALIVE = 0;
static final String DEFAULT_REINITIALIZE_ON_ERROR = "false";

static final String RESOLVER_ENV_VAR = "FLAGD_RESOLVER";
static final String HOST_ENV_VAR_NAME = "FLAGD_HOST";
Expand Down Expand Up @@ -51,6 +52,7 @@ public final class Config {
static final String KEEP_ALIVE_MS_ENV_VAR_NAME = "FLAGD_KEEP_ALIVE_TIME_MS";
static final String TARGET_URI_ENV_VAR_NAME = "FLAGD_TARGET_URI";
static final String STREAM_RETRY_GRACE_PERIOD = "FLAGD_RETRY_GRACE_PERIOD";
static final String REINITIALIZE_ON_ERROR_ENV_VAR_NAME = "FLAGD_REINITIALIZE_ON_ERROR";

static final String RESOLVER_RPC = "rpc";
static final String RESOLVER_IN_PROCESS = "in-process";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ public class FlagdOptions {
@Builder.Default
private String defaultAuthority = fallBackToEnvOrDefault(Config.DEFAULT_AUTHORITY_ENV_VAR_NAME, null);

/**
* !EXPERIMENTAL!
* Whether to reinitialize the channel (TCP connection) after the grace period is exceeded.
* This can help recover from connection issues by creating fresh connections.
* Particularly useful for troubleshooting network issues related to proxies or service meshes.
*/
@Builder.Default
private boolean reinitializeOnError = Boolean.parseBoolean(
fallBackToEnvOrDefault(Config.REINITIALIZE_ON_ERROR_ENV_VAR_NAME, Config.DEFAULT_REINITIALIZE_ON_ERROR));

/**
* Builder overwrite in order to customize the "build" method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class InProcessResolver implements Resolver {
private final Consumer<FlagdProviderEvent> onConnectionEvent;
private final Operator operator;
private final String scope;
private final QueueSource queueSource;

/**
* Resolves flag values using
Expand All @@ -52,7 +53,8 @@ public class InProcessResolver implements Resolver {
* connection/stream
*/
public InProcessResolver(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
this.flagStore = new FlagStore(getConnector(options, onConnectionEvent));
this.queueSource = getQueueSource(options, onConnectionEvent);
this.flagStore = new FlagStore(queueSource);
this.onConnectionEvent = onConnectionEvent;
this.operator = new Operator();
this.scope = options.getSelector();
Expand Down Expand Up @@ -94,6 +96,19 @@ public void init() throws Exception {
stateWatcher.start();
}

/**
 * Hook invoked once the provider has entered the error state, i.e. after the grace period.
 *
 * <p>When the underlying queue source is a {@link SyncStreamQueueSource}, this forwards to
 * {@link SyncStreamQueueSource#reinitializeChannelComponents()}. That method decides on its own
 * whether anything is actually rebuilt (it returns early when the option is disabled or the
 * source has been shut down). For any other kind of queue source this is a no-op.
 */
@Override
public void onError() {
    if (!(queueSource instanceof SyncStreamQueueSource)) {
        // nothing to reinitialize for other queue source implementations
        return;
    }
    // unconditional delegation: the enabled/disabled check lives in the queue source itself
    ((SyncStreamQueueSource) queueSource).reinitializeChannelComponents();
}

/**
* Shutdown in-process resolver.
*
Expand Down Expand Up @@ -147,7 +162,7 @@ public ProviderEvaluation<Value> objectEvaluation(String key, Value defaultValue
.build();
}

static QueueSource getConnector(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
static QueueSource getQueueSource(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
if (options.getCustomConnector() != null) {
return options.getCustomConnector();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
import lombok.extern.slf4j.Slf4j;

/**
* Implements the {@link QueueSource} contract and emit flags obtained from flagd sync gRPC contract.
* Implements the {@link QueueSource} contract and emit flags obtained from
* flagd sync gRPC contract.
*/
@Slf4j
@SuppressFBWarnings(
value = {"EI_EXPOSE_REP"},
justification = "Random is used to generate a variation & flag configurations require exposing")
justification = "We need to expose the BlockingQueue to allow consumers to read from it")
public class SyncStreamQueueSource implements QueueSource {
private static final int QUEUE_SIZE = 5;

Expand All @@ -45,13 +46,32 @@ public class SyncStreamQueueSource implements QueueSource {
private final String selector;
private final String providerId;
private final boolean syncMetadataDisabled;
private final ChannelConnector channelConnector;
private final boolean reinitializeOnError;
private final FlagdOptions options;
private final Consumer<FlagdProviderEvent> onConnectionEvent;
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final FlagSyncServiceStub flagSyncStub;
private final FlagSyncServiceBlockingStub metadataStub;
private volatile GrpcComponents grpcComponents;

/**
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
* Container for gRPC components to ensure atomicity during reinitialization.
* All three components are updated together to prevent consumers from seeing
* an inconsistent state where components are from different channel instances.
*/
private static class GrpcComponents {
    /** Connector that is initialized and shut down on behalf of this bundle. */
    final ChannelConnector channelConnector;

    /** Asynchronous stub used for the flag sync stream. */
    final FlagSyncServiceStub flagSyncStub;

    /** Blocking stub used for the metadata call. */
    final FlagSyncServiceBlockingStub metadataStub;

    /**
     * Bundles the given connector and stubs into one immutable holder.
     *
     * @param channelConnector connector to store
     * @param flagSyncStub asynchronous sync-service stub to store
     * @param metadataStub blocking sync-service stub to store
     */
    GrpcComponents(
            ChannelConnector channelConnector,
            FlagSyncServiceStub flagSyncStub,
            FlagSyncServiceBlockingStub metadataStub) {
        this.channelConnector = channelConnector;
        this.flagSyncStub = flagSyncStub;
        this.metadataStub = metadataStub;
    }
}

/**
* Creates a new SyncStreamQueueSource responsible for observing the event
* stream.
*/
public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
streamDeadline = options.getStreamDeadlineMs();
Expand All @@ -60,11 +80,10 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
providerId = options.getProviderId();
maxBackoffMs = options.getRetryBackoffMaxMs();
syncMetadataDisabled = options.isSyncMetadataDisabled();
channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
flagSyncStub =
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
.withWaitForReady();
reinitializeOnError = options.isReinitializeOnError();
this.options = options;
this.onConnectionEvent = onConnectionEvent;
initializeChannelComponents();
}

// internal use only
Expand All @@ -77,16 +96,57 @@ protected SyncStreamQueueSource(
deadline = options.getDeadline();
selector = options.getSelector();
providerId = options.getProviderId();
channelConnector = connectorMock;
maxBackoffMs = options.getRetryBackoffMaxMs();
flagSyncStub = stubMock;
syncMetadataDisabled = options.isSyncMetadataDisabled();
metadataStub = blockingStubMock;
reinitializeOnError = options.isReinitializeOnError();
this.options = options;
this.onConnectionEvent = e -> {};
this.grpcComponents = new GrpcComponents(connectorMock, stubMock, blockingStubMock);
}

/**
 * Creates a new channel connector together with the async and blocking sync-service stubs bound
 * to that connector's channel, and publishes all three as one {@link GrpcComponents} instance.
 */
private synchronized void initializeChannelComponents() {
    final ChannelConnector connector =
            new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));

    // One write to the volatile field: a reader picks up either the previous bundle or this
    // one, never a connector from one channel paired with stubs from another.
    grpcComponents = new GrpcComponents(
            connector,
            FlagSyncServiceGrpc.newStub(connector.getChannel()).withWaitForReady(),
            FlagSyncServiceGrpc.newBlockingStub(connector.getChannel()).withWaitForReady());
}

/** Reinitialize channel connector and stubs on error. */
public synchronized void reinitializeChannelComponents() {
if (!reinitializeOnError || shutdown.get()) {
return;
}

log.info("Reinitializing channel gRPC components in attempt to restore stream.");
GrpcComponents oldComponents = grpcComponents;

try {
// create new channel components first
initializeChannelComponents();
Copy link
Member

@guidobrei guidobrei Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to call grpcComponents.channelConnector.initialize(); after reinitializing the GrpcComponents object, like we do in the init() method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good eye... but... I think I discovered something earlier about this that I lost track of and forgot to mention:

I think the initialize method in the ChannelConnector is actually very well-hidden dead code. I noticed earlier that it basically ONLY sets up a channel monitor that emits events for channel disconnections, but we already will get such events from the stream handler itself.

I think this is why my manual testing had no issue not including the call you suggest above, and same with our e2e test suite. In fact, if I completely comment out all the functionality of the ChannelConnector.initialize method, our entire e2e suite runs fine, including the assertion that we get disconnect events and other stream events.

Unless I'm missing something which we have no tests for, I think this method can be deleted.

cc @aepfli do you know anything about this?

} catch (Exception e) {
log.error("Failed to reinitialize channel components", e);
return;
}

// shutdown old connector after successful reinitialization
if (oldComponents != null && oldComponents.channelConnector != null) {
try {
oldComponents.channelConnector.shutdown();
} catch (Exception e) {
log.debug("Error shutting down old channel connector during reinitialization", e);
}
}
}

/** Initialize sync stream connector. */
public void init() throws Exception {
channelConnector.initialize();
grpcComponents.channelConnector.initialize();
Thread listener = new Thread(this::observeSyncStream);
listener.setDaemon(true);
listener.start();
Expand All @@ -109,7 +169,7 @@ public void shutdown() throws InterruptedException {
log.debug("Shutdown already in progress or completed");
return;
}
this.channelConnector.shutdown();
grpcComponents.channelConnector.shutdown();
}

/** Contains blocking calls, to be used concurrently. */
Expand Down Expand Up @@ -159,13 +219,14 @@ private void observeSyncStream() {
log.info("Shutdown invoked, exiting event stream listener");
}

// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
// TODO: remove the metadata call entirely after
// https://github.com/open-feature/flagd/issues/1584
private Struct getMetadata() {
if (syncMetadataDisabled) {
return null;
}

FlagSyncServiceBlockingStub localStub = metadataStub;
FlagSyncServiceBlockingStub localStub = grpcComponents.metadataStub;

if (deadline > 0) {
localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
Expand All @@ -180,7 +241,8 @@ private Struct getMetadata() {

return null;
} catch (StatusRuntimeException e) {
// In newer versions of flagd, metadata is part of the sync stream. If the method is unimplemented, we
// In newer versions of flagd, metadata is part of the sync stream. If the
// method is unimplemented, we
// can ignore the error
if (e.getStatus() != null
&& Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) {
Expand All @@ -192,7 +254,7 @@ private Struct getMetadata() {
}

private void syncFlags(SyncStreamObserver streamObserver) {
FlagSyncServiceStub localStub = flagSyncStub; // don't mutate the stub
FlagSyncServiceStub localStub = grpcComponents.flagSyncStub; // don't mutate the stub
if (streamDeadline > 0) {
localStub = localStub.withDeadlineAfter(streamDeadline, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import dev.openfeature.contrib.providers.flagd.Config;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
Expand Down Expand Up @@ -51,6 +54,26 @@
import org.junit.jupiter.api.Test;

class InProcessResolverTest {
/**
 * Verifies that {@code InProcessResolver.onError()} forwards to the queue source whenever that
 * source is a {@link SyncStreamQueueSource}.
 *
 * <p>The mock is handed in through the custom-connector option, which the resolver's
 * queue-source factory returns as-is. That avoids constructing a real gRPC-backed source (whose
 * channel the test would never shut down) and avoids overwriting a {@code private final} field
 * through reflection.
 */
@Test
void onError_delegatesToQueueSource() {
    // given: the remaining option values don't matter here
    SyncStreamQueueSource mockConnector = mock(SyncStreamQueueSource.class);
    FlagdOptions options =
            FlagdOptions.builder().customConnector(mockConnector).build();
    InProcessResolver resolver = new InProcessResolver(options, e -> {});

    // when
    resolver.onError();

    // then
    // InProcessResolver should always delegate to the queue source.
    // The decision to re-initialize or not is handled within SyncStreamQueueSource.
    verify(mockConnector, times(1)).reinitializeChannelComponents();
}

@Test
public void connectorSetup() {
Expand All @@ -70,9 +93,9 @@ public void connectorSetup() {
.build();

// then
assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getConnector(forGrpcOptions, e -> {}));
assertInstanceOf(FileQueueSource.class, InProcessResolver.getConnector(forOfflineOptions, e -> {}));
assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions, e -> {}));
assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getQueueSource(forGrpcOptions, e -> {}));
assertInstanceOf(FileQueueSource.class, InProcessResolver.getQueueSource(forOfflineOptions, e -> {}));
assertInstanceOf(MockConnector.class, InProcessResolver.getQueueSource(forCustomConnectorOptions, e -> {}));
}

@Test
Expand Down
Loading
Loading