diff --git a/fluxgate-samples/fluxgate-sample-standalone/elk-docker-compose.yml b/docker/elk.yml
similarity index 100%
rename from fluxgate-samples/fluxgate-sample-standalone/elk-docker-compose.yml
rename to docker/elk.yml
diff --git a/docker/full.yml b/docker/full.yml
new file mode 100644
index 0000000..d71696d
--- /dev/null
+++ b/docker/full.yml
@@ -0,0 +1,96 @@
+version: "3.8"
+
+# Full FluxGate Development Environment
+# Includes: MongoDB, Redis (standalone), ELK Stack
+#
+# Usage:
+# docker-compose -f docker/full.yml up -d
+# docker-compose -f docker/full.yml down
+#
+# Services:
+# - MongoDB: localhost:27017 (user: fluxgate, password: fluxgate123)
+# - Redis: localhost:6379
+# - Elasticsearch: localhost:9200
+# - Kibana: localhost:5601
+# - Logstash: localhost:5044 (TCP input), localhost:9600 (monitoring)
+
+services:
+ # ============================================
+ # MongoDB - Rule Storage
+ # ============================================
+ mongo:
+ image: mongo:7.0
+ container_name: fluxgate-mongo
+ ports:
+ - "27017:27017"
+ environment:
+ MONGO_INITDB_ROOT_USERNAME: fluxgate
+ MONGO_INITDB_ROOT_PASSWORD: fluxgate123
+ MONGO_INITDB_DATABASE: fluxgate
+ volumes:
+ - mongo-data:/data/db
+
+ # ============================================
+ # Redis Standalone - Rate Limiting
+ # ============================================
+ redis:
+ image: redis:7.2-alpine
+ container_name: fluxgate-redis
+ ports:
+ - "6379:6379"
+
+ # ============================================
+ # ELK Stack - Logging & Monitoring
+ # ============================================
+ elasticsearch:
+ image: docker.elastic.co/elasticsearch/elasticsearch:8.12.2
+ container_name: elk-elasticsearch
+ environment:
+ - discovery.type=single-node
+ - xpack.security.enabled=false
+ - ES_JAVA_OPTS=-Xms1g -Xmx1g
+ ports:
+ - "9200:9200"
+ volumes:
+ - es-data:/usr/share/elasticsearch/data
+
+ kibana:
+ image: docker.elastic.co/kibana/kibana:8.12.2
+ container_name: elk-kibana
+ depends_on:
+ - elasticsearch
+ environment:
+ - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
+ ports:
+ - "5601:5601"
+
+ logstash:
+ image: docker.elastic.co/logstash/logstash:8.12.2
+ container_name: elk-logstash
+ depends_on:
+ - elasticsearch
+ ports:
+ - "5044:5044" # TCP input
+ - "9600:9600" # Logstash monitoring API
+ environment:
+ - XPACK_MONITORING_ENABLED=false
+ - PIPELINE_WORKERS=1
+ command: >
+ logstash -e '
+ input {
+ tcp {
+ port => 5044
+ codec => json_lines
+ }
+ }
+ output {
+ elasticsearch {
+ hosts => ["http://elasticsearch:9200"]
+ index => "fluxgate-logs-%{+YYYY.MM.dd}"
+ }
+ }
+ '
+
+volumes:
+ mongo-data:
+ es-data:
diff --git a/docker/mongo.yml b/docker/mongo.yml
new file mode 100644
index 0000000..5e2f94e
--- /dev/null
+++ b/docker/mongo.yml
@@ -0,0 +1,17 @@
+version: "3.8"
+
+services:
+ mongo:
+ image: mongo:7.0
+ container_name: fluxgate-mongo
+ ports:
+ - "27017:27017"
+ environment:
+ MONGO_INITDB_ROOT_USERNAME: fluxgate
+ MONGO_INITDB_ROOT_PASSWORD: fluxgate123
+ MONGO_INITDB_DATABASE: fluxgate
+ volumes:
+ - mongo-data:/data/db
+
+volumes:
+ mongo-data:
diff --git a/docker/redis-cluster.yml b/docker/redis-cluster.yml
new file mode 100644
index 0000000..453e07b
--- /dev/null
+++ b/docker/redis-cluster.yml
@@ -0,0 +1,11 @@
+version: "3.8"
+
+services:
+ redis-cluster:
+ image: grokzen/redis-cluster:7.0.10
+ container_name: redis-cluster-test
+ environment:
+ IP: 0.0.0.0
+ INITIAL_PORT: 7100
+ ports:
+ - "7100-7105:7100-7105"
diff --git a/docker/redis-standalone.yml b/docker/redis-standalone.yml
new file mode 100644
index 0000000..6557be6
--- /dev/null
+++ b/docker/redis-standalone.yml
@@ -0,0 +1,8 @@
+version: "3.8"
+
+services:
+ redis:
+ image: redis:7.2-alpine
+ container_name: fluxgate-redis
+ ports:
+ - "6379:6379"
diff --git a/fluxgate-control-support/pom.xml b/fluxgate-control-support/pom.xml
new file mode 100644
index 0000000..f1e1f15
--- /dev/null
+++ b/fluxgate-control-support/pom.xml
@@ -0,0 +1,135 @@
+
+
When applied to a method, the {@link RuleChangeAspect} will automatically call {@link + * org.fluxgate.control.notify.RuleChangeNotifier#notifyFullReload()} after the method completes + * successfully. + * + *
Use this annotation for operations that affect multiple rules or when the specific rule set ID + * is not available. + * + *
Example usage: + * + *
{@code
+ * @Service
+ * public class RuleManagementService {
+ *
+ * @NotifyFullReload
+ * public void deleteAllRules() {
+ * mongoRepository.deleteAll();
+ * // Full reload notification is sent automatically
+ * }
+ *
+ * @NotifyFullReload
+ * public void importRules(List rules) {
+ * mongoRepository.deleteAll();
+ * mongoRepository.saveAll(rules);
+ * }
+ *
+ * @NotifyFullReload
+ * public void resetToDefaults() {
+ * mongoRepository.deleteAll();
+ * mongoRepository.saveAll(defaultRules);
+ * }
+ * }
+ * }
+ *
+ * @see NotifyRuleChange
+ * @see RuleChangeAspect
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface NotifyFullReload {}
diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java
new file mode 100644
index 0000000..92d38d8
--- /dev/null
+++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java
@@ -0,0 +1,75 @@
+package org.fluxgate.control.aop;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to automatically notify FluxGate instances when a rule is changed.
+ *
+ * When applied to a method, the {@link RuleChangeAspect} will automatically call {@link + * org.fluxgate.control.notify.RuleChangeNotifier#notifyChange(String)} after the method completes + * successfully. + * + *
The {@code ruleSetId} attribute supports Spring Expression Language (SpEL) to extract the rule + * set ID from method parameters. + * + *
Example usage: + * + *
{@code
+ * @Service
+ * public class RuleManagementService {
+ *
+ * @NotifyRuleChange(ruleSetId = "#ruleSetId")
+ * public void updateRule(String ruleSetId, RuleDto dto) {
+ * mongoRepository.save(dto);
+ * // Notification is sent automatically after this method returns
+ * }
+ *
+ * @NotifyRuleChange(ruleSetId = "#dto.ruleSetId")
+ * public void saveRule(RuleDto dto) {
+ * mongoRepository.save(dto);
+ * }
+ *
+ * @NotifyRuleChange(ruleSetId = "#result.id")
+ * public Rule createRule(RuleDto dto) {
+ * return mongoRepository.save(dto);
+ * // Uses the returned object's id
+ * }
+ * }
+ * }
+ *
+ * SpEL expressions can reference: + * + *
Examples: + * + *
This aspect intercepts methods annotated with rule change annotations and automatically + * notifies FluxGate instances after successful method execution. + * + *
For {@link NotifyRuleChange}, it evaluates the SpEL expression to extract the rule set ID from + * method parameters or return value. + * + *
Example: + * + *
{@code
+ * @NotifyRuleChange(ruleSetId = "#ruleSetId")
+ * public void updateRule(String ruleSetId, RuleDto dto) {
+ * // After this method returns successfully,
+ * // notifier.notifyChange(ruleSetId) is called automatically
+ * }
+ * }
+ */
+@Aspect
+public class RuleChangeAspect {
+
+ private static final Logger log = LoggerFactory.getLogger(RuleChangeAspect.class);
+
+ private final RuleChangeNotifier notifier;
+ private final ExpressionParser parser = new SpelExpressionParser();
+ private final ParameterNameDiscoverer parameterNameDiscoverer =
+ new DefaultParameterNameDiscoverer();
+
+ public RuleChangeAspect(RuleChangeNotifier notifier) {
+ this.notifier = notifier;
+ log.info("RuleChangeAspect initialized");
+ }
+
+ /**
+ * Handles methods annotated with {@link NotifyRuleChange}.
+ *
+ * After the method returns successfully, extracts the rule set ID using the SpEL expression + * and notifies all FluxGate instances. + * + * @param joinPoint the join point + * @param annotation the annotation + * @param result the return value of the method + */ + @AfterReturning( + pointcut = "@annotation(annotation)", + returning = "result", + argNames = "joinPoint,annotation,result") + public void afterRuleChange(JoinPoint joinPoint, NotifyRuleChange annotation, Object result) { + try { + String ruleSetId = extractRuleSetId(joinPoint, annotation.ruleSetId(), result); + + if (ruleSetId == null || ruleSetId.isBlank()) { + log.warn( + "Could not extract ruleSetId from expression '{}' in method {}", + annotation.ruleSetId(), + joinPoint.getSignature().getName()); + return; + } + + log.debug( + "Notifying rule change for ruleSetId={} from method {}", + ruleSetId, + joinPoint.getSignature().getName()); + + notifier.notifyChange(ruleSetId); + + } catch (Exception e) { + log.error( + "Failed to notify rule change for method {}: {}", + joinPoint.getSignature().getName(), + e.getMessage(), + e); + // Don't rethrow - notification failure should not fail the business operation + } + } + + /** + * Handles methods annotated with {@link NotifyFullReload}. + * + *
After the method returns successfully, notifies all FluxGate instances to perform a full + * reload. + * + * @param joinPoint the join point + * @param annotation the annotation + */ + @AfterReturning(pointcut = "@annotation(annotation)", argNames = "joinPoint,annotation") + public void afterFullReload(JoinPoint joinPoint, NotifyFullReload annotation) { + try { + log.debug("Notifying full reload from method {}", joinPoint.getSignature().getName()); + + notifier.notifyFullReload(); + + } catch (Exception e) { + log.error( + "Failed to notify full reload for method {}: {}", + joinPoint.getSignature().getName(), + e.getMessage(), + e); + // Don't rethrow - notification failure should not fail the business operation + } + } + + /** + * Extracts the rule set ID from the SpEL expression. + * + * @param joinPoint the join point + * @param expression the SpEL expression + * @param result the return value of the method + * @return the extracted rule set ID, or null if extraction fails + */ + private String extractRuleSetId(JoinPoint joinPoint, String expression, Object result) { + MethodSignature signature = (MethodSignature) joinPoint.getSignature(); + Method method = signature.getMethod(); + Object target = joinPoint.getTarget(); + Object[] args = joinPoint.getArgs(); + + EvaluationContext context = + new MethodBasedEvaluationContext(target, method, args, parameterNameDiscoverer); + + // Add result to context for expressions like #result.id + context.setVariable("result", result); + + Object value = parser.parseExpression(expression).getValue(context); + + return value != null ? value.toString() : null; + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java new file mode 100644 index 0000000..43b2eb8 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java @@ -0,0 +1,107 @@ +package org.fluxgate.control.autoconfigure; + +import org.fluxgate.control.aop.RuleChangeAspect; +import org.fluxgate.control.notify.RedisRuleChangeNotifier; +import org.fluxgate.control.notify.RuleChangeNotifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.EnableAspectJAutoProxy; + +/** + * Auto-configuration for FluxGate Control Support. + * + *
Automatically configures: + * + *
Example usage in application.yml: + * + *
+ * fluxgate: + * control: + * redis: + * uri: redis://localhost:6379 + * channel: fluxgate:rule-reload + *+ * + *
Example usage with annotations: + * + *
{@code
+ * @Service
+ * public class RuleManagementService {
+ *
+ * @NotifyRuleChange(ruleSetId = "#ruleSetId")
+ * public void updateRule(String ruleSetId, RuleDto dto) {
+ * mongoRepository.save(dto);
+ * }
+ *
+ * @NotifyFullReload
+ * public void deleteAllRules() {
+ * mongoRepository.deleteAll();
+ * }
+ * }
+ * }
+ */
+@AutoConfiguration
+@ConditionalOnClass(name = "io.lettuce.core.RedisClient")
+@ConditionalOnProperty(prefix = "fluxgate.control.redis", name = "uri")
+@EnableConfigurationProperties(ControlSupportProperties.class)
+@EnableAspectJAutoProxy
+public class ControlSupportAutoConfiguration {
+
+ private static final Logger log = LoggerFactory.getLogger(ControlSupportAutoConfiguration.class);
+
+ /**
+ * Creates the Redis-based rule change notifier.
+ *
+ * @param properties the configuration properties
+ * @return the notifier instance
+ */
+ @Bean
+ @ConditionalOnMissingBean(RuleChangeNotifier.class)
+ public RuleChangeNotifier ruleChangeNotifier(ControlSupportProperties properties) {
+ ControlSupportProperties.RedisProperties redis = properties.getRedis();
+
+ log.info(
+ "Creating RedisRuleChangeNotifier: uri={}, channel={}, source={}",
+ redis.getUri(),
+ redis.getChannel(),
+ properties.getSource());
+
+ return new RedisRuleChangeNotifier(
+ redis.getUri(), redis.getChannel(), redis.getTimeout(), properties.getSource());
+ }
+
+ /**
+ * Creates the AOP aspect for rule change annotations.
+ *
+ * Only created when: + * + *
Example configuration: + * + *
+ * fluxgate: + * control: + * redis: + * uri: redis://localhost:6379 + * channel: fluxgate:rule-reload + * timeout: 5s + * source: my-admin-app + *+ */ +@ConfigurationProperties(prefix = "fluxgate.control") +public class ControlSupportProperties { + + private final RedisProperties redis = new RedisProperties(); + + /** Source identifier for notifications (appears in messages). */ + private String source = "fluxgate-control"; + + public RedisProperties getRedis() { + return redis; + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + /** Redis configuration for rule change notifications. */ + public static class RedisProperties { + + /** Redis URI (e.g., "redis://localhost:6379"). For cluster, use comma-separated URIs. */ + private String uri = "redis://localhost:6379"; + + /** Pub/Sub channel name for rule change notifications. */ + private String channel = "fluxgate:rule-reload"; + + /** Connection timeout. */ + private Duration timeout = Duration.ofSeconds(5); + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public String getChannel() { + return channel; + } + + public void setChannel(String channel) { + this.channel = channel; + } + + public Duration getTimeout() { + return timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java new file mode 100644 index 0000000..d430b3e --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java @@ -0,0 +1,212 @@ +package org.fluxgate.control.notify; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Redis Pub/Sub implementation of {@link RuleChangeNotifier}. + * + *
Publishes rule change notifications to a Redis channel. All FluxGate instances subscribed to + * this channel will receive the notification and invalidate their local caches. + * + *
Supports both standalone Redis and Redis Cluster configurations. + * + *
Example usage: + * + *
{@code
+ * // Standalone Redis
+ * RuleChangeNotifier notifier = new RedisRuleChangeNotifier(
+ * "redis://localhost:6379",
+ * "fluxgate:rule-reload"
+ * );
+ *
+ * // Notify specific rule change
+ * notifier.notifyChange("my-rule-set-id");
+ *
+ * // Notify full reload
+ * notifier.notifyFullReload();
+ *
+ * // Cleanup
+ * notifier.close();
+ * }
+ */
+public class RedisRuleChangeNotifier implements RuleChangeNotifier {
+
+ private static final Logger log = LoggerFactory.getLogger(RedisRuleChangeNotifier.class);
+ private static final String DEFAULT_SOURCE = "fluxgate-control";
+
+ private final String redisUri;
+ private final String channel;
+ private final Duration timeout;
+ private final String source;
+ private final ObjectMapper objectMapper;
+ private final boolean isCluster;
+
+ private volatile RedisClient redisClient;
+ private volatile RedisClusterClient redisClusterClient;
+ private volatile StatefulRedisConnectionThis class is serialized to JSON and published via Redis Pub/Sub. FluxGate instances + * subscribed to the channel will deserialize this message and take appropriate action. + */ +public class RuleChangeMessage { + + private final String ruleSetId; + private final boolean fullReload; + private final long timestamp; + private final String source; + + @JsonCreator + public RuleChangeMessage( + @JsonProperty("ruleSetId") String ruleSetId, + @JsonProperty("fullReload") boolean fullReload, + @JsonProperty("timestamp") long timestamp, + @JsonProperty("source") String source) { + this.ruleSetId = ruleSetId; + this.fullReload = fullReload; + this.timestamp = timestamp; + this.source = source; + } + + /** + * Creates a message for a specific rule set change. + * + * @param ruleSetId the ID of the changed rule set + * @param source identifier of the source application (e.g., "fluxgate-control") + * @return the change message + */ + public static RuleChangeMessage forRuleSet(String ruleSetId, String source) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + return new RuleChangeMessage(ruleSetId, false, Instant.now().toEpochMilli(), source); + } + + /** + * Creates a message for a full reload of all rules. + * + * @param source identifier of the source application (e.g., "fluxgate-control") + * @return the full reload message + */ + public static RuleChangeMessage fullReload(String source) { + return new RuleChangeMessage(null, true, Instant.now().toEpochMilli(), source); + } + + /** Returns the rule set ID, or null if this is a full reload. */ + public String getRuleSetId() { + return ruleSetId; + } + + /** Returns true if this is a full reload request. */ + public boolean isFullReload() { + return fullReload; + } + + /** Returns the timestamp when this message was created (epoch millis). */ + public long getTimestamp() { + return timestamp; + } + + /** Returns the source application identifier. */ + public String getSource() { + return source; + } + + @Override + public String toString() { + if (fullReload) { + return "RuleChangeMessage{fullReload=true, source='" + + source + + "', timestamp=" + + timestamp + + "}"; + } + return "RuleChangeMessage{ruleSetId='" + + ruleSetId + + "', source='" + + source + + "', timestamp=" + + timestamp + + "}"; + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java new file mode 100644 index 0000000..9d45587 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java @@ -0,0 +1,13 @@ +package org.fluxgate.control.notify; + +/** Exception thrown when a rule change notification fails to be published. */ +public class RuleChangeNotificationException extends RuntimeException { + + public RuleChangeNotificationException(String message) { + super(message); + } + + public RuleChangeNotificationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java new file mode 100644 index 0000000..fb11025 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java @@ -0,0 +1,57 @@ +package org.fluxgate.control.notify; + +/** + * Interface for notifying FluxGate application servers about rule changes. + * + *
When rules are modified in the Admin/Studio application, this notifier broadcasts the change + * to all FluxGate instances so they can invalidate their local caches and reload the updated rules. + * + *
Example usage: + * + *
{@code
+ * @Service
+ * public class RuleManagementService {
+ * private final RuleChangeNotifier notifier;
+ *
+ * public void updateRule(String ruleSetId, RuleDto dto) {
+ * // 1. Save to database
+ * mongoRepository.save(dto);
+ *
+ * // 2. Notify all FluxGate instances
+ * notifier.notifyChange(ruleSetId);
+ * }
+ *
+ * public void deleteAllRules() {
+ * mongoRepository.deleteAll();
+ * notifier.notifyFullReload();
+ * }
+ * }
+ * }
+ */
+public interface RuleChangeNotifier {
+
+ /**
+ * Notifies all FluxGate instances that a specific rule set has changed.
+ *
+ * The instances will invalidate their local cache for this rule set and reload it from the + * database on the next request. + * + * @param ruleSetId the ID of the changed rule set + */ + void notifyChange(String ruleSetId); + + /** + * Notifies all FluxGate instances to perform a full reload of all rules. + * + *
Use this when multiple rules have changed or when performing bulk operations. All instances + * will invalidate their entire rule cache. + */ + void notifyFullReload(); + + /** + * Closes the notifier and releases any resources. + * + *
After calling this method, the notifier should not be used. + */ + void close(); +} diff --git a/fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..affbfb4 --- /dev/null +++ b/fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.fluxgate.control.autoconfigure.ControlSupportAutoConfiguration diff --git a/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java new file mode 100644 index 0000000..02b1afb --- /dev/null +++ b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java @@ -0,0 +1,101 @@ +package org.fluxgate.control.notify; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Duration; +import org.junit.jupiter.api.Test; + +class RedisRuleChangeNotifierTest { + + @Test + void shouldRequireRedisUri() { + assertThatThrownBy(() -> new RedisRuleChangeNotifier(null, "channel")) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("redisUri"); + } + + @Test + void shouldRequireChannel() { + assertThatThrownBy(() -> new RedisRuleChangeNotifier("redis://localhost:6379", null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("channel"); + } + + @Test + void shouldRequireTimeout() { + assertThatThrownBy( + () -> new RedisRuleChangeNotifier("redis://localhost:6379", "channel", null, "source")) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("timeout"); + } + + @Test + void shouldRequireSource() { + assertThatThrownBy( + () -> + new RedisRuleChangeNotifier( + "redis://localhost:6379", "channel", Duration.ofSeconds(5), null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("source"); + } + + @Test + void shouldDetectClusterMode() { + // Cluster mode is detected by comma-separated URIs + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier( + "redis://node1:6379,redis://node2:6379", "channel", Duration.ofSeconds(5), "source"); + + // Should not throw - cluster mode detected + assertThat(notifier).isNotNull(); + notifier.close(); + } + + @Test + void shouldThrowWhenClosedAndNotify() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + notifier.close(); + + assertThatThrownBy(() -> notifier.notifyChange("rule-id")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("closed"); + } + + @Test + void shouldThrowWhenClosedAndNotifyFullReload() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + notifier.close(); + + assertThatThrownBy(() -> notifier.notifyFullReload()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("closed"); + } + + @Test + void shouldRequireRuleSetIdForNotifyChange() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + + try { + assertThatThrownBy(() -> notifier.notifyChange(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("ruleSetId"); + } finally { + notifier.close(); + } + } + + @Test + void shouldAllowMultipleClose() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + + // Should not throw + notifier.close(); + notifier.close(); + notifier.close(); + } +} diff --git a/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java new file mode 100644 index 0000000..4808084 --- /dev/null +++ b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java @@ -0,0 +1,82 @@ +package org.fluxgate.control.notify; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +class RuleChangeMessageTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + void shouldCreateMessageForRuleSet() { + RuleChangeMessage message = RuleChangeMessage.forRuleSet("test-rule", "test-source"); + + assertThat(message.getRuleSetId()).isEqualTo("test-rule"); + assertThat(message.isFullReload()).isFalse(); + assertThat(message.getSource()).isEqualTo("test-source"); + assertThat(message.getTimestamp()).isPositive(); + } + + @Test + void shouldCreateFullReloadMessage() { + RuleChangeMessage message = RuleChangeMessage.fullReload("test-source"); + + assertThat(message.getRuleSetId()).isNull(); + assertThat(message.isFullReload()).isTrue(); + assertThat(message.getSource()).isEqualTo("test-source"); + } + + @Test + void shouldThrowOnNullRuleSetId() { + assertThatThrownBy(() -> RuleChangeMessage.forRuleSet(null, "source")) + .isInstanceOf(NullPointerException.class); + } + + @Test + void shouldSerializeAndDeserialize() throws Exception { + RuleChangeMessage original = RuleChangeMessage.forRuleSet("my-rule", "studio"); + + String json = objectMapper.writeValueAsString(original); + RuleChangeMessage deserialized = objectMapper.readValue(json, RuleChangeMessage.class); + + assertThat(deserialized.getRuleSetId()).isEqualTo(original.getRuleSetId()); + assertThat(deserialized.isFullReload()).isEqualTo(original.isFullReload()); + assertThat(deserialized.getSource()).isEqualTo(original.getSource()); + assertThat(deserialized.getTimestamp()).isEqualTo(original.getTimestamp()); + } + + @Test + void shouldSerializeFullReloadMessage() throws Exception { + RuleChangeMessage original = RuleChangeMessage.fullReload("admin"); + + String json = objectMapper.writeValueAsString(original); + RuleChangeMessage deserialized = objectMapper.readValue(json, RuleChangeMessage.class); + + assertThat(deserialized.getRuleSetId()).isNull(); + assertThat(deserialized.isFullReload()).isTrue(); + assertThat(deserialized.getSource()).isEqualTo("admin"); + } + + @Test + void shouldHaveToStringForRuleSet() { + RuleChangeMessage message = RuleChangeMessage.forRuleSet("test-rule", "source"); + + String str = message.toString(); + + assertThat(str).contains("test-rule"); + assertThat(str).contains("source"); + } + + @Test + void shouldHaveToStringForFullReload() { + RuleChangeMessage message = RuleChangeMessage.fullReload("source"); + + String str = message.toString(); + + assertThat(str).contains("fullReload=true"); + assertThat(str).contains("source"); + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/BucketResetHandler.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/BucketResetHandler.java new file mode 100644 index 0000000..859e5cd --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/BucketResetHandler.java @@ -0,0 +1,47 @@ +package org.fluxgate.core.reload; + +/** + * Handler for resetting rate limit buckets when rules are changed. + * + *
When a rule is modified in the Admin UI, the cached rule definitions are invalidated, but the + * token bucket state in Redis (or other storage) remains. This handler is responsible for resetting + * the bucket state so that the new rules take effect immediately. + * + *
Implementations should delete or reset the token buckets associated with the changed rule set. + * + *
Example implementation for Redis: + * + *
{@code
+ * public class RedisBucketResetHandler implements BucketResetHandler {
+ * private final RedisTokenBucketStore store;
+ *
+ * @Override
+ * public void resetBuckets(String ruleSetId) {
+ * store.deleteBucketsByRuleSetId(ruleSetId);
+ * }
+ *
+ * @Override
+ * public void resetAllBuckets() {
+ * store.deleteAllBuckets();
+ * }
+ * }
+ * }
+ */
+public interface BucketResetHandler {
+
+ /**
+ * Resets all buckets associated with the given rule set.
+ *
+ * Called when a specific rule set is modified or deleted. + * + * @param ruleSetId the rule set ID whose buckets should be reset + */ + void resetBuckets(String ruleSetId); + + /** + * Resets all buckets (full reset). + * + *
Called when a full reload is triggered. + */ + void resetAllBuckets(); +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java new file mode 100644 index 0000000..8382bd8 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java @@ -0,0 +1,102 @@ +package org.fluxgate.core.reload; + +import java.util.Objects; +import java.util.Optional; +import org.fluxgate.core.ratelimiter.RateLimitRuleSet; +import org.fluxgate.core.spi.RateLimitRuleSetProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A decorator that adds caching capabilities to any {@link RateLimitRuleSetProvider}. + * + *
This provider wraps a delegate provider and caches rule sets locally. It also implements + * {@link RuleReloadListener} to invalidate cache entries when reload events are received. + * + *
Example usage: + * + *
{@code
+ * RateLimitRuleSetProvider mongoProvider = new MongoRuleSetProvider(...);
+ * RuleCache cache = new CaffeineRuleCache(...);
+ * RuleReloadStrategy reloadStrategy = new PollingReloadStrategy(...);
+ *
+ * CachingRuleSetProvider cachingProvider =
+ * new CachingRuleSetProvider(mongoProvider, cache);
+ *
+ * // Register as reload listener
+ * reloadStrategy.addListener(cachingProvider);
+ * }
+ */
+public class CachingRuleSetProvider implements RateLimitRuleSetProvider, RuleReloadListener {
+
+ private static final Logger log = LoggerFactory.getLogger(CachingRuleSetProvider.class);
+
+ private final RateLimitRuleSetProvider delegate;
+ private final RuleCache cache;
+
+ /**
+ * Creates a new caching provider.
+ *
+ * @param delegate the underlying provider to delegate cache misses to
+ * @param cache the cache to store rule sets
+ */
+ public CachingRuleSetProvider(RateLimitRuleSetProvider delegate, RuleCache cache) {
+ this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
+ this.cache = Objects.requireNonNull(cache, "cache must not be null");
+ }
+
+ @Override
+ public OptionalImplementations should be thread-safe and support concurrent access.
+ */
+public interface RuleCache {
+
+ /**
+ * Retrieves a cached rule set by ID.
+ *
+ * @param ruleSetId the ID of the rule set to retrieve
+ * @return an Optional containing the rule set if cached, or empty if not found
+ */
+ Optional This should be used sparingly as it may impact performance during cache repopulation.
+ */
+ void invalidateAll();
+
+ /**
+ * Returns the IDs of all currently cached rule sets.
+ *
+ * This is useful for polling strategies that need to check for changes in known rule sets.
+ *
+ * @return an unmodifiable set of cached rule set IDs
+ */
+ Set This event is published when rules need to be reloaded, either for a specific rule set or for
+ * all cached rules.
+ */
+public final class RuleReloadEvent {
+
+ private final String ruleSetId;
+ private final ReloadSource source;
+ private final Instant timestamp;
+ private final Map Implementations can react to rule changes, such as invalidating caches or updating
+ * configurations.
+ */
+@FunctionalInterface
+public interface RuleReloadListener {
+
+ /**
+ * Called when a rule reload event is received.
+ *
+ * @param event the reload event containing details about what should be reloaded
+ */
+ void onReload(RuleReloadEvent event);
+}
diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java
new file mode 100644
index 0000000..3589e85
--- /dev/null
+++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java
@@ -0,0 +1,79 @@
+package org.fluxgate.core.reload;
+
+/**
+ * Strategy interface for rule hot reload.
+ *
+ * Implementations define how rules are reloaded (e.g., via polling, Redis Pub/Sub, etc.) and
+ * manage the lifecycle of reload mechanisms.
+ *
+ * Typical usage:
+ *
+ * For polling strategies, this starts the scheduled task. For Pub/Sub strategies, this
+ * establishes the subscription.
+ *
+ * This method is idempotent - calling it multiple times has no additional effect.
+ */
+ void start();
+
+ /**
+ * Stops the reload mechanism and releases resources.
+ *
+ * This method is idempotent - calling it multiple times has no additional effect.
+ */
+ void stop();
+
+ /**
+ * Returns whether the reload mechanism is currently running.
+ *
+ * @return true if started and not stopped
+ */
+ boolean isRunning();
+
+ /**
+ * Triggers a reload for a specific rule set.
+ *
+ * This method can be called to programmatically trigger a reload, for example after updating a
+ * rule via an admin API.
+ *
+ * @param ruleSetId the ID of the rule set to reload
+ */
+ void triggerReload(String ruleSetId);
+
+ /**
+ * Triggers a full reload of all cached rules.
+ *
+ * This is useful for scenarios like configuration refresh or manual cache invalidation.
+ */
+ void triggerReloadAll();
+
+ /**
+ * Adds a listener that will be notified when reload events occur.
+ *
+ * @param listener the listener to add
+ */
+ void addListener(RuleReloadListener listener);
+
+ /**
+ * Removes a previously added listener.
+ *
+ * @param listener the listener to remove
+ */
+ void removeListener(RuleReloadListener listener);
+}
diff --git a/fluxgate-core/src/test/java/org/fluxgate/core/reload/CachingRuleSetProviderTest.java b/fluxgate-core/src/test/java/org/fluxgate/core/reload/CachingRuleSetProviderTest.java
new file mode 100644
index 0000000..bb26378
--- /dev/null
+++ b/fluxgate-core/src/test/java/org/fluxgate/core/reload/CachingRuleSetProviderTest.java
@@ -0,0 +1,159 @@
+package org.fluxgate.core.reload;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.fluxgate.core.config.RateLimitBand;
+import org.fluxgate.core.config.RateLimitRule;
+import org.fluxgate.core.key.RateLimitKey;
+import org.fluxgate.core.ratelimiter.RateLimitRuleSet;
+import org.fluxgate.core.spi.RateLimitRuleSetProvider;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class CachingRuleSetProviderTest {
+
+ private TestRuleSetProvider delegate;
+ private TestRuleCache cache;
+ private CachingRuleSetProvider cachingProvider;
+
+ @BeforeEach
+ void setUp() {
+ delegate = new TestRuleSetProvider();
+ cache = new TestRuleCache();
+ cachingProvider = new CachingRuleSetProvider(delegate, cache);
+ }
+
+ @Test
+ void shouldReturnCachedRuleSetOnHit() {
+ RateLimitRuleSet ruleSet = createTestRuleSet("test-rule");
+ cache.put("test-rule", ruleSet);
+
+ Optional This is used when rules are changed to reset rate limit state. The pattern matches keys
+ * like: {@code fluxgate:{ruleSetId}:*}
+ *
+ * Warning: Uses KEYS command which can be slow on large databases. Consider using SCAN in
+ * high-traffic production environments.
+ *
+ * @param ruleSetId the rule set ID to match
+ * @return the number of buckets deleted
+ */
+ public long deleteBucketsByRuleSetId(String ruleSetId) {
+ Objects.requireNonNull(ruleSetId, "ruleSetId must not be null");
+
+ String pattern = "fluxgate:" + ruleSetId + ":*";
+ log.debug("Deleting token buckets matching pattern: {}", pattern);
+
+ java.util.List This is used when a full reload is triggered to reset all rate limit state. The pattern
+ * matches all FluxGate keys: {@code fluxgate:*}
+ *
+ * Warning: Uses KEYS command which can be slow on large databases.
+ *
+ * @return the number of buckets deleted
+ */
+ public long deleteAllBuckets() {
+ String pattern = "fluxgate:*";
+ log.debug("Deleting all token buckets matching pattern: {}", pattern);
+
+ java.util.List This handler: 1. Looks up rules from MongoDB via RateLimitRuleSetProvider 2. Applies rate
* limiting via Redis using RedisRateLimiter 3. Returns rate limit response to the filter
+ *
+ * The behavior when no rule is found can be configured via:
+ *
+ * This configuration provides:
+ *
+ * Note: For publishing rule changes from Admin/Control Plane applications, use the {@code
+ * fluxgate-control-support} module instead.
+ *
+ * Strategy selection:
+ *
+ * Configuration example:
+ *
+ * Only created when:
+ *
+ * Strategy selection:
+ *
+ * This handler automatically resets token buckets when rules change, ensuring that the new
+ * rate limits take effect immediately.
+ *
+ * Only created when Redis token bucket store is available.
+ */
+ @Bean
+ @ConditionalOnMissingBean(BucketResetHandler.class)
+ @ConditionalOnBean(RedisTokenBucketStore.class)
+ public BucketResetHandler bucketResetHandler(RedisTokenBucketStore tokenBucketStore) {
+ log.info("Creating RedisBucketResetHandler for automatic bucket reset on rule changes");
+ return new RedisBucketResetHandler(tokenBucketStore);
+ }
+
+ /**
+ * Creates the caching rule set provider that wraps the delegate provider.
+ *
+ * This is marked as @Primary so it takes precedence over the delegate provider when autowiring
+ * RateLimitRuleSetProvider.
+ */
+ @Bean(name = "cachingRuleSetProvider")
+ @Primary
+ @ConditionalOnBean({RuleCache.class, RuleReloadStrategy.class})
+ public CachingRuleSetProvider cachingRuleSetProvider(
+ @Qualifier("delegateRuleSetProvider") RateLimitRuleSetProvider ruleSetProvider,
+ RuleCache ruleCache,
+ RuleReloadStrategy reloadStrategy,
+ ObjectProvider In production environments with strict security requirements, consider setting this to
+ * DENY to ensure all requests are rate-limited.
+ */
+ private MissingRuleBehavior missingRuleBehavior = MissingRuleBehavior.ALLOW;
+
/**
* Filter order (lower = higher priority). Default is high priority to run before other filters.
*/
@@ -398,6 +423,31 @@ public boolean isIncludeHeaders() {
public void setIncludeHeaders(boolean includeHeaders) {
this.includeHeaders = includeHeaders;
}
+
+ public MissingRuleBehavior getMissingRuleBehavior() {
+ return missingRuleBehavior;
+ }
+
+ public void setMissingRuleBehavior(MissingRuleBehavior missingRuleBehavior) {
+ this.missingRuleBehavior = missingRuleBehavior;
+ }
+
+ /**
+ * Check if requests should be denied when no rule is found.
+ *
+ * @return true if missing rules should result in denial
+ */
+ public boolean isDenyWhenRuleMissing() {
+ return missingRuleBehavior == MissingRuleBehavior.DENY;
+ }
+ }
+
+ /** Behavior when no matching rate limit rule is found. */
+ public enum MissingRuleBehavior {
+ /** Allow the request to proceed (permissive mode). */
+ ALLOW,
+ /** Deny the request (strict mode, fail-closed). */
+ DENY
}
/** Metrics configuration for Prometheus/Micrometer integration. */
@@ -458,4 +508,210 @@ public void setEnabled(boolean enabled) {
}
}
}
+
+ /**
+ * Rule reload configuration for hot reload support.
+ *
+ * Supports multiple strategies:
+ *
+ * Provides high-performance, thread-safe local caching of rate limit rule sets with configurable
+ * TTL and maximum size.
+ *
+ * Example usage:
+ *
+ * This is typically called automatically by Caffeine, but can be invoked manually if needed.
+ */
+ public void cleanUp() {
+ cache.cleanUp();
+ }
+}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java
new file mode 100644
index 0000000..b67d418
--- /dev/null
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java
@@ -0,0 +1,62 @@
+package org.fluxgate.spring.reload.handler;
+
+import java.util.Objects;
+import org.fluxgate.core.reload.BucketResetHandler;
+import org.fluxgate.core.reload.RuleReloadEvent;
+import org.fluxgate.core.reload.RuleReloadListener;
+import org.fluxgate.redis.store.RedisTokenBucketStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Redis implementation of {@link BucketResetHandler}.
+ *
+ * This handler deletes token buckets from Redis when rules are changed, ensuring that the new
+ * rules take effect immediately.
+ *
+ * It also implements {@link RuleReloadListener} to automatically reset buckets when reload
+ * events are received via Pub/Sub or polling.
+ */
+public class RedisBucketResetHandler implements BucketResetHandler, RuleReloadListener {
+
+ private static final Logger log = LoggerFactory.getLogger(RedisBucketResetHandler.class);
+
+ private final RedisTokenBucketStore tokenBucketStore;
+
+ /**
+ * Creates a new RedisBucketResetHandler.
+ *
+ * @param tokenBucketStore the Redis token bucket store
+ */
+ public RedisBucketResetHandler(RedisTokenBucketStore tokenBucketStore) {
+ this.tokenBucketStore =
+ Objects.requireNonNull(tokenBucketStore, "tokenBucketStore must not be null");
+ }
+
+ @Override
+ public void resetBuckets(String ruleSetId) {
+ Objects.requireNonNull(ruleSetId, "ruleSetId must not be null");
+ log.info("Resetting token buckets for ruleSetId: {}", ruleSetId);
+ long deleted = tokenBucketStore.deleteBucketsByRuleSetId(ruleSetId);
+ log.info("Reset complete: {} buckets deleted for ruleSetId: {}", deleted, ruleSetId);
+ }
+
+ @Override
+ public void resetAllBuckets() {
+ log.info("Resetting all token buckets (full reset)");
+ long deleted = tokenBucketStore.deleteAllBuckets();
+ log.info("Full reset complete: {} buckets deleted", deleted);
+ }
+
+ @Override
+ public void onReload(RuleReloadEvent event) {
+ if (event.isFullReload()) {
+ log.info("Full reload event received, resetting all buckets");
+ resetAllBuckets();
+ } else {
+ String ruleSetId = event.getRuleSetId();
+ log.info("Reload event received for ruleSetId: {}, resetting buckets", ruleSetId);
+ resetBuckets(ruleSetId);
+ }
+ }
+}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java
new file mode 100644
index 0000000..e0432ac
--- /dev/null
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java
@@ -0,0 +1,117 @@
+package org.fluxgate.spring.reload.strategy;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.fluxgate.core.reload.ReloadSource;
+import org.fluxgate.core.reload.RuleReloadEvent;
+import org.fluxgate.core.reload.RuleReloadListener;
+import org.fluxgate.core.reload.RuleReloadStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for reload strategies.
+ *
+ * Provides common functionality for managing listeners and lifecycle state.
+ */
+public abstract class AbstractReloadStrategy implements RuleReloadStrategy {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final List Used when hot reload is disabled (strategy = NONE). Rules are always fetched fresh from the
+ * provider without caching.
+ */
+public class NoOpReloadStrategy extends AbstractReloadStrategy {
+
+ @Override
+ protected ReloadSource getReloadSource() {
+ return ReloadSource.MANUAL;
+ }
+
+ @Override
+ protected void doStart() {
+ log.info("NoOp reload strategy active - hot reload disabled");
+ }
+
+ @Override
+ protected void doStop() {
+ // Nothing to clean up
+ }
+
+ @Override
+ public void triggerReload(String ruleSetId) {
+ log.debug("NoOp reload triggered for ruleSetId: {} (ignored)", ruleSetId);
+ }
+
+ @Override
+ public void triggerReloadAll() {
+ log.debug("NoOp full reload triggered (ignored)");
+ }
+}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java
new file mode 100644
index 0000000..6e2c160
--- /dev/null
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java
@@ -0,0 +1,234 @@
+package org.fluxgate.spring.reload.strategy;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.fluxgate.core.ratelimiter.RateLimitRuleSet;
+import org.fluxgate.core.reload.ReloadSource;
+import org.fluxgate.core.reload.RuleCache;
+import org.fluxgate.core.reload.RuleReloadEvent;
+import org.fluxgate.core.spi.RateLimitRuleSetProvider;
+
+/**
+ * Polling-based reload strategy that periodically checks for rule changes.
+ *
+ * This strategy maintains a version map (using hashCode) of cached rule sets and compares them
+ * against the source provider at regular intervals.
+ *
+ * Configuration example:
+ *
+ * Uses the rule set's content to generate a hash that changes when the rules change.
+ *
+ * @param ruleSet the rule set
+ * @return version hash
+ */
+ private int computeVersion(RateLimitRuleSet ruleSet) {
+ // Use a combination of ID, description, and rules hash
+ int hash = ruleSet.getId().hashCode();
+ if (ruleSet.getDescription() != null) {
+ hash = 31 * hash + ruleSet.getDescription().hashCode();
+ }
+ hash = 31 * hash + ruleSet.getRules().hashCode();
+ return hash;
+ }
+
+ /**
+ * Manually triggers a version check for a specific rule set.
+ *
+ * @param ruleSetId the rule set ID to check
+ */
+ public void forceCheck(String ruleSetId) {
+ checkForChange(ruleSetId);
+ }
+
+ /**
+ * Returns the configured poll interval.
+ *
+ * @return poll interval
+ */
+ public Duration getPollInterval() {
+ return pollInterval;
+ }
+
+ /**
+ * Returns the configured initial delay.
+ *
+ * @return initial delay
+ */
+ public Duration getInitialDelay() {
+ return initialDelay;
+ }
+
+ /**
+ * Returns the current version map for debugging.
+ *
+ * @return unmodifiable copy of version map
+ */
+ public Map This strategy subscribes to a Redis channel and listens for rule change messages. When a
+ * message is received, it triggers a reload event to invalidate cached rules.
+ *
+ * Message format:
+ *
+ * Configuration example:
+ *
+ * Supports two message formats:
+ *
+ * {@code
+ * RuleReloadStrategy strategy = new PollingReloadStrategy(...);
+ * strategy.addListener(event -> cache.invalidate(event.getRuleSetId()));
+ * strategy.start();
+ *
+ * // Later, to trigger a reload programmatically:
+ * strategy.triggerReload("my-rule-set");
+ *
+ * // On shutdown:
+ * strategy.stop();
+ * }
+ */
+public interface RuleReloadStrategy {
+
+ /**
+ * Starts the reload mechanism.
+ *
+ *
+ * fluxgate:
+ * ratelimit:
+ * missing-rule-behavior: ALLOW # or DENY
+ *
*/
@Component
public class StandaloneRateLimitHandler implements FluxgateRateLimitHandler {
@@ -25,12 +34,17 @@ public class StandaloneRateLimitHandler implements FluxgateRateLimitHandler {
private final RateLimitRuleSetProvider ruleSetProvider;
private final RedisRateLimiter rateLimiter;
+ private final boolean denyWhenRuleMissing;
public StandaloneRateLimitHandler(
- RateLimitRuleSetProvider ruleSetProvider, RedisRateLimiter rateLimiter) {
+ RateLimitRuleSetProvider ruleSetProvider,
+ RedisRateLimiter rateLimiter,
+ FluxgateProperties properties) {
this.ruleSetProvider = ruleSetProvider;
this.rateLimiter = rateLimiter;
- log.info("StandaloneRateLimitHandler initialized");
+ this.denyWhenRuleMissing = properties.getRatelimit().isDenyWhenRuleMissing();
+ log.info(
+ "StandaloneRateLimitHandler initialized (denyWhenRuleMissing={})", denyWhenRuleMissing);
}
@Override
@@ -41,8 +55,14 @@ public RateLimitResponse tryConsume(RequestContext context, String ruleSetId) {
// Look up the ruleset from MongoDB
Optional
+ *
+ *
+ *
+ *
+ *
+ *
+ * fluxgate:
+ * reload:
+ * enabled: true
+ * strategy: AUTO
+ * cache:
+ * ttl: 5m
+ * max-size: 1000
+ * polling:
+ * interval: 30s
+ * pubsub:
+ * channel: fluxgate:rule-reload
+ *
+ */
+@AutoConfiguration(after = {FluxgateMongoAutoConfiguration.class})
+@ConditionalOnProperty(
+ prefix = "fluxgate.reload",
+ name = "enabled",
+ havingValue = "true",
+ matchIfMissing = true)
+@EnableConfigurationProperties(FluxgateProperties.class)
+public class FluxgateReloadAutoConfiguration {
+
+ private static final Logger log = LoggerFactory.getLogger(FluxgateReloadAutoConfiguration.class);
+
+ private final FluxgateProperties properties;
+
+ public FluxgateReloadAutoConfiguration(FluxgateProperties properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Creates the Caffeine-based rule cache.
+ *
+ *
+ *
+ */
+ @Bean
+ @ConditionalOnMissingBean(RuleCache.class)
+ @ConditionalOnClass(name = "com.github.benmanes.caffeine.cache.Caffeine")
+ @ConditionalOnProperty(
+ prefix = "fluxgate.reload.cache",
+ name = "enabled",
+ havingValue = "true",
+ matchIfMissing = true)
+ public RuleCache ruleCache() {
+ ReloadProperties reloadProps = properties.getReload();
+
+ if (reloadProps.getStrategy() == ReloadStrategy.NONE) {
+ log.info("Rule cache disabled (strategy=NONE)");
+ return null;
+ }
+
+ ReloadProperties.CacheProperties cacheProps = reloadProps.getCache();
+ log.info(
+ "Creating CaffeineRuleCache with ttl={}, maxSize={}",
+ cacheProps.getTtl(),
+ cacheProps.getMaxSize());
+
+ return new CaffeineRuleCache(cacheProps.getTtl(), cacheProps.getMaxSize());
+ }
+
+ /**
+ * Creates the rule reload strategy based on configuration.
+ *
+ *
+ *
+ */
+ @Bean
+ @ConditionalOnMissingBean(RuleReloadStrategy.class)
+ public RuleReloadStrategy ruleReloadStrategy(
+ @Qualifier("delegateRuleSetProvider") RateLimitRuleSetProvider ruleSetProvider,
+ ObjectProvider
+ *
+ *
+ *
+ *
+ *
+ *
+ * fluxgate:
+ * reload:
+ * enabled: true
+ * strategy: AUTO
+ * cache:
+ * enabled: true
+ * ttl: 5m
+ * max-size: 1000
+ * polling:
+ * interval: 30s
+ * pubsub:
+ * channel: fluxgate:rule-reload
+ *
+ */
+ public static class ReloadProperties {
+
+ /** Enable rule hot reload feature. */
+ private boolean enabled = true;
+
+ /**
+ * Reload strategy to use.
+ *
+ *
+ *
+ */
+ private ReloadStrategy strategy = ReloadStrategy.AUTO;
+
+ /** Cache configuration. */
+ private CacheProperties cache = new CacheProperties();
+
+ /** Polling strategy configuration. */
+ private PollingProperties polling = new PollingProperties();
+
+ /** Pub/Sub strategy configuration. */
+ private PubSubProperties pubsub = new PubSubProperties();
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public ReloadStrategy getStrategy() {
+ return strategy;
+ }
+
+ public void setStrategy(ReloadStrategy strategy) {
+ this.strategy = strategy;
+ }
+
+ public CacheProperties getCache() {
+ return cache;
+ }
+
+ public void setCache(CacheProperties cache) {
+ this.cache = cache;
+ }
+
+ public PollingProperties getPolling() {
+ return polling;
+ }
+
+ public void setPolling(PollingProperties polling) {
+ this.polling = polling;
+ }
+
+ public PubSubProperties getPubsub() {
+ return pubsub;
+ }
+
+ public void setPubsub(PubSubProperties pubsub) {
+ this.pubsub = pubsub;
+ }
+
+ /** Rule cache configuration. */
+ public static class CacheProperties {
+
+ /** Enable local caching of rules. */
+ private boolean enabled = true;
+
+ /** Time-to-live for cached rules. Rules will be refetched after this duration. */
+ private Duration ttl = Duration.ofMinutes(5);
+
+ /** Maximum number of rules to cache. */
+ private int maxSize = 1000;
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public Duration getTtl() {
+ return ttl;
+ }
+
+ public void setTtl(Duration ttl) {
+ this.ttl = ttl;
+ }
+
+ public int getMaxSize() {
+ return maxSize;
+ }
+
+ public void setMaxSize(int maxSize) {
+ this.maxSize = maxSize;
+ }
+ }
+
+ /** Polling strategy configuration. */
+ public static class PollingProperties {
+
+ /** Interval between polling checks. */
+ private Duration interval = Duration.ofSeconds(30);
+
+ /** Initial delay before first poll. */
+ private Duration initialDelay = Duration.ofSeconds(10);
+
+ public Duration getInterval() {
+ return interval;
+ }
+
+ public void setInterval(Duration interval) {
+ this.interval = interval;
+ }
+
+ public Duration getInitialDelay() {
+ return initialDelay;
+ }
+
+ public void setInitialDelay(Duration initialDelay) {
+ this.initialDelay = initialDelay;
+ }
+ }
+
+ /** Pub/Sub strategy configuration. */
+ public static class PubSubProperties {
+
+ /** Redis channel name for reload notifications. */
+ private String channel = "fluxgate:rule-reload";
+
+ /** Retry subscription on failure. */
+ private boolean retryOnFailure = true;
+
+ /** Interval between retry attempts. */
+ private Duration retryInterval = Duration.ofSeconds(5);
+
+ public String getChannel() {
+ return channel;
+ }
+
+ public void setChannel(String channel) {
+ this.channel = channel;
+ }
+
+ public boolean isRetryOnFailure() {
+ return retryOnFailure;
+ }
+
+ public void setRetryOnFailure(boolean retryOnFailure) {
+ this.retryOnFailure = retryOnFailure;
+ }
+
+ public Duration getRetryInterval() {
+ return retryInterval;
+ }
+
+ public void setRetryInterval(Duration retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+ }
+ }
+
+ /** Reload strategy options. */
+ public enum ReloadStrategy {
+ /** Automatically select best strategy based on available infrastructure. */
+ AUTO,
+ /** Use Redis Pub/Sub for real-time reload notifications. */
+ PUBSUB,
+ /** Use periodic polling to check for changes. */
+ POLLING,
+ /** Disable hot reload - always fetch fresh rules from provider. */
+ NONE
+ }
}
diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java
new file mode 100644
index 0000000..bb74959
--- /dev/null
+++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java
@@ -0,0 +1,135 @@
+package org.fluxgate.spring.reload.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import org.fluxgate.core.ratelimiter.RateLimitRuleSet;
+import org.fluxgate.core.reload.RuleCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Caffeine-based implementation of {@link RuleCache}.
+ *
+ * {@code
+ * RuleCache cache = new CaffeineRuleCache(
+ * Duration.ofMinutes(5), // TTL
+ * 1000 // max size
+ * );
+ * }
+ */
+public class CaffeineRuleCache implements RuleCache {
+
+ private static final Logger log = LoggerFactory.getLogger(CaffeineRuleCache.class);
+
+ private final Cache
+ * fluxgate:
+ * reload:
+ * strategy: POLLING
+ * polling:
+ * interval: 30s
+ * initial-delay: 10s
+ *
+ */
+public class PollingReloadStrategy extends AbstractReloadStrategy {
+
+ private final RateLimitRuleSetProvider provider;
+ private final RuleCache cache;
+ private final Duration pollInterval;
+ private final Duration initialDelay;
+
+ private ScheduledExecutorService scheduler;
+ private ScheduledFuture> pollTask;
+
+ // Track rule set versions using hash codes
+ private final Map
+ *
+ *
+ *
+ * fluxgate:
+ * reload:
+ * strategy: PUBSUB
+ * pubsub:
+ * channel: fluxgate:rule-reload
+ * retry-on-failure: true
+ * retry-interval: 5s
+ *
+ */
+public class RedisPubSubReloadStrategy extends AbstractReloadStrategy {
+
+ /** Message indicating a full reload should occur. */
+ public static final String FULL_RELOAD_MESSAGE = "*";
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private final String redisUri;
+ private final String channel;
+ private final boolean retryOnFailure;
+ private final Duration retryInterval;
+ private final boolean isCluster;
+ private final Duration timeout;
+
+ private Object redisClient; // Created lazily
+ private final AtomicReference
+ *
+ *
+ * @param message the message received
+ */
+ private void handleMessage(String message) {
+ log.debug("Received Pub/Sub message: {}", message);
+
+ RuleReloadEvent event;
+ if (message == null || message.isEmpty() || FULL_RELOAD_MESSAGE.equals(message)) {
+ event = RuleReloadEvent.fullReload(ReloadSource.PUBSUB);
+ log.info("Full reload triggered via Pub/Sub");
+ } else if (message.trim().startsWith("{")) {
+ // JSON format message
+ event = parseJsonMessage(message);
+ } else {
+ // Plain text ruleSetId
+ event = RuleReloadEvent.forRuleSet(message, ReloadSource.PUBSUB);
+ log.info("Reload triggered via Pub/Sub for ruleSetId: {}", message);
+ }
+
+ notifyListeners(event);
+ }
+
+ /**
+ * Parses a JSON format message.
+ *
+ * @param message the JSON message
+ * @return the parsed reload event
+ */
+ private RuleReloadEvent parseJsonMessage(String message) {
+ try {
+ JsonNode root = OBJECT_MAPPER.readTree(message);
+
+ boolean fullReload = root.path("fullReload").asBoolean(false);
+ if (fullReload) {
+ log.info("Full reload triggered via Pub/Sub (JSON)");
+ return RuleReloadEvent.fullReload(ReloadSource.PUBSUB);
+ }
+
+ String ruleSetId = root.path("ruleSetId").asText(null);
+ if (ruleSetId != null && !ruleSetId.isEmpty()) {
+ log.info("Reload triggered via Pub/Sub for ruleSetId: {}", ruleSetId);
+ return RuleReloadEvent.forRuleSet(ruleSetId, ReloadSource.PUBSUB);
+ }
+
+ log.warn("Invalid JSON message, triggering full reload: {}", message);
+ return RuleReloadEvent.fullReload(ReloadSource.PUBSUB);
+ } catch (JsonProcessingException e) {
+ log.warn("Failed to parse JSON message, treating as ruleSetId: {}", message, e);
+ return RuleReloadEvent.forRuleSet(message, ReloadSource.PUBSUB);
+ }
+ }
+
+ /**
+ * Returns the configured channel name.
+ *
+ * @return channel name
+ */
+ public String getChannel() {
+ return channel;
+ }
+
+ /**
+ * Returns whether retry on failure is enabled.
+ *
+ * @return true if retry is enabled
+ */
+ public boolean isRetryOnFailure() {
+ return retryOnFailure;
+ }
+
+ /**
+ * Returns the retry interval.
+ *
+ * @return retry interval
+ */
+ public Duration getRetryInterval() {
+ return retryInterval;
+ }
+
+ /**
+ * Checks if currently connected to Redis Pub/Sub.
+ *
+ * @return true if connected
+ */
+ public boolean isConnected() {
+ StatefulRedisPubSubConnection