diff --git a/fluxgate-samples/fluxgate-sample-standalone/elk-docker-compose.yml b/docker/elk.yml
similarity index 100%
rename from fluxgate-samples/fluxgate-sample-standalone/elk-docker-compose.yml
rename to docker/elk.yml
diff --git a/docker/full.yml b/docker/full.yml
new file mode 100644
index 0000000..d71696d
--- /dev/null
+++ b/docker/full.yml
@@ -0,0 +1,96 @@
+version: "3.8"
+
+# Full FluxGate Development Environment
+# Includes: MongoDB, Redis (standalone), ELK Stack
+#
+# Usage:
+# docker-compose -f docker/full.yml up -d
+# docker-compose -f docker/full.yml down
+#
+# Services:
+# - MongoDB: localhost:27017 (user: fluxgate, password: fluxgate123)
+# - Redis: localhost:6379
+# - Elasticsearch: localhost:9200
+# - Kibana: localhost:5601
+# - Logstash: localhost:5044 (TCP input), localhost:9600 (monitoring)
+
+services:
+ # ============================================
+ # MongoDB - Rule Storage
+ # ============================================
+ mongo:
+ image: mongo:7.0
+ container_name: fluxgate-mongo
+ ports:
+ - "27017:27017"
+ environment:
+ MONGO_INITDB_ROOT_USERNAME: fluxgate
+ MONGO_INITDB_ROOT_PASSWORD: fluxgate123
+ MONGO_INITDB_DATABASE: fluxgate
+ volumes:
+ - mongo-data:/data/db
+
+ # ============================================
+ # Redis Standalone - Rate Limiting
+ # ============================================
+ redis:
+ image: redis:7.2-alpine
+ container_name: fluxgate-redis
+ ports:
+ - "6379:6379"
+
+ # ============================================
+ # ELK Stack - Logging & Monitoring
+ # ============================================
+ elasticsearch:
+ image: docker.elastic.co/elasticsearch/elasticsearch:8.12.2
+ container_name: elk-elasticsearch
+ environment:
+ - discovery.type=single-node
+ - xpack.security.enabled=false
+ - ES_JAVA_OPTS=-Xms1g -Xmx1g
+ ports:
+ - "9200:9200"
+ volumes:
+ - es-data:/usr/share/elasticsearch/data
+
+ kibana:
+ image: docker.elastic.co/kibana/kibana:8.12.2
+ container_name: elk-kibana
+ depends_on:
+ - elasticsearch
+ environment:
+ - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
+ ports:
+ - "5601:5601"
+
+ logstash:
+ image: docker.elastic.co/logstash/logstash:8.12.2
+ container_name: elk-logstash
+ depends_on:
+ - elasticsearch
+ ports:
+ - "5044:5044" # TCP input
+ - "9600:9600" # Logstash monitoring API
+ environment:
+ - XPACK_MONITORING_ENABLED=false
+ - PIPELINE_WORKERS=1
+ command: >
+ logstash -e '
+ input {
+ tcp {
+ port => 5044
+ codec => json_lines
+ }
+ }
+ output {
+ elasticsearch {
+ hosts => ["http://elasticsearch:9200"]
+ index => "fluxgate-logs-%{+YYYY.MM.dd}"
+ }
+ }
+ '
+
+volumes:
+ mongo-data:
+ es-data:
diff --git a/docker/mongo.yml b/docker/mongo.yml
new file mode 100644
index 0000000..5e2f94e
--- /dev/null
+++ b/docker/mongo.yml
@@ -0,0 +1,17 @@
+version: "3.8"
+
+services:
+ mongo:
+ image: mongo:7.0
+ container_name: fluxgate-mongo
+ ports:
+ - "27017:27017"
+ environment:
+ MONGO_INITDB_ROOT_USERNAME: fluxgate
+ MONGO_INITDB_ROOT_PASSWORD: fluxgate123
+ MONGO_INITDB_DATABASE: fluxgate
+ volumes:
+ - mongo-data:/data/db
+
+volumes:
+ mongo-data:
diff --git a/docker/redis-cluster.yml b/docker/redis-cluster.yml
new file mode 100644
index 0000000..453e07b
--- /dev/null
+++ b/docker/redis-cluster.yml
@@ -0,0 +1,11 @@
+version: "3.8"
+
+services:
+ redis-cluster:
+ image: grokzen/redis-cluster:7.0.10
+ container_name: redis-cluster-test
+ environment:
+ IP: 0.0.0.0
+ INITIAL_PORT: 7100
+ ports:
+ - "7100-7105:7100-7105"
diff --git a/docker/redis-standalone.yml b/docker/redis-standalone.yml
new file mode 100644
index 0000000..6557be6
--- /dev/null
+++ b/docker/redis-standalone.yml
@@ -0,0 +1,8 @@
+version: "3.8"
+
+services:
+ redis:
+ image: redis:7.2-alpine
+ container_name: fluxgate-redis
+ ports:
+ - "6379:6379"
diff --git a/fluxgate-control-support/pom.xml b/fluxgate-control-support/pom.xml
new file mode 100644
index 0000000..f1e1f15
--- /dev/null
+++ b/fluxgate-control-support/pom.xml
@@ -0,0 +1,135 @@
+
+
When applied to a method, the {@link RuleChangeAspect} will automatically call {@link + * org.fluxgate.control.notify.RuleChangeNotifier#notifyFullReload()} after the method completes + * successfully. + * + *
Use this annotation for operations that affect multiple rules or when the specific rule set ID + * is not available. + * + *
Example usage: + * + *
{@code
+ * @Service
+ * public class RuleManagementService {
+ *
+ * @NotifyFullReload
+ * public void deleteAllRules() {
+ * mongoRepository.deleteAll();
+ * // Full reload notification is sent automatically
+ * }
+ *
+ * @NotifyFullReload
+ * public void importRules(List rules) {
+ * mongoRepository.deleteAll();
+ * mongoRepository.saveAll(rules);
+ * }
+ *
+ * @NotifyFullReload
+ * public void resetToDefaults() {
+ * mongoRepository.deleteAll();
+ * mongoRepository.saveAll(defaultRules);
+ * }
+ * }
+ * }
+ *
+ * @see NotifyRuleChange
+ * @see RuleChangeAspect
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface NotifyFullReload {}
diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java
new file mode 100644
index 0000000..92d38d8
--- /dev/null
+++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java
@@ -0,0 +1,75 @@
+package org.fluxgate.control.aop;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to automatically notify FluxGate instances when a rule is changed.
+ *
+ * When applied to a method, the {@link RuleChangeAspect} will automatically call {@link + * org.fluxgate.control.notify.RuleChangeNotifier#notifyChange(String)} after the method completes + * successfully. + * + *
The {@code ruleSetId} attribute supports Spring Expression Language (SpEL) to extract the rule + * set ID from method parameters. + * + *
Example usage: + * + *
{@code
+ * @Service
+ * public class RuleManagementService {
+ *
+ * @NotifyRuleChange(ruleSetId = "#ruleSetId")
+ * public void updateRule(String ruleSetId, RuleDto dto) {
+ * mongoRepository.save(dto);
+ * // Notification is sent automatically after this method returns
+ * }
+ *
+ * @NotifyRuleChange(ruleSetId = "#dto.ruleSetId")
+ * public void saveRule(RuleDto dto) {
+ * mongoRepository.save(dto);
+ * }
+ *
+ * @NotifyRuleChange(ruleSetId = "#result.id")
+ * public Rule createRule(RuleDto dto) {
+ * return mongoRepository.save(dto);
+ * // Uses the returned object's id
+ * }
+ * }
+ * }
+ *
+ * SpEL expressions can reference: + * + *
Examples: + * + *
This aspect intercepts methods annotated with rule change annotations and automatically + * notifies FluxGate instances after successful method execution. + * + *
For {@link NotifyRuleChange}, it evaluates the SpEL expression to extract the rule set ID from + * method parameters or return value. + * + *
Example: + * + *
{@code
+ * @NotifyRuleChange(ruleSetId = "#ruleSetId")
+ * public void updateRule(String ruleSetId, RuleDto dto) {
+ * // After this method returns successfully,
+ * // notifier.notifyChange(ruleSetId) is called automatically
+ * }
+ * }
+ */
+@Aspect
+public class RuleChangeAspect {
+
+ private static final Logger log = LoggerFactory.getLogger(RuleChangeAspect.class);
+
+ private final RuleChangeNotifier notifier;
+ private final ExpressionParser parser = new SpelExpressionParser();
+ private final ParameterNameDiscoverer parameterNameDiscoverer =
+ new DefaultParameterNameDiscoverer();
+
+ public RuleChangeAspect(RuleChangeNotifier notifier) {
+ this.notifier = notifier;
+ log.info("RuleChangeAspect initialized");
+ }
+
+ /**
+ * Handles methods annotated with {@link NotifyRuleChange}.
+ *
+ * After the method returns successfully, extracts the rule set ID using the SpEL expression + * and notifies all FluxGate instances. + * + * @param joinPoint the join point + * @param annotation the annotation + * @param result the return value of the method + */ + @AfterReturning( + pointcut = "@annotation(annotation)", + returning = "result", + argNames = "joinPoint,annotation,result") + public void afterRuleChange(JoinPoint joinPoint, NotifyRuleChange annotation, Object result) { + try { + String ruleSetId = extractRuleSetId(joinPoint, annotation.ruleSetId(), result); + + if (ruleSetId == null || ruleSetId.isBlank()) { + log.warn( + "Could not extract ruleSetId from expression '{}' in method {}", + annotation.ruleSetId(), + joinPoint.getSignature().getName()); + return; + } + + log.debug( + "Notifying rule change for ruleSetId={} from method {}", + ruleSetId, + joinPoint.getSignature().getName()); + + notifier.notifyChange(ruleSetId); + + } catch (Exception e) { + log.error( + "Failed to notify rule change for method {}: {}", + joinPoint.getSignature().getName(), + e.getMessage(), + e); + // Don't rethrow - notification failure should not fail the business operation + } + } + + /** + * Handles methods annotated with {@link NotifyFullReload}. + * + *
After the method returns successfully, notifies all FluxGate instances to perform a full + * reload. + * + * @param joinPoint the join point + * @param annotation the annotation + */ + @AfterReturning(pointcut = "@annotation(annotation)", argNames = "joinPoint,annotation") + public void afterFullReload(JoinPoint joinPoint, NotifyFullReload annotation) { + try { + log.debug("Notifying full reload from method {}", joinPoint.getSignature().getName()); + + notifier.notifyFullReload(); + + } catch (Exception e) { + log.error( + "Failed to notify full reload for method {}: {}", + joinPoint.getSignature().getName(), + e.getMessage(), + e); + // Don't rethrow - notification failure should not fail the business operation + } + } + + /** + * Extracts the rule set ID from the SpEL expression. + * + * @param joinPoint the join point + * @param expression the SpEL expression + * @param result the return value of the method + * @return the extracted rule set ID, or null if extraction fails + */ + private String extractRuleSetId(JoinPoint joinPoint, String expression, Object result) { + MethodSignature signature = (MethodSignature) joinPoint.getSignature(); + Method method = signature.getMethod(); + Object target = joinPoint.getTarget(); + Object[] args = joinPoint.getArgs(); + + EvaluationContext context = + new MethodBasedEvaluationContext(target, method, args, parameterNameDiscoverer); + + // Add result to context for expressions like #result.id + context.setVariable("result", result); + + Object value = parser.parseExpression(expression).getValue(context); + + return value != null ? value.toString() : null; + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java new file mode 100644 index 0000000..43b2eb8 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java @@ -0,0 +1,107 @@ +package org.fluxgate.control.autoconfigure; + +import org.fluxgate.control.aop.RuleChangeAspect; +import org.fluxgate.control.notify.RedisRuleChangeNotifier; +import org.fluxgate.control.notify.RuleChangeNotifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.EnableAspectJAutoProxy; + +/** + * Auto-configuration for FluxGate Control Support. + * + *
Automatically configures: + * + *
Example usage in application.yml: + * + *
+ * fluxgate: + * control: + * redis: + * uri: redis://localhost:6379 + * channel: fluxgate:rule-reload + *+ * + *
Example usage with annotations: + * + *
{@code
+ * @Service
+ * public class RuleManagementService {
+ *
+ * @NotifyRuleChange(ruleSetId = "#ruleSetId")
+ * public void updateRule(String ruleSetId, RuleDto dto) {
+ * mongoRepository.save(dto);
+ * }
+ *
+ * @NotifyFullReload
+ * public void deleteAllRules() {
+ * mongoRepository.deleteAll();
+ * }
+ * }
+ * }
+ */
+@AutoConfiguration
+@ConditionalOnClass(name = "io.lettuce.core.RedisClient")
+@ConditionalOnProperty(prefix = "fluxgate.control.redis", name = "uri")
+@EnableConfigurationProperties(ControlSupportProperties.class)
+@EnableAspectJAutoProxy
+public class ControlSupportAutoConfiguration {
+
+ private static final Logger log = LoggerFactory.getLogger(ControlSupportAutoConfiguration.class);
+
+ /**
+ * Creates the Redis-based rule change notifier.
+ *
+ * @param properties the configuration properties
+ * @return the notifier instance
+ */
+ @Bean
+ @ConditionalOnMissingBean(RuleChangeNotifier.class)
+ public RuleChangeNotifier ruleChangeNotifier(ControlSupportProperties properties) {
+ ControlSupportProperties.RedisProperties redis = properties.getRedis();
+
+ log.info(
+ "Creating RedisRuleChangeNotifier: uri={}, channel={}, source={}",
+ redis.getUri(),
+ redis.getChannel(),
+ properties.getSource());
+
+ return new RedisRuleChangeNotifier(
+ redis.getUri(), redis.getChannel(), redis.getTimeout(), properties.getSource());
+ }
+
+ /**
+ * Creates the AOP aspect for rule change annotations.
+ *
+ * Only created when: + * + *
Example configuration: + * + *
+ * fluxgate: + * control: + * redis: + * uri: redis://localhost:6379 + * channel: fluxgate:rule-reload + * timeout: 5s + * source: my-admin-app + *+ */ +@ConfigurationProperties(prefix = "fluxgate.control") +public class ControlSupportProperties { + + private final RedisProperties redis = new RedisProperties(); + + /** Source identifier for notifications (appears in messages). */ + private String source = "fluxgate-control"; + + public RedisProperties getRedis() { + return redis; + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + /** Redis configuration for rule change notifications. */ + public static class RedisProperties { + + /** Redis URI (e.g., "redis://localhost:6379"). For cluster, use comma-separated URIs. */ + private String uri = "redis://localhost:6379"; + + /** Pub/Sub channel name for rule change notifications. */ + private String channel = "fluxgate:rule-reload"; + + /** Connection timeout. */ + private Duration timeout = Duration.ofSeconds(5); + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public String getChannel() { + return channel; + } + + public void setChannel(String channel) { + this.channel = channel; + } + + public Duration getTimeout() { + return timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java new file mode 100644 index 0000000..d430b3e --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java @@ -0,0 +1,212 @@ +package org.fluxgate.control.notify; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Redis Pub/Sub implementation of {@link RuleChangeNotifier}. + * + *
Publishes rule change notifications to a Redis channel. All FluxGate instances subscribed to + * this channel will receive the notification and invalidate their local caches. + * + *
Supports both standalone Redis and Redis Cluster configurations. + * + *
Example usage: + * + *
{@code
+ * // Standalone Redis
+ * RuleChangeNotifier notifier = new RedisRuleChangeNotifier(
+ * "redis://localhost:6379",
+ * "fluxgate:rule-reload"
+ * );
+ *
+ * // Notify specific rule change
+ * notifier.notifyChange("my-rule-set-id");
+ *
+ * // Notify full reload
+ * notifier.notifyFullReload();
+ *
+ * // Cleanup
+ * notifier.close();
+ * }
+ */
+public class RedisRuleChangeNotifier implements RuleChangeNotifier {
+
+ private static final Logger log = LoggerFactory.getLogger(RedisRuleChangeNotifier.class);
+ private static final String DEFAULT_SOURCE = "fluxgate-control";
+
+ private final String redisUri;
+ private final String channel;
+ private final Duration timeout;
+ private final String source;
+ private final ObjectMapper objectMapper;
+ private final boolean isCluster;
+
+ private volatile RedisClient redisClient;
+ private volatile RedisClusterClient redisClusterClient;
+ private volatile StatefulRedisConnectionThis class is serialized to JSON and published via Redis Pub/Sub. FluxGate instances + * subscribed to the channel will deserialize this message and take appropriate action. + */ +public class RuleChangeMessage { + + private final String ruleSetId; + private final boolean fullReload; + private final long timestamp; + private final String source; + + @JsonCreator + public RuleChangeMessage( + @JsonProperty("ruleSetId") String ruleSetId, + @JsonProperty("fullReload") boolean fullReload, + @JsonProperty("timestamp") long timestamp, + @JsonProperty("source") String source) { + this.ruleSetId = ruleSetId; + this.fullReload = fullReload; + this.timestamp = timestamp; + this.source = source; + } + + /** + * Creates a message for a specific rule set change. + * + * @param ruleSetId the ID of the changed rule set + * @param source identifier of the source application (e.g., "fluxgate-control") + * @return the change message + */ + public static RuleChangeMessage forRuleSet(String ruleSetId, String source) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + return new RuleChangeMessage(ruleSetId, false, Instant.now().toEpochMilli(), source); + } + + /** + * Creates a message for a full reload of all rules. + * + * @param source identifier of the source application (e.g., "fluxgate-control") + * @return the full reload message + */ + public static RuleChangeMessage fullReload(String source) { + return new RuleChangeMessage(null, true, Instant.now().toEpochMilli(), source); + } + + /** Returns the rule set ID, or null if this is a full reload. */ + public String getRuleSetId() { + return ruleSetId; + } + + /** Returns true if this is a full reload request. */ + public boolean isFullReload() { + return fullReload; + } + + /** Returns the timestamp when this message was created (epoch millis). */ + public long getTimestamp() { + return timestamp; + } + + /** Returns the source application identifier. */ + public String getSource() { + return source; + } + + @Override + public String toString() { + if (fullReload) { + return "RuleChangeMessage{fullReload=true, source='" + + source + + "', timestamp=" + + timestamp + + "}"; + } + return "RuleChangeMessage{ruleSetId='" + + ruleSetId + + "', source='" + + source + + "', timestamp=" + + timestamp + + "}"; + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java new file mode 100644 index 0000000..9d45587 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java @@ -0,0 +1,13 @@ +package org.fluxgate.control.notify; + +/** Exception thrown when a rule change notification fails to be published. */ +public class RuleChangeNotificationException extends RuntimeException { + + public RuleChangeNotificationException(String message) { + super(message); + } + + public RuleChangeNotificationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java new file mode 100644 index 0000000..fb11025 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java @@ -0,0 +1,57 @@ +package org.fluxgate.control.notify; + +/** + * Interface for notifying FluxGate application servers about rule changes. + * + *
When rules are modified in the Admin/Studio application, this notifier broadcasts the change + * to all FluxGate instances so they can invalidate their local caches and reload the updated rules. + * + *
Example usage: + * + *
{@code
+ * @Service
+ * public class RuleManagementService {
+ * private final RuleChangeNotifier notifier;
+ *
+ * public void updateRule(String ruleSetId, RuleDto dto) {
+ * // 1. Save to database
+ * mongoRepository.save(dto);
+ *
+ * // 2. Notify all FluxGate instances
+ * notifier.notifyChange(ruleSetId);
+ * }
+ *
+ * public void deleteAllRules() {
+ * mongoRepository.deleteAll();
+ * notifier.notifyFullReload();
+ * }
+ * }
+ * }
+ */
+public interface RuleChangeNotifier {
+
+ /**
+ * Notifies all FluxGate instances that a specific rule set has changed.
+ *
+ * The instances will invalidate their local cache for this rule set and reload it from the + * database on the next request. + * + * @param ruleSetId the ID of the changed rule set + */ + void notifyChange(String ruleSetId); + + /** + * Notifies all FluxGate instances to perform a full reload of all rules. + * + *
Use this when multiple rules have changed or when performing bulk operations. All instances + * will invalidate their entire rule cache. + */ + void notifyFullReload(); + + /** + * Closes the notifier and releases any resources. + * + *
After calling this method, the notifier should not be used. + */ + void close(); +} diff --git a/fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..affbfb4 --- /dev/null +++ b/fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.fluxgate.control.autoconfigure.ControlSupportAutoConfiguration diff --git a/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java new file mode 100644 index 0000000..02b1afb --- /dev/null +++ b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java @@ -0,0 +1,101 @@ +package org.fluxgate.control.notify; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Duration; +import org.junit.jupiter.api.Test; + +class RedisRuleChangeNotifierTest { + + @Test + void shouldRequireRedisUri() { + assertThatThrownBy(() -> new RedisRuleChangeNotifier(null, "channel")) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("redisUri"); + } + + @Test + void shouldRequireChannel() { + assertThatThrownBy(() -> new RedisRuleChangeNotifier("redis://localhost:6379", null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("channel"); + } + + @Test + void shouldRequireTimeout() { + assertThatThrownBy( + () -> new RedisRuleChangeNotifier("redis://localhost:6379", "channel", null, "source")) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("timeout"); + } + + @Test + void shouldRequireSource() { + assertThatThrownBy( + () -> + new RedisRuleChangeNotifier( + "redis://localhost:6379", "channel", Duration.ofSeconds(5), null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("source"); + } + + @Test + void shouldDetectClusterMode() { + // Cluster mode is detected by comma-separated URIs + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier( + "redis://node1:6379,redis://node2:6379", "channel", Duration.ofSeconds(5), "source"); + + // Should not throw - cluster mode detected + assertThat(notifier).isNotNull(); + notifier.close(); + } + + @Test + void shouldThrowWhenClosedAndNotify() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + notifier.close(); + + assertThatThrownBy(() -> notifier.notifyChange("rule-id")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("closed"); + } + + @Test + void shouldThrowWhenClosedAndNotifyFullReload() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + notifier.close(); + + assertThatThrownBy(() -> notifier.notifyFullReload()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("closed"); + } + + @Test + void shouldRequireRuleSetIdForNotifyChange() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + + try { + assertThatThrownBy(() -> notifier.notifyChange(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("ruleSetId"); + } finally { + notifier.close(); + } + } + + @Test + void shouldAllowMultipleClose() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + + // Should not throw + notifier.close(); + notifier.close(); + notifier.close(); + } +} diff --git a/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java new file mode 100644 index 0000000..4808084 --- /dev/null +++ b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java @@ -0,0 +1,82 @@ +package org.fluxgate.control.notify; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +class RuleChangeMessageTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + void shouldCreateMessageForRuleSet() { + RuleChangeMessage message = RuleChangeMessage.forRuleSet("test-rule", "test-source"); + + assertThat(message.getRuleSetId()).isEqualTo("test-rule"); + assertThat(message.isFullReload()).isFalse(); + assertThat(message.getSource()).isEqualTo("test-source"); + assertThat(message.getTimestamp()).isPositive(); + } + + @Test + void shouldCreateFullReloadMessage() { + RuleChangeMessage message = RuleChangeMessage.fullReload("test-source"); + + assertThat(message.getRuleSetId()).isNull(); + assertThat(message.isFullReload()).isTrue(); + assertThat(message.getSource()).isEqualTo("test-source"); + } + + @Test + void shouldThrowOnNullRuleSetId() { + assertThatThrownBy(() -> RuleChangeMessage.forRuleSet(null, "source")) + .isInstanceOf(NullPointerException.class); + } + + @Test + void shouldSerializeAndDeserialize() throws Exception { + RuleChangeMessage original = RuleChangeMessage.forRuleSet("my-rule", "studio"); + + String json = objectMapper.writeValueAsString(original); + RuleChangeMessage deserialized = objectMapper.readValue(json, RuleChangeMessage.class); + + assertThat(deserialized.getRuleSetId()).isEqualTo(original.getRuleSetId()); + assertThat(deserialized.isFullReload()).isEqualTo(original.isFullReload()); + assertThat(deserialized.getSource()).isEqualTo(original.getSource()); + assertThat(deserialized.getTimestamp()).isEqualTo(original.getTimestamp()); + } + + @Test + void shouldSerializeFullReloadMessage() throws Exception { + RuleChangeMessage original = RuleChangeMessage.fullReload("admin"); + + String json = objectMapper.writeValueAsString(original); + RuleChangeMessage deserialized = objectMapper.readValue(json, RuleChangeMessage.class); + + assertThat(deserialized.getRuleSetId()).isNull(); + assertThat(deserialized.isFullReload()).isTrue(); + assertThat(deserialized.getSource()).isEqualTo("admin"); + } + + @Test + void shouldHaveToStringForRuleSet() { + RuleChangeMessage message = RuleChangeMessage.forRuleSet("test-rule", "source"); + + String str = message.toString(); + + assertThat(str).contains("test-rule"); + assertThat(str).contains("source"); + } + + @Test + void shouldHaveToStringForFullReload() { + RuleChangeMessage message = RuleChangeMessage.fullReload("source"); + + String str = message.toString(); + + assertThat(str).contains("fullReload=true"); + assertThat(str).contains("source"); + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateConfigurationException.java b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateConfigurationException.java new file mode 100644 index 0000000..53dea48 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateConfigurationException.java @@ -0,0 +1,35 @@ +package org.fluxgate.core.exception; + +/** + * Exception thrown when FluxGate configuration is invalid or incomplete. + * + *
This exception indicates a configuration problem that prevents FluxGate from starting or + * operating correctly. Configuration exceptions are not retryable as they require manual + * intervention to fix. + */ +public class FluxgateConfigurationException extends FluxgateException { + + /** + * Constructs a new FluxgateConfigurationException with the specified message. + * + * @param message the detail message + */ + public FluxgateConfigurationException(String message) { + super(message); + } + + /** + * Constructs a new FluxgateConfigurationException with the specified message and cause. + * + * @param message the detail message + * @param cause the cause of the exception + */ + public FluxgateConfigurationException(String message, Throwable cause) { + super(message, cause); + } + + @Override + public boolean isRetryable() { + return false; + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateConnectionException.java b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateConnectionException.java new file mode 100644 index 0000000..9b49f5c --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateConnectionException.java @@ -0,0 +1,34 @@ +package org.fluxgate.core.exception; + +/** + * Exception thrown when a connection to a backend service fails. + * + *
This is the base class for all connection-related exceptions in FluxGate. Connection + * exceptions are typically retryable as they may be caused by temporary network issues. + */ +public class FluxgateConnectionException extends FluxgateException { + + /** + * Constructs a new FluxgateConnectionException with the specified message. + * + * @param message the detail message + */ + public FluxgateConnectionException(String message) { + super(message); + } + + /** + * Constructs a new FluxgateConnectionException with the specified message and cause. + * + * @param message the detail message + * @param cause the cause of the exception + */ + public FluxgateConnectionException(String message, Throwable cause) { + super(message, cause); + } + + @Override + public boolean isRetryable() { + return true; + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateException.java b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateException.java new file mode 100644 index 0000000..44549e0 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateException.java @@ -0,0 +1,41 @@ +package org.fluxgate.core.exception; + +/** + * Base exception for all FluxGate exceptions. + * + *
This is the root of the FluxGate exception hierarchy. All FluxGate-specific exceptions should + * extend this class to allow for unified exception handling. + */ +public abstract class FluxgateException extends RuntimeException { + + /** + * Constructs a new FluxgateException with the specified message. + * + * @param message the detail message + */ + protected FluxgateException(String message) { + super(message); + } + + /** + * Constructs a new FluxgateException with the specified message and cause. + * + * @param message the detail message + * @param cause the cause of the exception + */ + protected FluxgateException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Returns whether this exception is retryable. + * + *
Subclasses can override this method to indicate whether the operation that caused this + * exception can be retried. + * + * @return true if the operation can be retried, false otherwise + */ + public boolean isRetryable() { + return false; + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateOperationException.java b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateOperationException.java new file mode 100644 index 0000000..8fbcc4d --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateOperationException.java @@ -0,0 +1,51 @@ +package org.fluxgate.core.exception; + +/** + * Exception thrown when a FluxGate operation fails at runtime. + * + *
This is the base class for all runtime operation exceptions in FluxGate. Operation exceptions + * may or may not be retryable depending on the specific failure cause. + */ +public class FluxgateOperationException extends FluxgateException { + + private final boolean retryable; + + /** + * Constructs a new FluxgateOperationException with the specified message. + * + * @param message the detail message + */ + public FluxgateOperationException(String message) { + super(message); + this.retryable = false; + } + + /** + * Constructs a new FluxgateOperationException with the specified message and cause. + * + * @param message the detail message + * @param cause the cause of the exception + */ + public FluxgateOperationException(String message, Throwable cause) { + super(message, cause); + this.retryable = false; + } + + /** + * Constructs a new FluxgateOperationException with the specified message, cause, and retryable + * flag. + * + * @param message the detail message + * @param cause the cause of the exception + * @param retryable whether this operation can be retried + */ + public FluxgateOperationException(String message, Throwable cause, boolean retryable) { + super(message, cause); + this.retryable = retryable; + } + + @Override + public boolean isRetryable() { + return retryable; + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateTimeoutException.java b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateTimeoutException.java new file mode 100644 index 0000000..7e31573 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/exception/FluxgateTimeoutException.java @@ -0,0 +1,94 @@ +package org.fluxgate.core.exception; + +import java.time.Duration; + +/** + * Exception thrown when an operation times out. + * + *
This exception is thrown when: + * + *
Timeout exceptions are typically retryable as they may be caused by temporary load spikes or + * network congestion. + */ +public class FluxgateTimeoutException extends FluxgateException { + + private final Duration timeout; + private final String operation; + + /** + * Constructs a new FluxgateTimeoutException with the specified message. + * + * @param message the detail message + */ + public FluxgateTimeoutException(String message) { + super(message); + this.timeout = null; + this.operation = null; + } + + /** + * Constructs a new FluxgateTimeoutException with the specified message and cause. + * + * @param message the detail message + * @param cause the cause of the exception + */ + public FluxgateTimeoutException(String message, Throwable cause) { + super(message, cause); + this.timeout = null; + this.operation = null; + } + + /** + * Constructs a new FluxgateTimeoutException with detailed timeout information. + * + * @param operation the operation that timed out + * @param timeout the timeout duration that was exceeded + */ + public FluxgateTimeoutException(String operation, Duration timeout) { + super("Operation '" + operation + "' timed out after " + timeout.toMillis() + "ms"); + this.operation = operation; + this.timeout = timeout; + } + + /** + * Constructs a new FluxgateTimeoutException with detailed timeout information and cause. + * + * @param operation the operation that timed out + * @param timeout the timeout duration that was exceeded + * @param cause the cause of the exception + */ + public FluxgateTimeoutException(String operation, Duration timeout, Throwable cause) { + super("Operation '" + operation + "' timed out after " + timeout.toMillis() + "ms", cause); + this.operation = operation; + this.timeout = timeout; + } + + /** + * Returns the timeout duration that was exceeded, if available. + * + * @return the timeout duration, or null if not available + */ + public Duration getTimeout() { + return timeout; + } + + /** + * Returns the name of the operation that timed out, if available. + * + * @return the operation name, or null if not available + */ + public String getOperation() { + return operation; + } + + @Override + public boolean isRetryable() { + return true; + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/exception/InvalidRuleConfigException.java b/fluxgate-core/src/main/java/org/fluxgate/core/exception/InvalidRuleConfigException.java new file mode 100644 index 0000000..b489080 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/exception/InvalidRuleConfigException.java @@ -0,0 +1,59 @@ +package org.fluxgate.core.exception; + +/** + * Exception thrown when a rate limit rule configuration is invalid. + * + *
This exception is thrown when: + * + *
This exception is thrown when: + * + *
This exception is thrown when: + * + *
This exception is thrown when: + * + *
This exception is thrown when: + * + *
This exception is thrown when: + * + *
When a rule is modified in the Admin UI, the cached rule definitions are invalidated, but the + * token bucket state in Redis (or other storage) remains. This handler is responsible for resetting + * the bucket state so that the new rules take effect immediately. + * + *
Implementations should delete or reset the token buckets associated with the changed rule set. + * + *
Example implementation for Redis: + * + *
{@code
+ * public class RedisBucketResetHandler implements BucketResetHandler {
+ * private final RedisTokenBucketStore store;
+ *
+ * @Override
+ * public void resetBuckets(String ruleSetId) {
+ * store.deleteBucketsByRuleSetId(ruleSetId);
+ * }
+ *
+ * @Override
+ * public void resetAllBuckets() {
+ * store.deleteAllBuckets();
+ * }
+ * }
+ * }
+ */
+public interface BucketResetHandler {
+
+ /**
+ * Resets all buckets associated with the given rule set.
+ *
+ * Called when a specific rule set is modified or deleted. + * + * @param ruleSetId the rule set ID whose buckets should be reset + */ + void resetBuckets(String ruleSetId); + + /** + * Resets all buckets (full reset). + * + *
Called when a full reload is triggered. + */ + void resetAllBuckets(); +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java new file mode 100644 index 0000000..8382bd8 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java @@ -0,0 +1,102 @@ +package org.fluxgate.core.reload; + +import java.util.Objects; +import java.util.Optional; +import org.fluxgate.core.ratelimiter.RateLimitRuleSet; +import org.fluxgate.core.spi.RateLimitRuleSetProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A decorator that adds caching capabilities to any {@link RateLimitRuleSetProvider}. + * + *
This provider wraps a delegate provider and caches rule sets locally. It also implements + * {@link RuleReloadListener} to invalidate cache entries when reload events are received. + * + *
Example usage: + * + *
{@code
+ * RateLimitRuleSetProvider mongoProvider = new MongoRuleSetProvider(...);
+ * RuleCache cache = new CaffeineRuleCache(...);
+ * RuleReloadStrategy reloadStrategy = new PollingReloadStrategy(...);
+ *
+ * CachingRuleSetProvider cachingProvider =
+ * new CachingRuleSetProvider(mongoProvider, cache);
+ *
+ * // Register as reload listener
+ * reloadStrategy.addListener(cachingProvider);
+ * }
+ */
+public class CachingRuleSetProvider implements RateLimitRuleSetProvider, RuleReloadListener {
+
+ private static final Logger log = LoggerFactory.getLogger(CachingRuleSetProvider.class);
+
+ private final RateLimitRuleSetProvider delegate;
+ private final RuleCache cache;
+
+ /**
+ * Creates a new caching provider.
+ *
+ * @param delegate the underlying provider to delegate cache misses to
+ * @param cache the cache to store rule sets
+ */
+ public CachingRuleSetProvider(RateLimitRuleSetProvider delegate, RuleCache cache) {
+ this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
+ this.cache = Objects.requireNonNull(cache, "cache must not be null");
+ }
+
+ @Override
+ public OptionalImplementations should be thread-safe and support concurrent access.
+ */
+public interface RuleCache {
+
+ /**
+ * Retrieves a cached rule set by ID.
+ *
+ * @param ruleSetId the ID of the rule set to retrieve
+ * @return an Optional containing the rule set if cached, or empty if not found
+ */
+ Optional This should be used sparingly as it may impact performance during cache repopulation.
+ */
+ void invalidateAll();
+
+ /**
+ * Returns the IDs of all currently cached rule sets.
+ *
+ * This is useful for polling strategies that need to check for changes in known rule sets.
+ *
+ * @return an unmodifiable set of cached rule set IDs
+ */
+ Set This event is published when rules need to be reloaded, either for a specific rule set or for
+ * all cached rules.
+ */
+public final class RuleReloadEvent {
+
+ private final String ruleSetId;
+ private final ReloadSource source;
+ private final Instant timestamp;
+ private final Map Implementations can react to rule changes, such as invalidating caches or updating
+ * configurations.
+ */
+@FunctionalInterface
+public interface RuleReloadListener {
+
+ /**
+ * Called when a rule reload event is received.
+ *
+ * @param event the reload event containing details about what should be reloaded
+ */
+ void onReload(RuleReloadEvent event);
+}
diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java
new file mode 100644
index 0000000..3589e85
--- /dev/null
+++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java
@@ -0,0 +1,79 @@
+package org.fluxgate.core.reload;
+
+/**
+ * Strategy interface for rule hot reload.
+ *
+ * Implementations define how rules are reloaded (e.g., via polling, Redis Pub/Sub, etc.) and
+ * manage the lifecycle of reload mechanisms.
+ *
+ * Typical usage:
+ *
+ * For polling strategies, this starts the scheduled task. For Pub/Sub strategies, this
+ * establishes the subscription.
+ *
+ * This method is idempotent - calling it multiple times has no additional effect.
+ */
+ void start();
+
+ /**
+ * Stops the reload mechanism and releases resources.
+ *
+ * This method is idempotent - calling it multiple times has no additional effect.
+ */
+ void stop();
+
+ /**
+ * Returns whether the reload mechanism is currently running.
+ *
+ * @return true if started and not stopped
+ */
+ boolean isRunning();
+
+ /**
+ * Triggers a reload for a specific rule set.
+ *
+ * This method can be called to programmatically trigger a reload, for example after updating a
+ * rule via an admin API.
+ *
+ * @param ruleSetId the ID of the rule set to reload
+ */
+ void triggerReload(String ruleSetId);
+
+ /**
+ * Triggers a full reload of all cached rules.
+ *
+ * This is useful for scenarios like configuration refresh or manual cache invalidation.
+ */
+ void triggerReloadAll();
+
+ /**
+ * Adds a listener that will be notified when reload events occur.
+ *
+ * @param listener the listener to add
+ */
+ void addListener(RuleReloadListener listener);
+
+ /**
+ * Removes a previously added listener.
+ *
+ * @param listener the listener to remove
+ */
+ void removeListener(RuleReloadListener listener);
+}
diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/resilience/CircuitBreaker.java b/fluxgate-core/src/main/java/org/fluxgate/core/resilience/CircuitBreaker.java
new file mode 100644
index 0000000..fdaa1d7
--- /dev/null
+++ b/fluxgate-core/src/main/java/org/fluxgate/core/resilience/CircuitBreaker.java
@@ -0,0 +1,83 @@
+package org.fluxgate.core.resilience;
+
+import java.util.function.Supplier;
+
+/**
+ * Circuit breaker that prevents cascading failures.
+ *
+ * The circuit breaker tracks failures and transitions between states:
+ *
+ * This method should be used with caution, typically only for testing or manual recovery.
+ */
+ void reset();
+
+ /** Circuit breaker states. */
+ enum State {
+ /** Circuit is closed, requests flow through normally. */
+ CLOSED,
+
+ /** Circuit is open, requests are rejected or bypassed. */
+ OPEN,
+
+ /** Circuit is half-open, limited requests are allowed to test recovery. */
+ HALF_OPEN
+ }
+}
diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/resilience/CircuitBreakerConfig.java b/fluxgate-core/src/main/java/org/fluxgate/core/resilience/CircuitBreakerConfig.java
new file mode 100644
index 0000000..33ca33a
--- /dev/null
+++ b/fluxgate-core/src/main/java/org/fluxgate/core/resilience/CircuitBreakerConfig.java
@@ -0,0 +1,189 @@
+package org.fluxgate.core.resilience;
+
+import java.time.Duration;
+
+/**
+ * Configuration for circuit breaker behavior.
+ *
+ * The circuit breaker pattern prevents an application from repeatedly trying to execute an
+ * operation that's likely to fail, allowing it to continue without waiting for the fault to be
+ * fixed.
+ */
+public class CircuitBreakerConfig {
+
+ private final boolean enabled;
+ private final int failureThreshold;
+ private final Duration waitDurationInOpenState;
+ private final int permittedCallsInHalfOpenState;
+ private final FallbackStrategy fallbackStrategy;
+
+ private CircuitBreakerConfig(Builder builder) {
+ this.enabled = builder.enabled;
+ this.failureThreshold = builder.failureThreshold;
+ this.waitDurationInOpenState = builder.waitDurationInOpenState;
+ this.permittedCallsInHalfOpenState = builder.permittedCallsInHalfOpenState;
+ this.fallbackStrategy = builder.fallbackStrategy;
+ }
+
+ /**
+ * Returns a new builder with default settings.
+ *
+ * @return a new Builder instance
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Returns a disabled circuit breaker configuration.
+ *
+ * @return a CircuitBreakerConfig with circuit breaker disabled
+ */
+ public static CircuitBreakerConfig disabled() {
+ return builder().enabled(false).build();
+ }
+
+ /**
+ * Returns a default circuit breaker configuration.
+ *
+ * @return a CircuitBreakerConfig with default settings
+ */
+ public static CircuitBreakerConfig defaults() {
+ return builder().build();
+ }
+
+ /**
+ * Returns whether the circuit breaker is enabled.
+ *
+ * @return true if circuit breaker is enabled
+ */
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ /**
+ * Returns the failure threshold that triggers the circuit to open.
+ *
+ * @return the number of failures before opening the circuit
+ */
+ public int getFailureThreshold() {
+ return failureThreshold;
+ }
+
+ /**
+ * Returns the duration to wait in open state before transitioning to half-open.
+ *
+ * @return the wait duration in open state
+ */
+ public Duration getWaitDurationInOpenState() {
+ return waitDurationInOpenState;
+ }
+
+ /**
+ * Returns the number of calls permitted in half-open state.
+ *
+ * @return the number of permitted calls in half-open state
+ */
+ public int getPermittedCallsInHalfOpenState() {
+ return permittedCallsInHalfOpenState;
+ }
+
+ /**
+ * Returns the fallback strategy when the circuit is open.
+ *
+ * @return the fallback strategy
+ */
+ public FallbackStrategy getFallbackStrategy() {
+ return fallbackStrategy;
+ }
+
+ /** Fallback strategy when circuit is open. */
+ public enum FallbackStrategy {
+ /** Allow requests to pass through (fail-open). */
+ FAIL_OPEN,
+
+ /** Reject requests immediately (fail-closed). */
+ FAIL_CLOSED
+ }
+
+ /** Builder for creating CircuitBreakerConfig instances. */
+ public static class Builder {
+ private boolean enabled = false; // Disabled by default
+ private int failureThreshold = 5;
+ private Duration waitDurationInOpenState = Duration.ofSeconds(30);
+ private int permittedCallsInHalfOpenState = 3;
+ private FallbackStrategy fallbackStrategy = FallbackStrategy.FAIL_OPEN;
+
+ private Builder() {}
+
+ /**
+ * Sets whether the circuit breaker is enabled.
+ *
+ * @param enabled true to enable the circuit breaker
+ * @return this builder
+ */
+ public Builder enabled(boolean enabled) {
+ this.enabled = enabled;
+ return this;
+ }
+
+ /**
+ * Sets the failure threshold.
+ *
+ * @param failureThreshold the number of failures to trigger open state
+ * @return this builder
+ */
+ public Builder failureThreshold(int failureThreshold) {
+ if (failureThreshold < 1) {
+ throw new IllegalArgumentException("failureThreshold must be >= 1");
+ }
+ this.failureThreshold = failureThreshold;
+ return this;
+ }
+
+ /**
+ * Sets the wait duration in open state.
+ *
+ * @param waitDuration the duration to wait before transitioning to half-open
+ * @return this builder
+ */
+ public Builder waitDurationInOpenState(Duration waitDuration) {
+ this.waitDurationInOpenState = waitDuration;
+ return this;
+ }
+
+ /**
+ * Sets the number of permitted calls in half-open state.
+ *
+ * @param permittedCalls the number of calls to allow in half-open state
+ * @return this builder
+ */
+ public Builder permittedCallsInHalfOpenState(int permittedCalls) {
+ if (permittedCalls < 1) {
+ throw new IllegalArgumentException("permittedCallsInHalfOpenState must be >= 1");
+ }
+ this.permittedCallsInHalfOpenState = permittedCalls;
+ return this;
+ }
+
+ /**
+ * Sets the fallback strategy.
+ *
+ * @param strategy the fallback strategy
+ * @return this builder
+ */
+ public Builder fallbackStrategy(FallbackStrategy strategy) {
+ this.fallbackStrategy = strategy;
+ return this;
+ }
+
+ /**
+ * Builds the CircuitBreakerConfig.
+ *
+ * @return a new CircuitBreakerConfig instance
+ */
+ public CircuitBreakerConfig build() {
+ return new CircuitBreakerConfig(this);
+ }
+ }
+}
diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/resilience/CircuitBreakerOpenException.java b/fluxgate-core/src/main/java/org/fluxgate/core/resilience/CircuitBreakerOpenException.java
new file mode 100644
index 0000000..1814dfe
--- /dev/null
+++ b/fluxgate-core/src/main/java/org/fluxgate/core/resilience/CircuitBreakerOpenException.java
@@ -0,0 +1,49 @@
+package org.fluxgate.core.resilience;
+
+import org.fluxgate.core.exception.FluxgateException;
+
+/**
+ * Exception thrown when a circuit breaker is open and the fallback strategy is FAIL_CLOSED.
+ *
+ * This exception indicates that the circuit breaker has tripped due to consecutive failures and
+ * is rejecting requests to prevent further damage.
+ */
+public class CircuitBreakerOpenException extends FluxgateException {
+
+ private final String circuitBreakerName;
+
+ /**
+ * Constructs a new CircuitBreakerOpenException.
+ *
+ * @param circuitBreakerName the name of the circuit breaker
+ */
+ public CircuitBreakerOpenException(String circuitBreakerName) {
+ super("Circuit breaker '" + circuitBreakerName + "' is open");
+ this.circuitBreakerName = circuitBreakerName;
+ }
+
+ /**
+ * Constructs a new CircuitBreakerOpenException with a custom message.
+ *
+ * @param circuitBreakerName the name of the circuit breaker
+ * @param message the detail message
+ */
+ public CircuitBreakerOpenException(String circuitBreakerName, String message) {
+ super(message);
+ this.circuitBreakerName = circuitBreakerName;
+ }
+
+ /**
+ * Returns the name of the circuit breaker that is open.
+ *
+ * @return the circuit breaker name
+ */
+ public String getCircuitBreakerName() {
+ return circuitBreakerName;
+ }
+
+ @Override
+ public boolean isRetryable() {
+ return false; // Don't retry when circuit is open
+ }
+}
diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/resilience/DefaultCircuitBreaker.java b/fluxgate-core/src/main/java/org/fluxgate/core/resilience/DefaultCircuitBreaker.java
new file mode 100644
index 0000000..0eb4329
--- /dev/null
+++ b/fluxgate-core/src/main/java/org/fluxgate/core/resilience/DefaultCircuitBreaker.java
@@ -0,0 +1,210 @@
+package org.fluxgate.core.resilience;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link CircuitBreaker}.
+ *
+ * This implementation tracks consecutive failures and transitions between states based on the
+ * configured thresholds.
+ */
+public class DefaultCircuitBreaker implements CircuitBreaker {
+
+ private static final Logger log = LoggerFactory.getLogger(DefaultCircuitBreaker.class);
+
+ private final String name;
+ private final CircuitBreakerConfig config;
+ private final AtomicReference This implementation provides exponential backoff retry with configurable parameters. It logs
+ * retry attempts and respects the configured retry policy.
+ */
+public class DefaultRetryExecutor implements RetryExecutor {
+
+ private static final Logger log = LoggerFactory.getLogger(DefaultRetryExecutor.class);
+
+ private final RetryConfig config;
+
+ /**
+ * Creates a new DefaultRetryExecutor with the given configuration.
+ *
+ * @param config the retry configuration
+ */
+ public DefaultRetryExecutor(RetryConfig config) {
+ this.config = config;
+ }
+
+ /**
+ * Creates a new DefaultRetryExecutor with default configuration.
+ *
+ * @return a new DefaultRetryExecutor with default settings
+ */
+ public static DefaultRetryExecutor withDefaults() {
+ return new DefaultRetryExecutor(RetryConfig.defaults());
+ }
+
+ @Override
+ public This implementation executes actions directly without any circuit breaker logic. It is useful
+ * when circuit breaker functionality is disabled.
+ */
+public class NoOpCircuitBreaker implements CircuitBreaker {
+
+ private static final NoOpCircuitBreaker INSTANCE = new NoOpCircuitBreaker();
+
+ private final CircuitBreakerConfig config = CircuitBreakerConfig.disabled();
+
+ private NoOpCircuitBreaker() {}
+
+ /**
+ * Returns the singleton instance.
+ *
+ * @return the NoOpCircuitBreaker instance
+ */
+ public static NoOpCircuitBreaker getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public This implementation is useful when retry functionality is disabled or not needed.
+ */
+public class NoOpRetryExecutor implements RetryExecutor {
+
+ private static final NoOpRetryExecutor INSTANCE = new NoOpRetryExecutor();
+
+ private final RetryConfig config = RetryConfig.disabled();
+
+ private NoOpRetryExecutor() {}
+
+ /**
+ * Returns the singleton instance.
+ *
+ * @return the NoOpRetryExecutor instance
+ */
+ public static NoOpRetryExecutor getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public The execution flow is:
+ *
+ * The circuit breaker is checked first to prevent unnecessary retries when the circuit is open.
+ * If the circuit is closed or half-open, the retry executor handles the actual execution with
+ * retries.
+ */
+public class ResilientExecutor {
+
+ private static final Logger log = LoggerFactory.getLogger(ResilientExecutor.class);
+
+ private final RetryExecutor retryExecutor;
+ private final CircuitBreaker circuitBreaker;
+
+ /**
+ * Creates a new ResilientExecutor with the given retry and circuit breaker configurations.
+ *
+ * @param retryConfig the retry configuration
+ * @param circuitBreakerConfig the circuit breaker configuration
+ * @param name the name for the circuit breaker
+ */
+ public ResilientExecutor(
+ RetryConfig retryConfig, CircuitBreakerConfig circuitBreakerConfig, String name) {
+ this.retryExecutor =
+ retryConfig.isEnabled()
+ ? new DefaultRetryExecutor(retryConfig)
+ : NoOpRetryExecutor.getInstance();
+
+ this.circuitBreaker =
+ circuitBreakerConfig.isEnabled()
+ ? new DefaultCircuitBreaker(name, circuitBreakerConfig)
+ : NoOpCircuitBreaker.getInstance();
+ }
+
+ /**
+ * Creates a new ResilientExecutor with the given executors.
+ *
+ * @param retryExecutor the retry executor
+ * @param circuitBreaker the circuit breaker
+ */
+ public ResilientExecutor(RetryExecutor retryExecutor, CircuitBreaker circuitBreaker) {
+ this.retryExecutor = retryExecutor;
+ this.circuitBreaker = circuitBreaker;
+ }
+
+ /**
+ * Creates a ResilientExecutor with retry only.
+ *
+ * @param retryConfig the retry configuration
+ * @return a new ResilientExecutor
+ */
+ public static ResilientExecutor withRetryOnly(RetryConfig retryConfig) {
+ return new ResilientExecutor(retryConfig, CircuitBreakerConfig.disabled(), "default");
+ }
+
+ /**
+ * Creates a ResilientExecutor with circuit breaker only.
+ *
+ * @param circuitBreakerConfig the circuit breaker configuration
+ * @param name the name for the circuit breaker
+ * @return a new ResilientExecutor
+ */
+ public static ResilientExecutor withCircuitBreakerOnly(
+ CircuitBreakerConfig circuitBreakerConfig, String name) {
+ return new ResilientExecutor(RetryConfig.disabled(), circuitBreakerConfig, name);
+ }
+
+ /**
+ * Creates a disabled ResilientExecutor that executes operations directly.
+ *
+ * @return a disabled ResilientExecutor
+ */
+ public static ResilientExecutor disabled() {
+ return new ResilientExecutor(NoOpRetryExecutor.getInstance(), NoOpCircuitBreaker.getInstance());
+ }
+
+ /**
+ * Executes the given action with resilience support.
+ *
+ * @param This class provides immutable configuration for retry operations, including the maximum number
+ * of attempts, backoff timing, and which exceptions should trigger retries.
+ */
+public class RetryConfig {
+
+ private final boolean enabled;
+ private final int maxAttempts;
+ private final Duration initialBackoff;
+ private final double multiplier;
+ private final Duration maxBackoff;
+ private final Set This interface defines the contract for executing operations with automatic retry on failure.
+ * Implementations should respect the configured retry policy including max attempts, backoff
+ * strategy, and retryable exceptions.
+ */
+public interface RetryExecutor {
+
+ /**
+ * Executes the given action with retry support.
+ *
+ * If the action fails with a retryable exception, it will be retried according to the
+ * configured retry policy. If all retries are exhausted, the last exception will be thrown.
+ *
+ * @param The operation name is used for logging and metrics purposes.
+ *
+ * @param This exception wraps underlying connection errors from Lettuce and provides a consistent
* exception type for both standalone and cluster modes.
+ *
+ * @deprecated Use {@link org.fluxgate.core.exception.RedisConnectionException} instead. This class
+ * will be removed in a future release.
*/
+@Deprecated(since = "0.2.0", forRemoval = true)
public class RedisConnectionException extends RuntimeException {
/**
diff --git a/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java b/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java
index 4aeec98..e2454b2 100644
--- a/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java
+++ b/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java
@@ -140,6 +140,60 @@ public RedisConnectionProvider.RedisMode getMode() {
return connectionProvider.getMode();
}
+ /**
+ * Deletes all token buckets matching the given ruleSetId pattern.
+ *
+ * This is used when rules are changed to reset rate limit state. The pattern matches keys
+ * like: {@code fluxgate:{ruleSetId}:*}
+ *
+ * Warning: Uses KEYS command which can be slow on large databases. Consider using SCAN in
+ * high-traffic production environments.
+ *
+ * @param ruleSetId the rule set ID to match
+ * @return the number of buckets deleted
+ */
+ public long deleteBucketsByRuleSetId(String ruleSetId) {
+ Objects.requireNonNull(ruleSetId, "ruleSetId must not be null");
+
+ String pattern = "fluxgate:" + ruleSetId + ":*";
+ log.debug("Deleting token buckets matching pattern: {}", pattern);
+
+ java.util.List This is used when a full reload is triggered to reset all rate limit state. The pattern
+ * matches all FluxGate keys: {@code fluxgate:*}
+ *
+ * Warning: Uses KEYS command which can be slow on large databases.
+ *
+ * @return the number of buckets deleted
+ */
+ public long deleteAllBuckets() {
+ String pattern = "fluxgate:*";
+ log.debug("Deleting all token buckets matching pattern: {}", pattern);
+
+ java.util.List This handler: 1. Looks up rules from MongoDB via RateLimitRuleSetProvider 2. Applies rate
* limiting via Redis using RedisRateLimiter 3. Returns rate limit response to the filter
+ *
+ * The behavior when no rule is found can be configured via:
+ *
+ * This configuration provides:
+ *
+ * Note: For publishing rule changes from Admin/Control Plane applications, use the {@code
+ * fluxgate-control-support} module instead.
+ *
+ * Strategy selection:
+ *
+ * Configuration example:
+ *
+ * Only created when:
+ *
+ * Strategy selection:
+ *
+ * This handler automatically resets token buckets when rules change, ensuring that the new
+ * rate limits take effect immediately.
+ *
+ * Only created when Redis token bucket store is available.
+ */
+ @Bean
+ @ConditionalOnMissingBean(BucketResetHandler.class)
+ @ConditionalOnBean(RedisTokenBucketStore.class)
+ public BucketResetHandler bucketResetHandler(RedisTokenBucketStore tokenBucketStore) {
+ log.info("Creating RedisBucketResetHandler for automatic bucket reset on rule changes");
+ return new RedisBucketResetHandler(tokenBucketStore);
+ }
+
+ /**
+ * Creates the caching rule set provider that wraps the delegate provider.
+ *
+ * This is marked as @Primary so it takes precedence over the delegate provider when autowiring
+ * RateLimitRuleSetProvider.
+ */
+ @Bean(name = "cachingRuleSetProvider")
+ @Primary
+ @ConditionalOnBean({RuleCache.class, RuleReloadStrategy.class})
+ public CachingRuleSetProvider cachingRuleSetProvider(
+ @Qualifier("delegateRuleSetProvider") RateLimitRuleSetProvider ruleSetProvider,
+ RuleCache ruleCache,
+ RuleReloadStrategy reloadStrategy,
+ ObjectProvider This configuration provides retry and circuit breaker beans based on the configured
+ * properties.
+ */
+@AutoConfiguration
+@EnableConfigurationProperties(FluxgateResilienceProperties.class)
+public class FluxgateResilienceAutoConfiguration {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(FluxgateResilienceAutoConfiguration.class);
+
+ /**
+ * Creates a RetryConfig from properties.
+ *
+ * @param properties the resilience properties
+ * @return the retry configuration
+ */
+ @Bean
+ @ConditionalOnMissingBean
+ public RetryConfig fluxgateRetryConfig(FluxgateResilienceProperties properties) {
+ FluxgateResilienceProperties.Retry retryProps = properties.getRetry();
+
+ RetryConfig config =
+ RetryConfig.builder()
+ .enabled(retryProps.isEnabled())
+ .maxAttempts(retryProps.getMaxAttempts())
+ .initialBackoff(retryProps.getInitialBackoff())
+ .multiplier(retryProps.getMultiplier())
+ .maxBackoff(retryProps.getMaxBackoff())
+ .build();
+
+ log.info(
+ "FluxGate retry configured: enabled={}, maxAttempts={}, initialBackoff={}ms",
+ retryProps.isEnabled(),
+ retryProps.getMaxAttempts(),
+ retryProps.getInitialBackoff().toMillis());
+
+ return config;
+ }
+
+ /**
+ * Creates a CircuitBreakerConfig from properties.
+ *
+ * @param properties the resilience properties
+ * @return the circuit breaker configuration
+ */
+ @Bean
+ @ConditionalOnMissingBean
+ public CircuitBreakerConfig fluxgateCircuitBreakerConfig(
+ FluxgateResilienceProperties properties) {
+ FluxgateResilienceProperties.CircuitBreaker cbProps = properties.getCircuitBreaker();
+
+ CircuitBreakerConfig config =
+ CircuitBreakerConfig.builder()
+ .enabled(cbProps.isEnabled())
+ .failureThreshold(cbProps.getFailureThreshold())
+ .waitDurationInOpenState(cbProps.getWaitDurationInOpenState())
+ .permittedCallsInHalfOpenState(cbProps.getPermittedCallsInHalfOpenState())
+ .fallbackStrategy(cbProps.getFallback())
+ .build();
+
+ log.info(
+ "FluxGate circuit breaker configured: enabled={}, failureThreshold={}, "
+ + "waitDuration={}s, fallback={}",
+ cbProps.isEnabled(),
+ cbProps.getFailureThreshold(),
+ cbProps.getWaitDurationInOpenState().toSeconds(),
+ cbProps.getFallback());
+
+ return config;
+ }
+
+ /**
+ * Creates a RetryExecutor bean.
+ *
+ * @param config the retry configuration
+ * @return the retry executor
+ */
+ @Bean
+ @ConditionalOnMissingBean
+ @ConditionalOnProperty(
+ prefix = "fluxgate.resilience.retry",
+ name = "enabled",
+ havingValue = "true",
+ matchIfMissing = true)
+ public RetryExecutor fluxgateRetryExecutor(RetryConfig config) {
+ return new DefaultRetryExecutor(config);
+ }
+
+ /**
+ * Creates a no-op RetryExecutor when retry is disabled.
+ *
+ * @return the no-op retry executor
+ */
+ @Bean
+ @ConditionalOnMissingBean
+ @ConditionalOnProperty(
+ prefix = "fluxgate.resilience.retry",
+ name = "enabled",
+ havingValue = "false")
+ public RetryExecutor fluxgateNoOpRetryExecutor() {
+ log.info("FluxGate retry is disabled");
+ return NoOpRetryExecutor.getInstance();
+ }
+
+ /**
+ * Creates a CircuitBreaker bean when enabled.
+ *
+ * @param config the circuit breaker configuration
+ * @return the circuit breaker
+ */
+ @Bean
+ @ConditionalOnMissingBean
+ @ConditionalOnProperty(
+ prefix = "fluxgate.resilience.circuit-breaker",
+ name = "enabled",
+ havingValue = "true")
+ public CircuitBreaker fluxgateCircuitBreaker(CircuitBreakerConfig config) {
+ return new DefaultCircuitBreaker("fluxgate", config);
+ }
+
+ /**
+ * Creates a no-op CircuitBreaker when disabled.
+ *
+ * @return the no-op circuit breaker
+ */
+ @Bean
+ @ConditionalOnMissingBean
+ @ConditionalOnProperty(
+ prefix = "fluxgate.resilience.circuit-breaker",
+ name = "enabled",
+ havingValue = "false",
+ matchIfMissing = true)
+ public CircuitBreaker fluxgateNoOpCircuitBreaker() {
+ log.debug("FluxGate circuit breaker is disabled");
+ return NoOpCircuitBreaker.getInstance();
+ }
+
+ /**
+ * Creates a ResilientExecutor that combines retry and circuit breaker.
+ *
+ * @param retryExecutor the retry executor
+ * @param circuitBreaker the circuit breaker
+ * @return the resilient executor
+ */
+ @Bean
+ @ConditionalOnMissingBean
+ public ResilientExecutor fluxgateResilientExecutor(
+ RetryExecutor retryExecutor, CircuitBreaker circuitBreaker) {
+ return new ResilientExecutor(retryExecutor, circuitBreaker);
+ }
+}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java
index fc44a2d..bd81ee4 100644
--- a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java
@@ -1,5 +1,6 @@
package org.fluxgate.spring.properties;
+import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
@@ -46,6 +47,9 @@ public class FluxgateProperties {
/** Actuator configuration. */
@NestedConfigurationProperty private ActuatorProperties actuator = new ActuatorProperties();
+ /** Rule reload configuration for hot reload support. */
+ @NestedConfigurationProperty private ReloadProperties reload = new ReloadProperties();
+
public MongoProperties getMongo() {
return mongo;
}
@@ -86,6 +90,14 @@ public void setActuator(ActuatorProperties actuator) {
this.actuator = actuator;
}
+ public ReloadProperties getReload() {
+ return reload;
+ }
+
+ public void setReload(ReloadProperties reload) {
+ this.reload = reload;
+ }
+
// =========================================================================
// Nested Configuration Classes
// =========================================================================
@@ -304,6 +316,19 @@ public static class RateLimitProperties {
/** Default rule set ID to use when no specific rule set is matched. */
private String defaultRuleSetId;
+ /**
+ * Behavior when no matching rule set is found.
+ *
+ * In production environments with strict security requirements, consider setting this to
+ * DENY to ensure all requests are rate-limited.
+ */
+ private MissingRuleBehavior missingRuleBehavior = MissingRuleBehavior.ALLOW;
+
/**
* Filter order (lower = higher priority). Default is high priority to run before other filters.
*/
@@ -398,6 +423,31 @@ public boolean isIncludeHeaders() {
public void setIncludeHeaders(boolean includeHeaders) {
this.includeHeaders = includeHeaders;
}
+
+ public MissingRuleBehavior getMissingRuleBehavior() {
+ return missingRuleBehavior;
+ }
+
+ public void setMissingRuleBehavior(MissingRuleBehavior missingRuleBehavior) {
+ this.missingRuleBehavior = missingRuleBehavior;
+ }
+
+ /**
+ * Check if requests should be denied when no rule is found.
+ *
+ * @return true if missing rules should result in denial
+ */
+ public boolean isDenyWhenRuleMissing() {
+ return missingRuleBehavior == MissingRuleBehavior.DENY;
+ }
+ }
+
+ /** Behavior when no matching rate limit rule is found. */
+ public enum MissingRuleBehavior {
+ /** Allow the request to proceed (permissive mode). */
+ ALLOW,
+ /** Deny the request (strict mode, fail-closed). */
+ DENY
}
/** Metrics configuration for Prometheus/Micrometer integration. */
@@ -458,4 +508,210 @@ public void setEnabled(boolean enabled) {
}
}
}
+
+ /**
+ * Rule reload configuration for hot reload support.
+ *
+ * Supports multiple strategies:
+ *
+ * These properties configure retry and circuit breaker behavior for FluxGate operations.
+ */
+@ConfigurationProperties(prefix = "fluxgate.resilience")
+public class FluxgateResilienceProperties {
+
+ /** Retry configuration. */
+ private final Retry retry = new Retry();
+
+ /** Circuit breaker configuration. */
+ private final CircuitBreaker circuitBreaker = new CircuitBreaker();
+
+ public Retry getRetry() {
+ return retry;
+ }
+
+ public CircuitBreaker getCircuitBreaker() {
+ return circuitBreaker;
+ }
+
+ /** Retry configuration properties. */
+ public static class Retry {
+
+ /** Whether retry is enabled. Default is true. */
+ private boolean enabled = true;
+
+ /** Maximum number of attempts (including initial attempt). Default is 3. */
+ private int maxAttempts = 3;
+
+ /** Initial backoff duration before first retry. Default is 100ms. */
+ private Duration initialBackoff = Duration.ofMillis(100);
+
+ /** Multiplier for exponential backoff. Default is 2.0. */
+ private double multiplier = 2.0;
+
+ /** Maximum backoff duration. Default is 2 seconds. */
+ private Duration maxBackoff = Duration.ofSeconds(2);
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public int getMaxAttempts() {
+ return maxAttempts;
+ }
+
+ public void setMaxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ }
+
+ public Duration getInitialBackoff() {
+ return initialBackoff;
+ }
+
+ public void setInitialBackoff(Duration initialBackoff) {
+ this.initialBackoff = initialBackoff;
+ }
+
+ public double getMultiplier() {
+ return multiplier;
+ }
+
+ public void setMultiplier(double multiplier) {
+ this.multiplier = multiplier;
+ }
+
+ public Duration getMaxBackoff() {
+ return maxBackoff;
+ }
+
+ public void setMaxBackoff(Duration maxBackoff) {
+ this.maxBackoff = maxBackoff;
+ }
+ }
+
+ /** Circuit breaker configuration properties. */
+ public static class CircuitBreaker {
+
+ /** Whether circuit breaker is enabled. Default is false. */
+ private boolean enabled = false;
+
+ /** Number of failures before opening the circuit. Default is 5. */
+ private int failureThreshold = 5;
+
+ /** Duration to wait in open state before transitioning to half-open. Default is 30s. */
+ private Duration waitDurationInOpenState = Duration.ofSeconds(30);
+
+ /** Number of calls permitted in half-open state. Default is 3. */
+ private int permittedCallsInHalfOpenState = 3;
+
+ /** Fallback strategy when circuit is open. Default is FAIL_OPEN. */
+ private CircuitBreakerConfig.FallbackStrategy fallback =
+ CircuitBreakerConfig.FallbackStrategy.FAIL_OPEN;
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public int getFailureThreshold() {
+ return failureThreshold;
+ }
+
+ public void setFailureThreshold(int failureThreshold) {
+ this.failureThreshold = failureThreshold;
+ }
+
+ public Duration getWaitDurationInOpenState() {
+ return waitDurationInOpenState;
+ }
+
+ public void setWaitDurationInOpenState(Duration waitDurationInOpenState) {
+ this.waitDurationInOpenState = waitDurationInOpenState;
+ }
+
+ public int getPermittedCallsInHalfOpenState() {
+ return permittedCallsInHalfOpenState;
+ }
+
+ public void setPermittedCallsInHalfOpenState(int permittedCallsInHalfOpenState) {
+ this.permittedCallsInHalfOpenState = permittedCallsInHalfOpenState;
+ }
+
+ public CircuitBreakerConfig.FallbackStrategy getFallback() {
+ return fallback;
+ }
+
+ public void setFallback(CircuitBreakerConfig.FallbackStrategy fallback) {
+ this.fallback = fallback;
+ }
+ }
+}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java
new file mode 100644
index 0000000..bb74959
--- /dev/null
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java
@@ -0,0 +1,135 @@
+package org.fluxgate.spring.reload.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import org.fluxgate.core.ratelimiter.RateLimitRuleSet;
+import org.fluxgate.core.reload.RuleCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Caffeine-based implementation of {@link RuleCache}.
+ *
+ * Provides high-performance, thread-safe local caching of rate limit rule sets with configurable
+ * TTL and maximum size.
+ *
+ * Example usage:
+ *
+ * This is typically called automatically by Caffeine, but can be invoked manually if needed.
+ */
+ public void cleanUp() {
+ cache.cleanUp();
+ }
+}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java
new file mode 100644
index 0000000..b67d418
--- /dev/null
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java
@@ -0,0 +1,62 @@
+package org.fluxgate.spring.reload.handler;
+
+import java.util.Objects;
+import org.fluxgate.core.reload.BucketResetHandler;
+import org.fluxgate.core.reload.RuleReloadEvent;
+import org.fluxgate.core.reload.RuleReloadListener;
+import org.fluxgate.redis.store.RedisTokenBucketStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Redis implementation of {@link BucketResetHandler}.
+ *
+ * This handler deletes token buckets from Redis when rules are changed, ensuring that the new
+ * rules take effect immediately.
+ *
+ * It also implements {@link RuleReloadListener} to automatically reset buckets when reload
+ * events are received via Pub/Sub or polling.
+ */
+public class RedisBucketResetHandler implements BucketResetHandler, RuleReloadListener {
+
+ private static final Logger log = LoggerFactory.getLogger(RedisBucketResetHandler.class);
+
+ private final RedisTokenBucketStore tokenBucketStore;
+
+ /**
+ * Creates a new RedisBucketResetHandler.
+ *
+ * @param tokenBucketStore the Redis token bucket store
+ */
+ public RedisBucketResetHandler(RedisTokenBucketStore tokenBucketStore) {
+ this.tokenBucketStore =
+ Objects.requireNonNull(tokenBucketStore, "tokenBucketStore must not be null");
+ }
+
+ @Override
+ public void resetBuckets(String ruleSetId) {
+ Objects.requireNonNull(ruleSetId, "ruleSetId must not be null");
+ log.info("Resetting token buckets for ruleSetId: {}", ruleSetId);
+ long deleted = tokenBucketStore.deleteBucketsByRuleSetId(ruleSetId);
+ log.info("Reset complete: {} buckets deleted for ruleSetId: {}", deleted, ruleSetId);
+ }
+
+ @Override
+ public void resetAllBuckets() {
+ log.info("Resetting all token buckets (full reset)");
+ long deleted = tokenBucketStore.deleteAllBuckets();
+ log.info("Full reset complete: {} buckets deleted", deleted);
+ }
+
+ @Override
+ public void onReload(RuleReloadEvent event) {
+ if (event.isFullReload()) {
+ log.info("Full reload event received, resetting all buckets");
+ resetAllBuckets();
+ } else {
+ String ruleSetId = event.getRuleSetId();
+ log.info("Reload event received for ruleSetId: {}, resetting buckets", ruleSetId);
+ resetBuckets(ruleSetId);
+ }
+ }
+}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java
new file mode 100644
index 0000000..e0432ac
--- /dev/null
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java
@@ -0,0 +1,117 @@
+package org.fluxgate.spring.reload.strategy;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.fluxgate.core.reload.ReloadSource;
+import org.fluxgate.core.reload.RuleReloadEvent;
+import org.fluxgate.core.reload.RuleReloadListener;
+import org.fluxgate.core.reload.RuleReloadStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for reload strategies.
+ *
+ * Provides common functionality for managing listeners and lifecycle state.
+ */
+public abstract class AbstractReloadStrategy implements RuleReloadStrategy {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final List Used when hot reload is disabled (strategy = NONE). Rules are always fetched fresh from the
+ * provider without caching.
+ */
+public class NoOpReloadStrategy extends AbstractReloadStrategy {
+
+ @Override
+ protected ReloadSource getReloadSource() {
+ return ReloadSource.MANUAL;
+ }
+
+ @Override
+ protected void doStart() {
+ log.info("NoOp reload strategy active - hot reload disabled");
+ }
+
+ @Override
+ protected void doStop() {
+ // Nothing to clean up
+ }
+
+ @Override
+ public void triggerReload(String ruleSetId) {
+ log.debug("NoOp reload triggered for ruleSetId: {} (ignored)", ruleSetId);
+ }
+
+ @Override
+ public void triggerReloadAll() {
+ log.debug("NoOp full reload triggered (ignored)");
+ }
+}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java
new file mode 100644
index 0000000..6e2c160
--- /dev/null
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java
@@ -0,0 +1,234 @@
+package org.fluxgate.spring.reload.strategy;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.fluxgate.core.ratelimiter.RateLimitRuleSet;
+import org.fluxgate.core.reload.ReloadSource;
+import org.fluxgate.core.reload.RuleCache;
+import org.fluxgate.core.reload.RuleReloadEvent;
+import org.fluxgate.core.spi.RateLimitRuleSetProvider;
+
+/**
+ * Polling-based reload strategy that periodically checks for rule changes.
+ *
+ * This strategy maintains a version map (using hashCode) of cached rule sets and compares them
+ * against the source provider at regular intervals.
+ *
+ * Configuration example:
+ *
+ * Uses the rule set's content to generate a hash that changes when the rules change.
+ *
+ * @param ruleSet the rule set
+ * @return version hash
+ */
+ private int computeVersion(RateLimitRuleSet ruleSet) {
+ // Use a combination of ID, description, and rules hash
+ int hash = ruleSet.getId().hashCode();
+ if (ruleSet.getDescription() != null) {
+ hash = 31 * hash + ruleSet.getDescription().hashCode();
+ }
+ hash = 31 * hash + ruleSet.getRules().hashCode();
+ return hash;
+ }
+
+ /**
+ * Manually triggers a version check for a specific rule set.
+ *
+ * @param ruleSetId the rule set ID to check
+ */
+ public void forceCheck(String ruleSetId) {
+ checkForChange(ruleSetId);
+ }
+
+ /**
+ * Returns the configured poll interval.
+ *
+ * @return poll interval
+ */
+ public Duration getPollInterval() {
+ return pollInterval;
+ }
+
+ /**
+ * Returns the configured initial delay.
+ *
+ * @return initial delay
+ */
+ public Duration getInitialDelay() {
+ return initialDelay;
+ }
+
+ /**
+ * Returns the current version map for debugging.
+ *
+ * @return unmodifiable copy of version map
+ */
+ public Map This strategy subscribes to a Redis channel and listens for rule change messages. When a
+ * message is received, it triggers a reload event to invalidate cached rules.
+ *
+ * Message format:
+ *
+ * Configuration example:
+ *
+ * Supports two message formats:
+ *
+ * {@code
+ * RuleReloadStrategy strategy = new PollingReloadStrategy(...);
+ * strategy.addListener(event -> cache.invalidate(event.getRuleSetId()));
+ * strategy.start();
+ *
+ * // Later, to trigger a reload programmatically:
+ * strategy.triggerReload("my-rule-set");
+ *
+ * // On shutdown:
+ * strategy.stop();
+ * }
+ */
+public interface RuleReloadStrategy {
+
+ /**
+ * Starts the reload mechanism.
+ *
+ *
+ *
+ */
+public interface CircuitBreaker {
+
+ /**
+ * Executes the given action with circuit breaker protection.
+ *
+ * @param
+ * Request → CircuitBreaker → Retry → Actual Operation
+ *
+ *
+ *
+ * fluxgate:
+ * ratelimit:
+ * missing-rule-behavior: ALLOW # or DENY
+ *
*/
@Component
public class StandaloneRateLimitHandler implements FluxgateRateLimitHandler {
@@ -25,12 +34,17 @@ public class StandaloneRateLimitHandler implements FluxgateRateLimitHandler {
private final RateLimitRuleSetProvider ruleSetProvider;
private final RedisRateLimiter rateLimiter;
+ private final boolean denyWhenRuleMissing;
public StandaloneRateLimitHandler(
- RateLimitRuleSetProvider ruleSetProvider, RedisRateLimiter rateLimiter) {
+ RateLimitRuleSetProvider ruleSetProvider,
+ RedisRateLimiter rateLimiter,
+ FluxgateProperties properties) {
this.ruleSetProvider = ruleSetProvider;
this.rateLimiter = rateLimiter;
- log.info("StandaloneRateLimitHandler initialized");
+ this.denyWhenRuleMissing = properties.getRatelimit().isDenyWhenRuleMissing();
+ log.info(
+ "StandaloneRateLimitHandler initialized (denyWhenRuleMissing={})", denyWhenRuleMissing);
}
@Override
@@ -41,8 +55,14 @@ public RateLimitResponse tryConsume(RequestContext context, String ruleSetId) {
// Look up the ruleset from MongoDB
Optional
+ *
+ *
+ *
+ *
+ *
+ *
+ * fluxgate:
+ * reload:
+ * enabled: true
+ * strategy: AUTO
+ * cache:
+ * ttl: 5m
+ * max-size: 1000
+ * polling:
+ * interval: 30s
+ * pubsub:
+ * channel: fluxgate:rule-reload
+ *
+ */
+@AutoConfiguration(after = {FluxgateMongoAutoConfiguration.class})
+@ConditionalOnProperty(
+ prefix = "fluxgate.reload",
+ name = "enabled",
+ havingValue = "true",
+ matchIfMissing = true)
+@EnableConfigurationProperties(FluxgateProperties.class)
+public class FluxgateReloadAutoConfiguration {
+
+ private static final Logger log = LoggerFactory.getLogger(FluxgateReloadAutoConfiguration.class);
+
+ private final FluxgateProperties properties;
+
+ public FluxgateReloadAutoConfiguration(FluxgateProperties properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Creates the Caffeine-based rule cache.
+ *
+ *
+ *
+ */
+ @Bean
+ @ConditionalOnMissingBean(RuleCache.class)
+ @ConditionalOnClass(name = "com.github.benmanes.caffeine.cache.Caffeine")
+ @ConditionalOnProperty(
+ prefix = "fluxgate.reload.cache",
+ name = "enabled",
+ havingValue = "true",
+ matchIfMissing = true)
+ public RuleCache ruleCache() {
+ ReloadProperties reloadProps = properties.getReload();
+
+ if (reloadProps.getStrategy() == ReloadStrategy.NONE) {
+ log.info("Rule cache disabled (strategy=NONE)");
+ return null;
+ }
+
+ ReloadProperties.CacheProperties cacheProps = reloadProps.getCache();
+ log.info(
+ "Creating CaffeineRuleCache with ttl={}, maxSize={}",
+ cacheProps.getTtl(),
+ cacheProps.getMaxSize());
+
+ return new CaffeineRuleCache(cacheProps.getTtl(), cacheProps.getMaxSize());
+ }
+
+ /**
+ * Creates the rule reload strategy based on configuration.
+ *
+ *
+ *
+ */
+ @Bean
+ @ConditionalOnMissingBean(RuleReloadStrategy.class)
+ public RuleReloadStrategy ruleReloadStrategy(
+ @Qualifier("delegateRuleSetProvider") RateLimitRuleSetProvider ruleSetProvider,
+ ObjectProvider
+ *
+ *
+ *
+ *
+ *
+ *
+ * fluxgate:
+ * reload:
+ * enabled: true
+ * strategy: AUTO
+ * cache:
+ * enabled: true
+ * ttl: 5m
+ * max-size: 1000
+ * polling:
+ * interval: 30s
+ * pubsub:
+ * channel: fluxgate:rule-reload
+ *
+ */
+ public static class ReloadProperties {
+
+ /** Enable rule hot reload feature. */
+ private boolean enabled = true;
+
+ /**
+ * Reload strategy to use.
+ *
+ *
+ *
+ */
+ private ReloadStrategy strategy = ReloadStrategy.AUTO;
+
+ /** Cache configuration. */
+ private CacheProperties cache = new CacheProperties();
+
+ /** Polling strategy configuration. */
+ private PollingProperties polling = new PollingProperties();
+
+ /** Pub/Sub strategy configuration. */
+ private PubSubProperties pubsub = new PubSubProperties();
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public ReloadStrategy getStrategy() {
+ return strategy;
+ }
+
+ public void setStrategy(ReloadStrategy strategy) {
+ this.strategy = strategy;
+ }
+
+ public CacheProperties getCache() {
+ return cache;
+ }
+
+ public void setCache(CacheProperties cache) {
+ this.cache = cache;
+ }
+
+ public PollingProperties getPolling() {
+ return polling;
+ }
+
+ public void setPolling(PollingProperties polling) {
+ this.polling = polling;
+ }
+
+ public PubSubProperties getPubsub() {
+ return pubsub;
+ }
+
+ public void setPubsub(PubSubProperties pubsub) {
+ this.pubsub = pubsub;
+ }
+
+ /** Rule cache configuration. */
+ public static class CacheProperties {
+
+ /** Enable local caching of rules. */
+ private boolean enabled = true;
+
+ /** Time-to-live for cached rules. Rules will be refetched after this duration. */
+ private Duration ttl = Duration.ofMinutes(5);
+
+ /** Maximum number of rules to cache. */
+ private int maxSize = 1000;
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public Duration getTtl() {
+ return ttl;
+ }
+
+ public void setTtl(Duration ttl) {
+ this.ttl = ttl;
+ }
+
+ public int getMaxSize() {
+ return maxSize;
+ }
+
+ public void setMaxSize(int maxSize) {
+ this.maxSize = maxSize;
+ }
+ }
+
+ /** Polling strategy configuration. */
+ public static class PollingProperties {
+
+ /** Interval between polling checks. */
+ private Duration interval = Duration.ofSeconds(30);
+
+ /** Initial delay before first poll. */
+ private Duration initialDelay = Duration.ofSeconds(10);
+
+ public Duration getInterval() {
+ return interval;
+ }
+
+ public void setInterval(Duration interval) {
+ this.interval = interval;
+ }
+
+ public Duration getInitialDelay() {
+ return initialDelay;
+ }
+
+ public void setInitialDelay(Duration initialDelay) {
+ this.initialDelay = initialDelay;
+ }
+ }
+
+ /** Pub/Sub strategy configuration. */
+ public static class PubSubProperties {
+
+ /** Redis channel name for reload notifications. */
+ private String channel = "fluxgate:rule-reload";
+
+ /** Retry subscription on failure. */
+ private boolean retryOnFailure = true;
+
+ /** Interval between retry attempts. */
+ private Duration retryInterval = Duration.ofSeconds(5);
+
+ public String getChannel() {
+ return channel;
+ }
+
+ public void setChannel(String channel) {
+ this.channel = channel;
+ }
+
+ public boolean isRetryOnFailure() {
+ return retryOnFailure;
+ }
+
+ public void setRetryOnFailure(boolean retryOnFailure) {
+ this.retryOnFailure = retryOnFailure;
+ }
+
+ public Duration getRetryInterval() {
+ return retryInterval;
+ }
+
+ public void setRetryInterval(Duration retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+ }
+ }
+
+ /** Reload strategy options. */
+ public enum ReloadStrategy {
+ /** Automatically select best strategy based on available infrastructure. */
+ AUTO,
+ /** Use Redis Pub/Sub for real-time reload notifications. */
+ PUBSUB,
+ /** Use periodic polling to check for changes. */
+ POLLING,
+ /** Disable hot reload - always fetch fresh rules from provider. */
+ NONE
+ }
}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateResilienceProperties.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateResilienceProperties.java
new file mode 100644
index 0000000..06b083a
--- /dev/null
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateResilienceProperties.java
@@ -0,0 +1,147 @@
+package org.fluxgate.spring.properties;
+
+import java.time.Duration;
+import org.fluxgate.core.resilience.CircuitBreakerConfig;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * Configuration properties for FluxGate resilience features.
+ *
+ * {@code
+ * RuleCache cache = new CaffeineRuleCache(
+ * Duration.ofMinutes(5), // TTL
+ * 1000 // max size
+ * );
+ * }
+ */
+public class CaffeineRuleCache implements RuleCache {
+
+ private static final Logger log = LoggerFactory.getLogger(CaffeineRuleCache.class);
+
+ private final Cache
+ * fluxgate:
+ * reload:
+ * strategy: POLLING
+ * polling:
+ * interval: 30s
+ * initial-delay: 10s
+ *
+ */
+public class PollingReloadStrategy extends AbstractReloadStrategy {
+
+ private final RateLimitRuleSetProvider provider;
+ private final RuleCache cache;
+ private final Duration pollInterval;
+ private final Duration initialDelay;
+
+ private ScheduledExecutorService scheduler;
+ private ScheduledFuture> pollTask;
+
+ // Track rule set versions using hash codes
+ private final Map
+ *
+ *
+ *
+ * fluxgate:
+ * reload:
+ * strategy: PUBSUB
+ * pubsub:
+ * channel: fluxgate:rule-reload
+ * retry-on-failure: true
+ * retry-interval: 5s
+ *
+ */
+public class RedisPubSubReloadStrategy extends AbstractReloadStrategy {
+
+ /** Message indicating a full reload should occur. */
+ public static final String FULL_RELOAD_MESSAGE = "*";
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private final String redisUri;
+ private final String channel;
+ private final boolean retryOnFailure;
+ private final Duration retryInterval;
+ private final boolean isCluster;
+ private final Duration timeout;
+
+ private Object redisClient; // Created lazily
+ private final AtomicReference
+ *
+ *
+ * @param message the message received
+ */
+ private void handleMessage(String message) {
+ log.debug("Received Pub/Sub message: {}", message);
+
+ RuleReloadEvent event;
+ if (message == null || message.isEmpty() || FULL_RELOAD_MESSAGE.equals(message)) {
+ event = RuleReloadEvent.fullReload(ReloadSource.PUBSUB);
+ log.info("Full reload triggered via Pub/Sub");
+ } else if (message.trim().startsWith("{")) {
+ // JSON format message
+ event = parseJsonMessage(message);
+ } else {
+ // Plain text ruleSetId
+ event = RuleReloadEvent.forRuleSet(message, ReloadSource.PUBSUB);
+ log.info("Reload triggered via Pub/Sub for ruleSetId: {}", message);
+ }
+
+ notifyListeners(event);
+ }
+
+ /**
+ * Parses a JSON format message.
+ *
+ * @param message the JSON message
+ * @return the parsed reload event
+ */
+ private RuleReloadEvent parseJsonMessage(String message) {
+ try {
+ JsonNode root = OBJECT_MAPPER.readTree(message);
+
+ boolean fullReload = root.path("fullReload").asBoolean(false);
+ if (fullReload) {
+ log.info("Full reload triggered via Pub/Sub (JSON)");
+ return RuleReloadEvent.fullReload(ReloadSource.PUBSUB);
+ }
+
+ String ruleSetId = root.path("ruleSetId").asText(null);
+ if (ruleSetId != null && !ruleSetId.isEmpty()) {
+ log.info("Reload triggered via Pub/Sub for ruleSetId: {}", ruleSetId);
+ return RuleReloadEvent.forRuleSet(ruleSetId, ReloadSource.PUBSUB);
+ }
+
+ log.warn("Invalid JSON message, triggering full reload: {}", message);
+ return RuleReloadEvent.fullReload(ReloadSource.PUBSUB);
+ } catch (JsonProcessingException e) {
+ log.warn("Failed to parse JSON message, treating as ruleSetId: {}", message, e);
+ return RuleReloadEvent.forRuleSet(message, ReloadSource.PUBSUB);
+ }
+ }
+
+ /**
+ * Returns the configured channel name.
+ *
+ * @return channel name
+ */
+ public String getChannel() {
+ return channel;
+ }
+
+ /**
+ * Returns whether retry on failure is enabled.
+ *
+ * @return true if retry is enabled
+ */
+ public boolean isRetryOnFailure() {
+ return retryOnFailure;
+ }
+
+ /**
+ * Returns the retry interval.
+ *
+ * @return retry interval
+ */
+ public Duration getRetryInterval() {
+ return retryInterval;
+ }
+
+ /**
+ * Checks if currently connected to Redis Pub/Sub.
+ *
+ * @return true if connected
+ */
+ public boolean isConnected() {
+ StatefulRedisPubSubConnection