From eb09e9b5f98c3ec1f60bf101d96d705d6f1c5423 Mon Sep 17 00:00:00 2001 From: rojae Date: Sun, 14 Dec 2025 20:13:31 +0900 Subject: [PATCH 1/5] feat: implement hot reload for rate limit rules with Redis Pub/Sub Add real-time rule change propagation from Admin API to App Servers: - fluxgate-control-support module with AOP annotations - Redis Pub/Sub and Polling reload strategies - Caffeine-based local rule caching - Automatic token bucket reset on rule changes --- fluxgate-control-support/pom.xml | 135 +++++++ .../control/aop/NotifyFullReload.java | 51 +++ .../control/aop/NotifyRuleChange.java | 75 ++++ .../control/aop/RuleChangeAspect.java | 145 +++++++ .../ControlSupportAutoConfiguration.java | 107 +++++ .../ControlSupportProperties.java | 77 ++++ .../notify/RedisRuleChangeNotifier.java | 212 ++++++++++ .../control/notify/RuleChangeMessage.java | 92 +++++ .../RuleChangeNotificationException.java | 13 + .../control/notify/RuleChangeNotifier.java | 57 +++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + .../notify/RedisRuleChangeNotifierTest.java | 101 +++++ .../control/notify/RuleChangeMessageTest.java | 82 ++++ .../core/reload/BucketResetHandler.java | 47 +++ .../core/reload/CachingRuleSetProvider.java | 102 +++++ .../fluxgate/core/reload/ReloadSource.java | 23 ++ .../org/fluxgate/core/reload/RuleCache.java | 79 ++++ .../fluxgate/core/reload/RuleReloadEvent.java | 146 +++++++ .../core/reload/RuleReloadListener.java | 18 + .../core/reload/RuleReloadStrategy.java | 79 ++++ .../reload/CachingRuleSetProviderTest.java | 159 ++++++++ .../core/reload/RuleReloadEventTest.java | 76 ++++ .../redis/store/RedisTokenBucketStore.java | 54 +++ .../fluxgate-sample-standalone/pom.xml | 6 + .../src/main/resources/application.yml | 8 + fluxgate-spring-boot-starter/pom.xml | 15 + .../FluxgateMongoAutoConfiguration.java | 4 +- .../FluxgateReloadAutoConfiguration.java | 301 ++++++++++++++ .../spring/properties/FluxgateProperties.java | 218 +++++++++++ .../reload/cache/CaffeineRuleCache.java | 135 +++++++ .../handler/RedisBucketResetHandler.java | 62 +++ .../strategy/AbstractReloadStrategy.java | 117 ++++++ .../reload/strategy/NoOpReloadStrategy.java | 37 ++ .../strategy/PollingReloadStrategy.java | 234 +++++++++++ .../strategy/RedisPubSubReloadStrategy.java | 367 ++++++++++++++++++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + pom.xml | 1 + 37 files changed, 3435 insertions(+), 2 deletions(-) create mode 100644 fluxgate-control-support/pom.xml create mode 100644 fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyFullReload.java create mode 100644 fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java create mode 100644 fluxgate-control-support/src/main/java/org/fluxgate/control/aop/RuleChangeAspect.java create mode 100644 fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java create mode 100644 fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportProperties.java create mode 100644 fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java create mode 100644 fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeMessage.java create mode 100644 fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java create mode 100644 fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java create mode 100644 fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports create mode 100644 fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java create mode 100644 fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java create mode 100644 fluxgate-core/src/main/java/org/fluxgate/core/reload/BucketResetHandler.java create mode 100644 fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java create mode 100644 fluxgate-core/src/main/java/org/fluxgate/core/reload/ReloadSource.java create mode 100644 fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleCache.java create mode 100644 fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadEvent.java create mode 100644 fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadListener.java create mode 100644 fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java create mode 100644 fluxgate-core/src/test/java/org/fluxgate/core/reload/CachingRuleSetProviderTest.java create mode 100644 fluxgate-core/src/test/java/org/fluxgate/core/reload/RuleReloadEventTest.java create mode 100644 fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/autoconfigure/FluxgateReloadAutoConfiguration.java create mode 100644 fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java create mode 100644 fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java create mode 100644 fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java create mode 100644 fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/NoOpReloadStrategy.java create mode 100644 fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java create mode 100644 fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/RedisPubSubReloadStrategy.java diff --git a/fluxgate-control-support/pom.xml b/fluxgate-control-support/pom.xml new file mode 100644 index 0000000..f1e1f15 --- /dev/null +++ b/fluxgate-control-support/pom.xml @@ -0,0 +1,135 @@ + + + + 4.0.0 + + + io.github.openfluxgate + fluxgate + 0.1.4 + + + fluxgate-control-support + jar + + FluxGate Control Support + Support library for FluxGate Control Plane (Admin/Studio) - provides rule change notification capabilities + + + + + io.lettuce + lettuce-core + 6.3.2.RELEASE + + + + + com.fasterxml.jackson.core + jackson-databind + 2.17.2 + + + + + org.slf4j + slf4j-api + + + + + org.springframework.boot + spring-boot-autoconfigure + 3.3.5 + true + + + + + org.springframework.boot + spring-boot-configuration-processor + 3.3.5 + true + + + + + org.springframework + spring-aop + 6.1.14 + true + + + org.aspectj + aspectjweaver + 1.9.22 + true + + + + + org.junit.jupiter + junit-jupiter + test + + + org.assertj + assertj-core + test + + + ch.qos.logback + logback-classic + 1.5.6 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.apache.maven.plugins + maven-jar-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + org.jacoco + jacoco-maven-plugin + + + + **/autoconfigure/** + + **/RedisRuleChangeNotifier.class + + **/aop/RuleChangeAspect.class + + + + + + + diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyFullReload.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyFullReload.java new file mode 100644 index 0000000..fb45cae --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyFullReload.java @@ -0,0 +1,51 @@ +package org.fluxgate.control.aop; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to automatically notify FluxGate instances to perform a full rule reload. + * + *

When applied to a method, the {@link RuleChangeAspect} will automatically call {@link + * org.fluxgate.control.notify.RuleChangeNotifier#notifyFullReload()} after the method completes + * successfully. + * + *

Use this annotation for operations that affect multiple rules or when the specific rule set ID + * is not available. + * + *

Example usage: + * + *

{@code
+ * @Service
+ * public class RuleManagementService {
+ *
+ *     @NotifyFullReload
+ *     public void deleteAllRules() {
+ *         mongoRepository.deleteAll();
+ *         // Full reload notification is sent automatically
+ *     }
+ *
+ *     @NotifyFullReload
+ *     public void importRules(List rules) {
+ *         mongoRepository.deleteAll();
+ *         mongoRepository.saveAll(rules);
+ *     }
+ *
+ *     @NotifyFullReload
+ *     public void resetToDefaults() {
+ *         mongoRepository.deleteAll();
+ *         mongoRepository.saveAll(defaultRules);
+ *     }
+ * }
+ * }
+ * + * @see NotifyRuleChange + * @see RuleChangeAspect + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface NotifyFullReload {} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java new file mode 100644 index 0000000..92d38d8 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/NotifyRuleChange.java @@ -0,0 +1,75 @@ +package org.fluxgate.control.aop; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to automatically notify FluxGate instances when a rule is changed. + * + *

When applied to a method, the {@link RuleChangeAspect} will automatically call {@link + * org.fluxgate.control.notify.RuleChangeNotifier#notifyChange(String)} after the method completes + * successfully. + * + *

The {@code ruleSetId} attribute supports Spring Expression Language (SpEL) to extract the rule + * set ID from method parameters. + * + *

Example usage: + * + *

{@code
+ * @Service
+ * public class RuleManagementService {
+ *
+ *     @NotifyRuleChange(ruleSetId = "#ruleSetId")
+ *     public void updateRule(String ruleSetId, RuleDto dto) {
+ *         mongoRepository.save(dto);
+ *         // Notification is sent automatically after this method returns
+ *     }
+ *
+ *     @NotifyRuleChange(ruleSetId = "#dto.ruleSetId")
+ *     public void saveRule(RuleDto dto) {
+ *         mongoRepository.save(dto);
+ *     }
+ *
+ *     @NotifyRuleChange(ruleSetId = "#result.id")
+ *     public Rule createRule(RuleDto dto) {
+ *         return mongoRepository.save(dto);
+ *         // Uses the returned object's id
+ *     }
+ * }
+ * }
+ * + *

SpEL expressions can reference: + * + *

+ * + * @see NotifyFullReload + * @see RuleChangeAspect + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface NotifyRuleChange { + + /** + * SpEL expression to extract the rule set ID. + * + *

Examples: + * + *

+ * + * @return SpEL expression for rule set ID + */ + String ruleSetId(); +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/RuleChangeAspect.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/RuleChangeAspect.java new file mode 100644 index 0000000..52605db --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/aop/RuleChangeAspect.java @@ -0,0 +1,145 @@ +package org.fluxgate.control.aop; + +import java.lang.reflect.Method; +import org.aspectj.lang.JoinPoint; +import org.aspectj.lang.annotation.AfterReturning; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.fluxgate.control.notify.RuleChangeNotifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.expression.MethodBasedEvaluationContext; +import org.springframework.core.DefaultParameterNameDiscoverer; +import org.springframework.core.ParameterNameDiscoverer; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; + +/** + * Aspect that handles {@link NotifyRuleChange} and {@link NotifyFullReload} annotations. + * + *

This aspect intercepts methods annotated with rule change annotations and automatically + * notifies FluxGate instances after successful method execution. + * + *

For {@link NotifyRuleChange}, it evaluates the SpEL expression to extract the rule set ID from + * method parameters or return value. + * + *

Example: + * + *

{@code
+ * @NotifyRuleChange(ruleSetId = "#ruleSetId")
+ * public void updateRule(String ruleSetId, RuleDto dto) {
+ *     // After this method returns successfully,
+ *     // notifier.notifyChange(ruleSetId) is called automatically
+ * }
+ * }
+ */ +@Aspect +public class RuleChangeAspect { + + private static final Logger log = LoggerFactory.getLogger(RuleChangeAspect.class); + + private final RuleChangeNotifier notifier; + private final ExpressionParser parser = new SpelExpressionParser(); + private final ParameterNameDiscoverer parameterNameDiscoverer = + new DefaultParameterNameDiscoverer(); + + public RuleChangeAspect(RuleChangeNotifier notifier) { + this.notifier = notifier; + log.info("RuleChangeAspect initialized"); + } + + /** + * Handles methods annotated with {@link NotifyRuleChange}. + * + *

After the method returns successfully, extracts the rule set ID using the SpEL expression + * and notifies all FluxGate instances. + * + * @param joinPoint the join point + * @param annotation the annotation + * @param result the return value of the method + */ + @AfterReturning( + pointcut = "@annotation(annotation)", + returning = "result", + argNames = "joinPoint,annotation,result") + public void afterRuleChange(JoinPoint joinPoint, NotifyRuleChange annotation, Object result) { + try { + String ruleSetId = extractRuleSetId(joinPoint, annotation.ruleSetId(), result); + + if (ruleSetId == null || ruleSetId.isBlank()) { + log.warn( + "Could not extract ruleSetId from expression '{}' in method {}", + annotation.ruleSetId(), + joinPoint.getSignature().getName()); + return; + } + + log.debug( + "Notifying rule change for ruleSetId={} from method {}", + ruleSetId, + joinPoint.getSignature().getName()); + + notifier.notifyChange(ruleSetId); + + } catch (Exception e) { + log.error( + "Failed to notify rule change for method {}: {}", + joinPoint.getSignature().getName(), + e.getMessage(), + e); + // Don't rethrow - notification failure should not fail the business operation + } + } + + /** + * Handles methods annotated with {@link NotifyFullReload}. + * + *

After the method returns successfully, notifies all FluxGate instances to perform a full + * reload. + * + * @param joinPoint the join point + * @param annotation the annotation + */ + @AfterReturning(pointcut = "@annotation(annotation)", argNames = "joinPoint,annotation") + public void afterFullReload(JoinPoint joinPoint, NotifyFullReload annotation) { + try { + log.debug("Notifying full reload from method {}", joinPoint.getSignature().getName()); + + notifier.notifyFullReload(); + + } catch (Exception e) { + log.error( + "Failed to notify full reload for method {}: {}", + joinPoint.getSignature().getName(), + e.getMessage(), + e); + // Don't rethrow - notification failure should not fail the business operation + } + } + + /** + * Extracts the rule set ID from the SpEL expression. + * + * @param joinPoint the join point + * @param expression the SpEL expression + * @param result the return value of the method + * @return the extracted rule set ID, or null if extraction fails + */ + private String extractRuleSetId(JoinPoint joinPoint, String expression, Object result) { + MethodSignature signature = (MethodSignature) joinPoint.getSignature(); + Method method = signature.getMethod(); + Object target = joinPoint.getTarget(); + Object[] args = joinPoint.getArgs(); + + EvaluationContext context = + new MethodBasedEvaluationContext(target, method, args, parameterNameDiscoverer); + + // Add result to context for expressions like #result.id + context.setVariable("result", result); + + Object value = parser.parseExpression(expression).getValue(context); + + return value != null ? value.toString() : null; + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java new file mode 100644 index 0000000..43b2eb8 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportAutoConfiguration.java @@ -0,0 +1,107 @@ +package org.fluxgate.control.autoconfigure; + +import org.fluxgate.control.aop.RuleChangeAspect; +import org.fluxgate.control.notify.RedisRuleChangeNotifier; +import org.fluxgate.control.notify.RuleChangeNotifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.EnableAspectJAutoProxy; + +/** + * Auto-configuration for FluxGate Control Support. + * + *

Automatically configures: + * + *

+ * + *

Example usage in application.yml: + * + *

+ * fluxgate:
+ *   control:
+ *     redis:
+ *       uri: redis://localhost:6379
+ *       channel: fluxgate:rule-reload
+ * 
+ * + *

Example usage with annotations: + * + *

{@code
+ * @Service
+ * public class RuleManagementService {
+ *
+ *     @NotifyRuleChange(ruleSetId = "#ruleSetId")
+ *     public void updateRule(String ruleSetId, RuleDto dto) {
+ *         mongoRepository.save(dto);
+ *     }
+ *
+ *     @NotifyFullReload
+ *     public void deleteAllRules() {
+ *         mongoRepository.deleteAll();
+ *     }
+ * }
+ * }
+ */ +@AutoConfiguration +@ConditionalOnClass(name = "io.lettuce.core.RedisClient") +@ConditionalOnProperty(prefix = "fluxgate.control.redis", name = "uri") +@EnableConfigurationProperties(ControlSupportProperties.class) +@EnableAspectJAutoProxy +public class ControlSupportAutoConfiguration { + + private static final Logger log = LoggerFactory.getLogger(ControlSupportAutoConfiguration.class); + + /** + * Creates the Redis-based rule change notifier. + * + * @param properties the configuration properties + * @return the notifier instance + */ + @Bean + @ConditionalOnMissingBean(RuleChangeNotifier.class) + public RuleChangeNotifier ruleChangeNotifier(ControlSupportProperties properties) { + ControlSupportProperties.RedisProperties redis = properties.getRedis(); + + log.info( + "Creating RedisRuleChangeNotifier: uri={}, channel={}, source={}", + redis.getUri(), + redis.getChannel(), + properties.getSource()); + + return new RedisRuleChangeNotifier( + redis.getUri(), redis.getChannel(), redis.getTimeout(), properties.getSource()); + } + + /** + * Creates the AOP aspect for rule change annotations. + * + *

Only created when: + * + *

+ * + * @param notifier the rule change notifier + * @return the aspect instance + */ + @Bean + @ConditionalOnClass(name = "org.aspectj.lang.annotation.Aspect") + @ConditionalOnBean(RuleChangeNotifier.class) + @ConditionalOnMissingBean(RuleChangeAspect.class) + public RuleChangeAspect ruleChangeAspect(RuleChangeNotifier notifier) { + log.info("Creating RuleChangeAspect for @NotifyRuleChange and @NotifyFullReload support"); + return new RuleChangeAspect(notifier); + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportProperties.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportProperties.java new file mode 100644 index 0000000..8aa0c83 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/autoconfigure/ControlSupportProperties.java @@ -0,0 +1,77 @@ +package org.fluxgate.control.autoconfigure; + +import java.time.Duration; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Configuration properties for FluxGate Control Support. + * + *

Example configuration: + * + *

+ * fluxgate:
+ *   control:
+ *     redis:
+ *       uri: redis://localhost:6379
+ *       channel: fluxgate:rule-reload
+ *       timeout: 5s
+ *     source: my-admin-app
+ * 
+ */ +@ConfigurationProperties(prefix = "fluxgate.control") +public class ControlSupportProperties { + + private final RedisProperties redis = new RedisProperties(); + + /** Source identifier for notifications (appears in messages). */ + private String source = "fluxgate-control"; + + public RedisProperties getRedis() { + return redis; + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + /** Redis configuration for rule change notifications. */ + public static class RedisProperties { + + /** Redis URI (e.g., "redis://localhost:6379"). For cluster, use comma-separated URIs. */ + private String uri = "redis://localhost:6379"; + + /** Pub/Sub channel name for rule change notifications. */ + private String channel = "fluxgate:rule-reload"; + + /** Connection timeout. */ + private Duration timeout = Duration.ofSeconds(5); + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public String getChannel() { + return channel; + } + + public void setChannel(String channel) { + this.channel = channel; + } + + public Duration getTimeout() { + return timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java new file mode 100644 index 0000000..d430b3e --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RedisRuleChangeNotifier.java @@ -0,0 +1,212 @@ +package org.fluxgate.control.notify; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Redis Pub/Sub implementation of {@link RuleChangeNotifier}. + * + *

Publishes rule change notifications to a Redis channel. All FluxGate instances subscribed to + * this channel will receive the notification and invalidate their local caches. + * + *

Supports both standalone Redis and Redis Cluster configurations. + * + *

Example usage: + * + *

{@code
+ * // Standalone Redis
+ * RuleChangeNotifier notifier = new RedisRuleChangeNotifier(
+ *     "redis://localhost:6379",
+ *     "fluxgate:rule-reload"
+ * );
+ *
+ * // Notify specific rule change
+ * notifier.notifyChange("my-rule-set-id");
+ *
+ * // Notify full reload
+ * notifier.notifyFullReload();
+ *
+ * // Cleanup
+ * notifier.close();
+ * }
+ */ +public class RedisRuleChangeNotifier implements RuleChangeNotifier { + + private static final Logger log = LoggerFactory.getLogger(RedisRuleChangeNotifier.class); + private static final String DEFAULT_SOURCE = "fluxgate-control"; + + private final String redisUri; + private final String channel; + private final Duration timeout; + private final String source; + private final ObjectMapper objectMapper; + private final boolean isCluster; + + private volatile RedisClient redisClient; + private volatile RedisClusterClient redisClusterClient; + private volatile StatefulRedisConnection connection; + private volatile StatefulRedisClusterConnection clusterConnection; + private volatile boolean closed = false; + + /** + * Creates a new RedisRuleChangeNotifier with default settings. + * + * @param redisUri Redis URI (e.g., "redis://localhost:6379" or comma-separated for cluster) + * @param channel the Pub/Sub channel name + */ + public RedisRuleChangeNotifier(String redisUri, String channel) { + this(redisUri, channel, Duration.ofSeconds(5), DEFAULT_SOURCE); + } + + /** + * Creates a new RedisRuleChangeNotifier with custom settings. + * + * @param redisUri Redis URI (e.g., "redis://localhost:6379" or comma-separated for cluster) + * @param channel the Pub/Sub channel name + * @param timeout connection timeout + * @param source identifier for this application in notifications + */ + public RedisRuleChangeNotifier(String redisUri, String channel, Duration timeout, String source) { + this.redisUri = Objects.requireNonNull(redisUri, "redisUri must not be null"); + this.channel = Objects.requireNonNull(channel, "channel must not be null"); + this.timeout = Objects.requireNonNull(timeout, "timeout must not be null"); + this.source = Objects.requireNonNull(source, "source must not be null"); + this.objectMapper = new ObjectMapper(); + this.isCluster = redisUri.contains(","); + + log.info( + "RedisRuleChangeNotifier initialized: channel={}, cluster={}, source={}", + channel, + isCluster, + source); + } + + @Override + public void notifyChange(String ruleSetId) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + RuleChangeMessage message = RuleChangeMessage.forRuleSet(ruleSetId, source); + publish(message); + log.info("Published rule change notification: ruleSetId={}", ruleSetId); + } + + @Override + public void notifyFullReload() { + RuleChangeMessage message = RuleChangeMessage.fullReload(source); + publish(message); + log.info("Published full reload notification"); + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + + log.info("Closing RedisRuleChangeNotifier"); + + if (connection != null) { + try { + connection.close(); + } catch (Exception e) { + log.warn("Error closing Redis connection", e); + } + } + + if (clusterConnection != null) { + try { + clusterConnection.close(); + } catch (Exception e) { + log.warn("Error closing Redis cluster connection", e); + } + } + + if (redisClient != null) { + try { + redisClient.shutdown(); + } catch (Exception e) { + log.warn("Error shutting down Redis client", e); + } + } + + if (redisClusterClient != null) { + try { + redisClusterClient.shutdown(); + } catch (Exception e) { + log.warn("Error shutting down Redis cluster client", e); + } + } + } + + private void publish(RuleChangeMessage message) { + if (closed) { + throw new IllegalStateException("RedisRuleChangeNotifier is closed"); + } + + String json = serialize(message); + ensureConnection(); + + try { + if (isCluster) { + clusterConnection.sync().publish(channel, json); + } else { + connection.sync().publish(channel, json); + } + } catch (Exception e) { + log.error("Failed to publish rule change notification", e); + throw new RuleChangeNotificationException("Failed to publish notification", e); + } + } + + private synchronized void ensureConnection() { + if (isCluster) { + if (clusterConnection == null || !clusterConnection.isOpen()) { + createClusterConnection(); + } + } else { + if (connection == null || !connection.isOpen()) { + createStandaloneConnection(); + } + } + } + + private void createStandaloneConnection() { + log.debug("Creating standalone Redis connection"); + RedisURI uri = createRedisUri(redisUri); + redisClient = RedisClient.create(uri); + connection = redisClient.connect(); + } + + private void createClusterConnection() { + log.debug("Creating Redis cluster connection"); + List uris = + Arrays.stream(redisUri.split(",")).map(String::trim).map(this::createRedisUri).toList(); + redisClusterClient = RedisClusterClient.create(uris); + clusterConnection = redisClusterClient.connect(); + } + + private RedisURI createRedisUri(String uri) { + RedisURI redisURI = RedisURI.create(uri); + redisURI.setTimeout(timeout); + return redisURI; + } + + private String serialize(RuleChangeMessage message) { + try { + return objectMapper.writeValueAsString(message); + } catch (JsonProcessingException e) { + throw new RuleChangeNotificationException("Failed to serialize message", e); + } + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeMessage.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeMessage.java new file mode 100644 index 0000000..192c6fc --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeMessage.java @@ -0,0 +1,92 @@ +package org.fluxgate.control.notify; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.Instant; +import java.util.Objects; + +/** + * Message payload for rule change notifications. + * + *

This class is serialized to JSON and published via Redis Pub/Sub. FluxGate instances + * subscribed to the channel will deserialize this message and take appropriate action. + */ +public class RuleChangeMessage { + + private final String ruleSetId; + private final boolean fullReload; + private final long timestamp; + private final String source; + + @JsonCreator + public RuleChangeMessage( + @JsonProperty("ruleSetId") String ruleSetId, + @JsonProperty("fullReload") boolean fullReload, + @JsonProperty("timestamp") long timestamp, + @JsonProperty("source") String source) { + this.ruleSetId = ruleSetId; + this.fullReload = fullReload; + this.timestamp = timestamp; + this.source = source; + } + + /** + * Creates a message for a specific rule set change. + * + * @param ruleSetId the ID of the changed rule set + * @param source identifier of the source application (e.g., "fluxgate-control") + * @return the change message + */ + public static RuleChangeMessage forRuleSet(String ruleSetId, String source) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + return new RuleChangeMessage(ruleSetId, false, Instant.now().toEpochMilli(), source); + } + + /** + * Creates a message for a full reload of all rules. + * + * @param source identifier of the source application (e.g., "fluxgate-control") + * @return the full reload message + */ + public static RuleChangeMessage fullReload(String source) { + return new RuleChangeMessage(null, true, Instant.now().toEpochMilli(), source); + } + + /** Returns the rule set ID, or null if this is a full reload. */ + public String getRuleSetId() { + return ruleSetId; + } + + /** Returns true if this is a full reload request. */ + public boolean isFullReload() { + return fullReload; + } + + /** Returns the timestamp when this message was created (epoch millis). */ + public long getTimestamp() { + return timestamp; + } + + /** Returns the source application identifier. */ + public String getSource() { + return source; + } + + @Override + public String toString() { + if (fullReload) { + return "RuleChangeMessage{fullReload=true, source='" + + source + + "', timestamp=" + + timestamp + + "}"; + } + return "RuleChangeMessage{ruleSetId='" + + ruleSetId + + "', source='" + + source + + "', timestamp=" + + timestamp + + "}"; + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java new file mode 100644 index 0000000..9d45587 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotificationException.java @@ -0,0 +1,13 @@ +package org.fluxgate.control.notify; + +/** Exception thrown when a rule change notification fails to be published. */ +public class RuleChangeNotificationException extends RuntimeException { + + public RuleChangeNotificationException(String message) { + super(message); + } + + public RuleChangeNotificationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java new file mode 100644 index 0000000..fb11025 --- /dev/null +++ b/fluxgate-control-support/src/main/java/org/fluxgate/control/notify/RuleChangeNotifier.java @@ -0,0 +1,57 @@ +package org.fluxgate.control.notify; + +/** + * Interface for notifying FluxGate application servers about rule changes. + * + *

When rules are modified in the Admin/Studio application, this notifier broadcasts the change + * to all FluxGate instances so they can invalidate their local caches and reload the updated rules. + * + *

Example usage: + * + *

{@code
+ * @Service
+ * public class RuleManagementService {
+ *     private final RuleChangeNotifier notifier;
+ *
+ *     public void updateRule(String ruleSetId, RuleDto dto) {
+ *         // 1. Save to database
+ *         mongoRepository.save(dto);
+ *
+ *         // 2. Notify all FluxGate instances
+ *         notifier.notifyChange(ruleSetId);
+ *     }
+ *
+ *     public void deleteAllRules() {
+ *         mongoRepository.deleteAll();
+ *         notifier.notifyFullReload();
+ *     }
+ * }
+ * }
+ */ +public interface RuleChangeNotifier { + + /** + * Notifies all FluxGate instances that a specific rule set has changed. + * + *

The instances will invalidate their local cache for this rule set and reload it from the + * database on the next request. + * + * @param ruleSetId the ID of the changed rule set + */ + void notifyChange(String ruleSetId); + + /** + * Notifies all FluxGate instances to perform a full reload of all rules. + * + *

Use this when multiple rules have changed or when performing bulk operations. All instances + * will invalidate their entire rule cache. + */ + void notifyFullReload(); + + /** + * Closes the notifier and releases any resources. + * + *

After calling this method, the notifier should not be used. + */ + void close(); +} diff --git a/fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..affbfb4 --- /dev/null +++ b/fluxgate-control-support/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.fluxgate.control.autoconfigure.ControlSupportAutoConfiguration diff --git a/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java new file mode 100644 index 0000000..02b1afb --- /dev/null +++ b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RedisRuleChangeNotifierTest.java @@ -0,0 +1,101 @@ +package org.fluxgate.control.notify; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Duration; +import org.junit.jupiter.api.Test; + +class RedisRuleChangeNotifierTest { + + @Test + void shouldRequireRedisUri() { + assertThatThrownBy(() -> new RedisRuleChangeNotifier(null, "channel")) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("redisUri"); + } + + @Test + void shouldRequireChannel() { + assertThatThrownBy(() -> new RedisRuleChangeNotifier("redis://localhost:6379", null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("channel"); + } + + @Test + void shouldRequireTimeout() { + assertThatThrownBy( + () -> new RedisRuleChangeNotifier("redis://localhost:6379", "channel", null, "source")) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("timeout"); + } + + @Test + void shouldRequireSource() { + assertThatThrownBy( + () -> + new RedisRuleChangeNotifier( + "redis://localhost:6379", "channel", Duration.ofSeconds(5), null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("source"); + } + + @Test + void shouldDetectClusterMode() { + // Cluster mode is detected by comma-separated URIs + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier( + "redis://node1:6379,redis://node2:6379", "channel", Duration.ofSeconds(5), "source"); + + // Should not throw - cluster mode detected + assertThat(notifier).isNotNull(); + notifier.close(); + } + + @Test + void shouldThrowWhenClosedAndNotify() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + notifier.close(); + + assertThatThrownBy(() -> notifier.notifyChange("rule-id")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("closed"); + } + + @Test + void shouldThrowWhenClosedAndNotifyFullReload() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + notifier.close(); + + assertThatThrownBy(() -> notifier.notifyFullReload()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("closed"); + } + + @Test + void shouldRequireRuleSetIdForNotifyChange() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + + try { + assertThatThrownBy(() -> notifier.notifyChange(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("ruleSetId"); + } finally { + notifier.close(); + } + } + + @Test + void shouldAllowMultipleClose() { + RedisRuleChangeNotifier notifier = + new RedisRuleChangeNotifier("redis://localhost:6379", "channel"); + + // Should not throw + notifier.close(); + notifier.close(); + notifier.close(); + } +} diff --git a/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java new file mode 100644 index 0000000..4808084 --- /dev/null +++ b/fluxgate-control-support/src/test/java/org/fluxgate/control/notify/RuleChangeMessageTest.java @@ -0,0 +1,82 @@ +package org.fluxgate.control.notify; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +class RuleChangeMessageTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + void shouldCreateMessageForRuleSet() { + RuleChangeMessage message = RuleChangeMessage.forRuleSet("test-rule", "test-source"); + + assertThat(message.getRuleSetId()).isEqualTo("test-rule"); + assertThat(message.isFullReload()).isFalse(); + assertThat(message.getSource()).isEqualTo("test-source"); + assertThat(message.getTimestamp()).isPositive(); + } + + @Test + void shouldCreateFullReloadMessage() { + RuleChangeMessage message = RuleChangeMessage.fullReload("test-source"); + + assertThat(message.getRuleSetId()).isNull(); + assertThat(message.isFullReload()).isTrue(); + assertThat(message.getSource()).isEqualTo("test-source"); + } + + @Test + void shouldThrowOnNullRuleSetId() { + assertThatThrownBy(() -> RuleChangeMessage.forRuleSet(null, "source")) + .isInstanceOf(NullPointerException.class); + } + + @Test + void shouldSerializeAndDeserialize() throws Exception { + RuleChangeMessage original = RuleChangeMessage.forRuleSet("my-rule", "studio"); + + String json = objectMapper.writeValueAsString(original); + RuleChangeMessage deserialized = objectMapper.readValue(json, RuleChangeMessage.class); + + assertThat(deserialized.getRuleSetId()).isEqualTo(original.getRuleSetId()); + assertThat(deserialized.isFullReload()).isEqualTo(original.isFullReload()); + assertThat(deserialized.getSource()).isEqualTo(original.getSource()); + assertThat(deserialized.getTimestamp()).isEqualTo(original.getTimestamp()); + } + + @Test + void shouldSerializeFullReloadMessage() throws Exception { + RuleChangeMessage original = RuleChangeMessage.fullReload("admin"); + + String json = objectMapper.writeValueAsString(original); + RuleChangeMessage deserialized = objectMapper.readValue(json, RuleChangeMessage.class); + + assertThat(deserialized.getRuleSetId()).isNull(); + assertThat(deserialized.isFullReload()).isTrue(); + assertThat(deserialized.getSource()).isEqualTo("admin"); + } + + @Test + void shouldHaveToStringForRuleSet() { + RuleChangeMessage message = RuleChangeMessage.forRuleSet("test-rule", "source"); + + String str = message.toString(); + + assertThat(str).contains("test-rule"); + assertThat(str).contains("source"); + } + + @Test + void shouldHaveToStringForFullReload() { + RuleChangeMessage message = RuleChangeMessage.fullReload("source"); + + String str = message.toString(); + + assertThat(str).contains("fullReload=true"); + assertThat(str).contains("source"); + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/BucketResetHandler.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/BucketResetHandler.java new file mode 100644 index 0000000..859e5cd --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/BucketResetHandler.java @@ -0,0 +1,47 @@ +package org.fluxgate.core.reload; + +/** + * Handler for resetting rate limit buckets when rules are changed. + * + *

When a rule is modified in the Admin UI, the cached rule definitions are invalidated, but the + * token bucket state in Redis (or other storage) remains. This handler is responsible for resetting + * the bucket state so that the new rules take effect immediately. + * + *

Implementations should delete or reset the token buckets associated with the changed rule set. + * + *

Example implementation for Redis: + * + *

{@code
+ * public class RedisBucketResetHandler implements BucketResetHandler {
+ *     private final RedisTokenBucketStore store;
+ *
+ *     @Override
+ *     public void resetBuckets(String ruleSetId) {
+ *         store.deleteBucketsByRuleSetId(ruleSetId);
+ *     }
+ *
+ *     @Override
+ *     public void resetAllBuckets() {
+ *         store.deleteAllBuckets();
+ *     }
+ * }
+ * }
+ */ +public interface BucketResetHandler { + + /** + * Resets all buckets associated with the given rule set. + * + *

Called when a specific rule set is modified or deleted. + * + * @param ruleSetId the rule set ID whose buckets should be reset + */ + void resetBuckets(String ruleSetId); + + /** + * Resets all buckets (full reset). + * + *

Called when a full reload is triggered. + */ + void resetAllBuckets(); +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java new file mode 100644 index 0000000..8382bd8 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/CachingRuleSetProvider.java @@ -0,0 +1,102 @@ +package org.fluxgate.core.reload; + +import java.util.Objects; +import java.util.Optional; +import org.fluxgate.core.ratelimiter.RateLimitRuleSet; +import org.fluxgate.core.spi.RateLimitRuleSetProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A decorator that adds caching capabilities to any {@link RateLimitRuleSetProvider}. + * + *

This provider wraps a delegate provider and caches rule sets locally. It also implements + * {@link RuleReloadListener} to invalidate cache entries when reload events are received. + * + *

Example usage: + * + *

{@code
+ * RateLimitRuleSetProvider mongoProvider = new MongoRuleSetProvider(...);
+ * RuleCache cache = new CaffeineRuleCache(...);
+ * RuleReloadStrategy reloadStrategy = new PollingReloadStrategy(...);
+ *
+ * CachingRuleSetProvider cachingProvider =
+ *     new CachingRuleSetProvider(mongoProvider, cache);
+ *
+ * // Register as reload listener
+ * reloadStrategy.addListener(cachingProvider);
+ * }
+ */ +public class CachingRuleSetProvider implements RateLimitRuleSetProvider, RuleReloadListener { + + private static final Logger log = LoggerFactory.getLogger(CachingRuleSetProvider.class); + + private final RateLimitRuleSetProvider delegate; + private final RuleCache cache; + + /** + * Creates a new caching provider. + * + * @param delegate the underlying provider to delegate cache misses to + * @param cache the cache to store rule sets + */ + public CachingRuleSetProvider(RateLimitRuleSetProvider delegate, RuleCache cache) { + this.delegate = Objects.requireNonNull(delegate, "delegate must not be null"); + this.cache = Objects.requireNonNull(cache, "cache must not be null"); + } + + @Override + public Optional findById(String ruleSetId) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + + // Try cache first + Optional cached = cache.get(ruleSetId); + if (cached.isPresent()) { + log.trace("Cache hit for ruleSetId: {}", ruleSetId); + return cached; + } + + // Cache miss - load from delegate + log.debug("Cache miss for ruleSetId: {}, loading from delegate", ruleSetId); + Optional loaded = delegate.findById(ruleSetId); + + // Cache if found + loaded.ifPresent( + ruleSet -> { + cache.put(ruleSetId, ruleSet); + log.debug("Cached ruleSetId: {}", ruleSetId); + }); + + return loaded; + } + + @Override + public void onReload(RuleReloadEvent event) { + if (event.isFullReload()) { + log.info("Full reload triggered from {}, invalidating all cached rules", event.getSource()); + cache.invalidateAll(); + } else { + log.info( + "Reload triggered for ruleSetId: {} from {}", event.getRuleSetId(), event.getSource()); + cache.invalidate(event.getRuleSetId()); + } + } + + /** + * Returns the underlying delegate provider. + * + * @return the delegate provider + */ + public RateLimitRuleSetProvider getDelegate() { + return delegate; + } + + /** + * Returns the cache being used. + * + * @return the rule cache + */ + public RuleCache getCache() { + return cache; + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/ReloadSource.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/ReloadSource.java new file mode 100644 index 0000000..cec1d34 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/ReloadSource.java @@ -0,0 +1,23 @@ +package org.fluxgate.core.reload; + +/** Source that triggered a rule reload. */ +public enum ReloadSource { + + /** Reload triggered via Redis Pub/Sub message. */ + PUBSUB, + + /** Reload triggered by periodic polling detecting a change. */ + POLLING, + + /** Reload triggered manually via API or programmatic call. */ + MANUAL, + + /** Reload triggered by an external API call (e.g., REST endpoint). */ + API, + + /** Reload triggered during application startup. */ + STARTUP, + + /** Reload triggered by cache expiration (TTL). */ + CACHE_EXPIRY +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleCache.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleCache.java new file mode 100644 index 0000000..48ef0f9 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleCache.java @@ -0,0 +1,79 @@ +package org.fluxgate.core.reload; + +import java.util.Optional; +import java.util.Set; +import org.fluxgate.core.ratelimiter.RateLimitRuleSet; + +/** + * Cache interface for storing and retrieving rate limit rule sets. + * + *

Implementations should be thread-safe and support concurrent access. + */ +public interface RuleCache { + + /** + * Retrieves a cached rule set by ID. + * + * @param ruleSetId the ID of the rule set to retrieve + * @return an Optional containing the rule set if cached, or empty if not found + */ + Optional get(String ruleSetId); + + /** + * Stores a rule set in the cache. + * + * @param ruleSetId the ID of the rule set + * @param ruleSet the rule set to cache + */ + void put(String ruleSetId, RateLimitRuleSet ruleSet); + + /** + * Invalidates (removes) a specific rule set from the cache. + * + * @param ruleSetId the ID of the rule set to invalidate + */ + void invalidate(String ruleSetId); + + /** + * Invalidates all cached rule sets. + * + *

This should be used sparingly as it may impact performance during cache repopulation. + */ + void invalidateAll(); + + /** + * Returns the IDs of all currently cached rule sets. + * + *

This is useful for polling strategies that need to check for changes in known rule sets. + * + * @return an unmodifiable set of cached rule set IDs + */ + Set getCachedRuleSetIds(); + + /** + * Returns the current number of cached entries. + * + * @return the cache size + */ + int size(); + + /** + * Returns cache statistics if available. + * + * @return optional cache statistics + */ + default Optional getStats() { + return Optional.empty(); + } + + /** Statistics about cache performance. */ + record CacheStats( + long hitCount, long missCount, long evictionCount, double hitRate, long estimatedSize) { + + public static CacheStats of( + long hitCount, long missCount, long evictionCount, long estimatedSize) { + double hitRate = hitCount + missCount > 0 ? (double) hitCount / (hitCount + missCount) : 0.0; + return new CacheStats(hitCount, missCount, evictionCount, hitRate, estimatedSize); + } + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadEvent.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadEvent.java new file mode 100644 index 0000000..22966cf --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadEvent.java @@ -0,0 +1,146 @@ +package org.fluxgate.core.reload; + +import java.time.Instant; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Event representing a rule reload trigger. + * + *

This event is published when rules need to be reloaded, either for a specific rule set or for + * all cached rules. + */ +public final class RuleReloadEvent { + + private final String ruleSetId; + private final ReloadSource source; + private final Instant timestamp; + private final Map metadata; + + private RuleReloadEvent(Builder builder) { + this.ruleSetId = builder.ruleSetId; + this.source = Objects.requireNonNull(builder.source, "source must not be null"); + this.timestamp = builder.timestamp != null ? builder.timestamp : Instant.now(); + this.metadata = + builder.metadata != null + ? Collections.unmodifiableMap(builder.metadata) + : Collections.emptyMap(); + } + + /** + * Returns the rule set ID to reload. If null, all cached rules should be reloaded. + * + * @return the rule set ID, or null for full reload + */ + public String getRuleSetId() { + return ruleSetId; + } + + /** + * Returns true if this is a full reload event (all rules). + * + * @return true if ruleSetId is null + */ + public boolean isFullReload() { + return ruleSetId == null; + } + + /** + * Returns the source that triggered this reload. + * + * @return the reload source + */ + public ReloadSource getSource() { + return source; + } + + /** + * Returns the timestamp when this event was created. + * + * @return the event timestamp + */ + public Instant getTimestamp() { + return timestamp; + } + + /** + * Returns additional metadata about the reload event. + * + * @return unmodifiable map of metadata + */ + public Map getMetadata() { + return metadata; + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Creates a reload event for a specific rule set. + * + * @param ruleSetId the rule set ID to reload + * @param source the reload source + * @return a new reload event + */ + public static RuleReloadEvent forRuleSet(String ruleSetId, ReloadSource source) { + return builder().ruleSetId(ruleSetId).source(source).build(); + } + + /** + * Creates a full reload event (all rules). + * + * @param source the reload source + * @return a new reload event + */ + public static RuleReloadEvent fullReload(ReloadSource source) { + return builder().source(source).build(); + } + + @Override + public String toString() { + return "RuleReloadEvent{" + + "ruleSetId='" + + (ruleSetId != null ? ruleSetId : "ALL") + + '\'' + + ", source=" + + source + + ", timestamp=" + + timestamp + + '}'; + } + + public static final class Builder { + private String ruleSetId; + private ReloadSource source; + private Instant timestamp; + private Map metadata; + + private Builder() {} + + public Builder ruleSetId(String ruleSetId) { + this.ruleSetId = ruleSetId; + return this; + } + + public Builder source(ReloadSource source) { + this.source = source; + return this; + } + + public Builder timestamp(Instant timestamp) { + this.timestamp = timestamp; + return this; + } + + public Builder metadata(Map metadata) { + this.metadata = metadata; + return this; + } + + public RuleReloadEvent build() { + return new RuleReloadEvent(this); + } + } +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadListener.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadListener.java new file mode 100644 index 0000000..f8f969c --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadListener.java @@ -0,0 +1,18 @@ +package org.fluxgate.core.reload; + +/** + * Listener interface for rule reload events. + * + *

Implementations can react to rule changes, such as invalidating caches or updating + * configurations. + */ +@FunctionalInterface +public interface RuleReloadListener { + + /** + * Called when a rule reload event is received. + * + * @param event the reload event containing details about what should be reloaded + */ + void onReload(RuleReloadEvent event); +} diff --git a/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java new file mode 100644 index 0000000..3589e85 --- /dev/null +++ b/fluxgate-core/src/main/java/org/fluxgate/core/reload/RuleReloadStrategy.java @@ -0,0 +1,79 @@ +package org.fluxgate.core.reload; + +/** + * Strategy interface for rule hot reload. + * + *

Implementations define how rules are reloaded (e.g., via polling, Redis Pub/Sub, etc.) and + * manage the lifecycle of reload mechanisms. + * + *

Typical usage: + * + *

{@code
+ * RuleReloadStrategy strategy = new PollingReloadStrategy(...);
+ * strategy.addListener(event -> cache.invalidate(event.getRuleSetId()));
+ * strategy.start();
+ *
+ * // Later, to trigger a reload programmatically:
+ * strategy.triggerReload("my-rule-set");
+ *
+ * // On shutdown:
+ * strategy.stop();
+ * }
+ */ +public interface RuleReloadStrategy { + + /** + * Starts the reload mechanism. + * + *

For polling strategies, this starts the scheduled task. For Pub/Sub strategies, this + * establishes the subscription. + * + *

This method is idempotent - calling it multiple times has no additional effect. + */ + void start(); + + /** + * Stops the reload mechanism and releases resources. + * + *

This method is idempotent - calling it multiple times has no additional effect. + */ + void stop(); + + /** + * Returns whether the reload mechanism is currently running. + * + * @return true if started and not stopped + */ + boolean isRunning(); + + /** + * Triggers a reload for a specific rule set. + * + *

This method can be called to programmatically trigger a reload, for example after updating a + * rule via an admin API. + * + * @param ruleSetId the ID of the rule set to reload + */ + void triggerReload(String ruleSetId); + + /** + * Triggers a full reload of all cached rules. + * + *

This is useful for scenarios like configuration refresh or manual cache invalidation. + */ + void triggerReloadAll(); + + /** + * Adds a listener that will be notified when reload events occur. + * + * @param listener the listener to add + */ + void addListener(RuleReloadListener listener); + + /** + * Removes a previously added listener. + * + * @param listener the listener to remove + */ + void removeListener(RuleReloadListener listener); +} diff --git a/fluxgate-core/src/test/java/org/fluxgate/core/reload/CachingRuleSetProviderTest.java b/fluxgate-core/src/test/java/org/fluxgate/core/reload/CachingRuleSetProviderTest.java new file mode 100644 index 0000000..bb26378 --- /dev/null +++ b/fluxgate-core/src/test/java/org/fluxgate/core/reload/CachingRuleSetProviderTest.java @@ -0,0 +1,159 @@ +package org.fluxgate.core.reload; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.fluxgate.core.config.RateLimitBand; +import org.fluxgate.core.config.RateLimitRule; +import org.fluxgate.core.key.RateLimitKey; +import org.fluxgate.core.ratelimiter.RateLimitRuleSet; +import org.fluxgate.core.spi.RateLimitRuleSetProvider; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class CachingRuleSetProviderTest { + + private TestRuleSetProvider delegate; + private TestRuleCache cache; + private CachingRuleSetProvider cachingProvider; + + @BeforeEach + void setUp() { + delegate = new TestRuleSetProvider(); + cache = new TestRuleCache(); + cachingProvider = new CachingRuleSetProvider(delegate, cache); + } + + @Test + void shouldReturnCachedRuleSetOnHit() { + RateLimitRuleSet ruleSet = createTestRuleSet("test-rule"); + cache.put("test-rule", ruleSet); + + Optional result = cachingProvider.findById("test-rule"); + + assertThat(result).isPresent().contains(ruleSet); + assertThat(delegate.getCallCount()).isZero(); + } + + @Test + void shouldLoadFromDelegateOnCacheMiss() { + RateLimitRuleSet ruleSet = createTestRuleSet("test-rule"); + delegate.addRuleSet("test-rule", ruleSet); + + Optional result = cachingProvider.findById("test-rule"); + + assertThat(result).isPresent().contains(ruleSet); + assertThat(delegate.getCallCount()).isEqualTo(1); + assertThat(cache.get("test-rule")).isPresent(); + } + + @Test + void shouldNotCacheWhenDelegateReturnsEmpty() { + Optional result = cachingProvider.findById("missing-rule"); + + assertThat(result).isEmpty(); + assertThat(cache.get("missing-rule")).isEmpty(); + } + + @Test + void shouldInvalidateCacheOnReloadEvent() { + RateLimitRuleSet ruleSet = createTestRuleSet("test-rule"); + cache.put("test-rule", ruleSet); + + RuleReloadEvent event = RuleReloadEvent.forRuleSet("test-rule", ReloadSource.PUBSUB); + cachingProvider.onReload(event); + + assertThat(cache.get("test-rule")).isEmpty(); + } + + @Test + void shouldInvalidateAllOnFullReloadEvent() { + cache.put("rule-1", createTestRuleSet("rule-1")); + cache.put("rule-2", createTestRuleSet("rule-2")); + + RuleReloadEvent event = RuleReloadEvent.fullReload(ReloadSource.MANUAL); + cachingProvider.onReload(event); + + assertThat(cache.size()).isZero(); + } + + @Test + void shouldExposeDelegate() { + assertThat(cachingProvider.getDelegate()).isSameAs(delegate); + } + + @Test + void shouldExposeCache() { + assertThat(cachingProvider.getCache()).isSameAs(cache); + } + + private RateLimitRuleSet createTestRuleSet(String id) { + RateLimitBand band = RateLimitBand.builder(Duration.ofMinutes(1), 100).build(); + RateLimitRule rule = RateLimitRule.builder("rule-1").addBand(band).build(); + return RateLimitRuleSet.builder(id) + .rules(List.of(rule)) + .keyResolver(ctx -> new RateLimitKey(ctx.getClientIp())) + .build(); + } + + /** Simple test implementation of RateLimitRuleSetProvider. */ + static class TestRuleSetProvider implements RateLimitRuleSetProvider { + private final ConcurrentHashMap ruleSets = new ConcurrentHashMap<>(); + private final AtomicInteger callCount = new AtomicInteger(0); + + void addRuleSet(String id, RateLimitRuleSet ruleSet) { + ruleSets.put(id, ruleSet); + } + + int getCallCount() { + return callCount.get(); + } + + @Override + public Optional findById(String ruleSetId) { + callCount.incrementAndGet(); + return Optional.ofNullable(ruleSets.get(ruleSetId)); + } + } + + /** Simple test implementation of RuleCache. */ + static class TestRuleCache implements RuleCache { + private final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + + @Override + public Optional get(String ruleSetId) { + return Optional.ofNullable(cache.get(ruleSetId)); + } + + @Override + public void put(String ruleSetId, RateLimitRuleSet ruleSet) { + cache.put(ruleSetId, ruleSet); + } + + @Override + public void invalidate(String ruleSetId) { + cache.remove(ruleSetId); + } + + @Override + public void invalidateAll() { + cache.clear(); + } + + @Override + public Set getCachedRuleSetIds() { + return new HashSet<>(cache.keySet()); + } + + @Override + public int size() { + return cache.size(); + } + } +} diff --git a/fluxgate-core/src/test/java/org/fluxgate/core/reload/RuleReloadEventTest.java b/fluxgate-core/src/test/java/org/fluxgate/core/reload/RuleReloadEventTest.java new file mode 100644 index 0000000..5df15c4 --- /dev/null +++ b/fluxgate-core/src/test/java/org/fluxgate/core/reload/RuleReloadEventTest.java @@ -0,0 +1,76 @@ +package org.fluxgate.core.reload; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Instant; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class RuleReloadEventTest { + + @Test + void shouldCreateEventForSpecificRuleSet() { + RuleReloadEvent event = RuleReloadEvent.forRuleSet("my-rule-set", ReloadSource.PUBSUB); + + assertThat(event.getRuleSetId()).isEqualTo("my-rule-set"); + assertThat(event.getSource()).isEqualTo(ReloadSource.PUBSUB); + assertThat(event.isFullReload()).isFalse(); + assertThat(event.getTimestamp()).isNotNull(); + assertThat(event.getMetadata()).isEmpty(); + } + + @Test + void shouldCreateFullReloadEvent() { + RuleReloadEvent event = RuleReloadEvent.fullReload(ReloadSource.POLLING); + + assertThat(event.getRuleSetId()).isNull(); + assertThat(event.getSource()).isEqualTo(ReloadSource.POLLING); + assertThat(event.isFullReload()).isTrue(); + } + + @Test + void shouldBuildEventWithAllFields() { + Instant timestamp = Instant.now(); + Map metadata = Map.of("key", "value"); + + RuleReloadEvent event = + RuleReloadEvent.builder() + .ruleSetId("test-rule") + .source(ReloadSource.MANUAL) + .timestamp(timestamp) + .metadata(metadata) + .build(); + + assertThat(event.getRuleSetId()).isEqualTo("test-rule"); + assertThat(event.getSource()).isEqualTo(ReloadSource.MANUAL); + assertThat(event.getTimestamp()).isEqualTo(timestamp); + assertThat(event.getMetadata()).containsEntry("key", "value"); + } + + @Test + void shouldRequireSource() { + assertThatThrownBy(() -> RuleReloadEvent.builder().build()) + .isInstanceOf(NullPointerException.class); + } + + @Test + void shouldHaveToString() { + RuleReloadEvent event = RuleReloadEvent.forRuleSet("test", ReloadSource.API); + + String str = event.toString(); + + assertThat(str).contains("test"); + assertThat(str).contains("API"); + } + + @Test + void shouldHaveToStringForFullReload() { + RuleReloadEvent event = RuleReloadEvent.fullReload(ReloadSource.STARTUP); + + String str = event.toString(); + + assertThat(str).contains("ALL"); + assertThat(str).contains("STARTUP"); + } +} diff --git a/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java b/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java index 4aeec98..fb1e8da 100644 --- a/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java +++ b/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java @@ -140,6 +140,60 @@ public RedisConnectionProvider.RedisMode getMode() { return connectionProvider.getMode(); } + /** + * Deletes all token buckets matching the given ruleSetId pattern. + * + *

This is used when rules are changed to reset rate limit state. The pattern matches keys like: + * {@code fluxgate:{ruleSetId}:*} + * + *

Warning: Uses KEYS command which can be slow on large databases. Consider using SCAN in + * high-traffic production environments. + * + * @param ruleSetId the rule set ID to match + * @return the number of buckets deleted + */ + public long deleteBucketsByRuleSetId(String ruleSetId) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + + String pattern = "fluxgate:" + ruleSetId + ":*"; + log.debug("Deleting token buckets matching pattern: {}", pattern); + + java.util.List keys = connectionProvider.keys(pattern); + if (keys.isEmpty()) { + log.debug("No token buckets found for ruleSetId: {}", ruleSetId); + return 0; + } + + long deleted = connectionProvider.del(keys.toArray(new String[0])); + log.info("Deleted {} token buckets for ruleSetId: {}", deleted, ruleSetId); + return deleted; + } + + /** + * Deletes all token buckets (full reset). + * + *

This is used when a full reload is triggered to reset all rate limit state. The pattern + * matches all FluxGate keys: {@code fluxgate:*} + * + *

Warning: Uses KEYS command which can be slow on large databases. + * + * @return the number of buckets deleted + */ + public long deleteAllBuckets() { + String pattern = "fluxgate:*"; + log.debug("Deleting all token buckets matching pattern: {}", pattern); + + java.util.List keys = connectionProvider.keys(pattern); + if (keys.isEmpty()) { + log.debug("No token buckets found"); + return 0; + } + + long deleted = connectionProvider.del(keys.toArray(new String[0])); + log.info("Deleted {} token buckets (full reset)", deleted); + return deleted; + } + /** Closes the store. Note: The connection provider is managed externally. */ public void close() { // Connection provider is managed externally, so nothing to do here diff --git a/fluxgate-samples/fluxgate-sample-standalone/pom.xml b/fluxgate-samples/fluxgate-sample-standalone/pom.xml index fdd8b08..a185c04 100644 --- a/fluxgate-samples/fluxgate-sample-standalone/pom.xml +++ b/fluxgate-samples/fluxgate-sample-standalone/pom.xml @@ -85,6 +85,12 @@ io.micrometer micrometer-registry-prometheus + + + + com.github.ben-manes.caffeine + caffeine + diff --git a/fluxgate-samples/fluxgate-sample-standalone/src/main/resources/application.yml b/fluxgate-samples/fluxgate-sample-standalone/src/main/resources/application.yml index 3558b4e..d9d5daf 100644 --- a/fluxgate-samples/fluxgate-sample-standalone/src/main/resources/application.yml +++ b/fluxgate-samples/fluxgate-sample-standalone/src/main/resources/application.yml @@ -19,12 +19,20 @@ fluxgate: ddl-auto: validate # Redis configuration (uses FluxgateConfig) redis: + enabled: true uri: redis://127.0.0.1:7100,redis://127.0.0.1:7101,redis://127.0.0.1:7102 #uri: redis://localhost:6379 # Metrics enable for Prometheus metrics: enabled: true + # Hot reload configuration + reload: + enabled: true + strategy: PUBSUB # Use Redis Pub/Sub for real-time updates + pubsub: + channel: fluxgate:rule-reload # Must match Admin API channel + logging: level: org.fluxgate: DEBUG diff --git a/fluxgate-spring-boot-starter/pom.xml b/fluxgate-spring-boot-starter/pom.xml index 7e6b453..48650e7 100644 --- a/fluxgate-spring-boot-starter/pom.xml +++ b/fluxgate-spring-boot-starter/pom.xml @@ -98,6 +98,14 @@ true + + + com.github.ben-manes.caffeine + caffeine + 3.1.8 + true + + org.springframework.boot @@ -168,6 +176,13 @@ **/FluxgateMongoAutoConfiguration.class **/FluxgateRedisAutoConfiguration.class + **/FluxgateReloadAutoConfiguration.class + + **/reload/cache/** + **/reload/strategy/** + + **/FluxgateProperties$ReloadProperties.class + **/FluxgateProperties$ReloadProperties$*.class diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/autoconfigure/FluxgateMongoAutoConfiguration.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/autoconfigure/FluxgateMongoAutoConfiguration.java index 5d6b87f..e9991e1 100644 --- a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/autoconfigure/FluxgateMongoAutoConfiguration.java +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/autoconfigure/FluxgateMongoAutoConfiguration.java @@ -151,8 +151,8 @@ public KeyResolver fluxgateKeyResolver() { * @param metricsRecorderProvider lazy provider for composite metrics recorder * @return configured MongoRuleSetProvider instance */ - @Bean - @ConditionalOnMissingBean(RateLimitRuleSetProvider.class) + @Bean(name = "delegateRuleSetProvider") + @ConditionalOnMissingBean(name = "delegateRuleSetProvider") public RateLimitRuleSetProvider mongoRuleSetProvider( RateLimitRuleRepository repository, KeyResolver fluxgateKeyResolver, diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/autoconfigure/FluxgateReloadAutoConfiguration.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/autoconfigure/FluxgateReloadAutoConfiguration.java new file mode 100644 index 0000000..f375570 --- /dev/null +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/autoconfigure/FluxgateReloadAutoConfiguration.java @@ -0,0 +1,301 @@ +package org.fluxgate.spring.autoconfigure; + +import java.time.Duration; +import org.fluxgate.core.reload.BucketResetHandler; +import org.fluxgate.core.reload.CachingRuleSetProvider; +import org.fluxgate.core.reload.RuleCache; +import org.fluxgate.core.reload.RuleReloadStrategy; +import org.fluxgate.core.spi.RateLimitRuleSetProvider; +import org.fluxgate.redis.store.RedisTokenBucketStore; +import org.fluxgate.spring.properties.FluxgateProperties; +import org.fluxgate.spring.properties.FluxgateProperties.ReloadProperties; +import org.fluxgate.spring.properties.FluxgateProperties.ReloadStrategy; +import org.fluxgate.spring.reload.cache.CaffeineRuleCache; +import org.fluxgate.spring.reload.handler.RedisBucketResetHandler; +import org.fluxgate.spring.reload.strategy.NoOpReloadStrategy; +import org.fluxgate.spring.reload.strategy.PollingReloadStrategy; +import org.fluxgate.spring.reload.strategy.RedisPubSubReloadStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.SmartLifecycle; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +/** + * Auto-configuration for FluxGate rule hot reload support. + * + *

This configuration provides: + * + *

    + *
  • {@link RuleCache} - Local cache for rule sets (Caffeine-based) + *
  • {@link RuleReloadStrategy} - Strategy for detecting and propagating rule changes + *
  • {@link CachingRuleSetProvider} - Caching decorator for the rule set provider + *
+ * + *

Note: For publishing rule changes from Admin/Control Plane applications, use the {@code + * fluxgate-control-support} module instead. + * + *

Strategy selection: + * + *

    + *
  • AUTO - Uses Pub/Sub if Redis is available, otherwise falls back to Polling + *
  • PUBSUB - Uses Redis Pub/Sub only (requires Redis) + *
  • POLLING - Uses periodic polling only + *
  • NONE - Disables caching and hot reload + *
+ * + *

Configuration example: + * + *

+ * fluxgate:
+ *   reload:
+ *     enabled: true
+ *     strategy: AUTO
+ *     cache:
+ *       ttl: 5m
+ *       max-size: 1000
+ *     polling:
+ *       interval: 30s
+ *     pubsub:
+ *       channel: fluxgate:rule-reload
+ * 
+ */ +@AutoConfiguration(after = {FluxgateMongoAutoConfiguration.class}) +@ConditionalOnProperty( + prefix = "fluxgate.reload", + name = "enabled", + havingValue = "true", + matchIfMissing = true) +@EnableConfigurationProperties(FluxgateProperties.class) +public class FluxgateReloadAutoConfiguration { + + private static final Logger log = LoggerFactory.getLogger(FluxgateReloadAutoConfiguration.class); + + private final FluxgateProperties properties; + + public FluxgateReloadAutoConfiguration(FluxgateProperties properties) { + this.properties = properties; + } + + /** + * Creates the Caffeine-based rule cache. + * + *

Only created when: + * + *

    + *
  • Cache is enabled ({@code fluxgate.reload.cache.enabled=true}) + *
  • Strategy is not NONE + *
  • Caffeine is on the classpath + *
+ */ + @Bean + @ConditionalOnMissingBean(RuleCache.class) + @ConditionalOnClass(name = "com.github.benmanes.caffeine.cache.Caffeine") + @ConditionalOnProperty( + prefix = "fluxgate.reload.cache", + name = "enabled", + havingValue = "true", + matchIfMissing = true) + public RuleCache ruleCache() { + ReloadProperties reloadProps = properties.getReload(); + + if (reloadProps.getStrategy() == ReloadStrategy.NONE) { + log.info("Rule cache disabled (strategy=NONE)"); + return null; + } + + ReloadProperties.CacheProperties cacheProps = reloadProps.getCache(); + log.info( + "Creating CaffeineRuleCache with ttl={}, maxSize={}", + cacheProps.getTtl(), + cacheProps.getMaxSize()); + + return new CaffeineRuleCache(cacheProps.getTtl(), cacheProps.getMaxSize()); + } + + /** + * Creates the rule reload strategy based on configuration. + * + *

Strategy selection: + * + *

    + *
  • NONE - Returns NoOpReloadStrategy + *
  • POLLING - Returns PollingReloadStrategy + *
  • PUBSUB - Returns RedisPubSubReloadStrategy (requires Redis URI) + *
  • AUTO - Uses Pub/Sub if Redis enabled, otherwise Polling + *
+ */ + @Bean + @ConditionalOnMissingBean(RuleReloadStrategy.class) + public RuleReloadStrategy ruleReloadStrategy( + @Qualifier("delegateRuleSetProvider") RateLimitRuleSetProvider ruleSetProvider, + ObjectProvider ruleCacheProvider) { + + ReloadProperties reloadProps = properties.getReload(); + ReloadStrategy strategy = reloadProps.getStrategy(); + RuleCache ruleCache = ruleCacheProvider.getIfAvailable(); + boolean redisEnabled = properties.getRedis().isEnabled(); + + log.info("Configuring rule reload strategy: {}", strategy); + + return switch (strategy) { + case NONE -> { + log.info("Hot reload disabled (strategy=NONE)"); + yield new NoOpReloadStrategy(); + } + case POLLING -> createPollingStrategy(ruleSetProvider, ruleCache); + case PUBSUB -> { + if (!redisEnabled) { + log.warn("PUBSUB strategy requested but Redis is not enabled. Falling back to POLLING."); + yield createPollingStrategy(ruleSetProvider, ruleCache); + } + yield createPubSubStrategy(); + } + case AUTO -> { + if (redisEnabled) { + log.info("AUTO strategy: Redis enabled, using Pub/Sub"); + yield createPubSubStrategy(); + } else { + log.info("AUTO strategy: Redis not enabled, using Polling"); + yield createPollingStrategy(ruleSetProvider, ruleCache); + } + } + }; + } + + private RuleReloadStrategy createPollingStrategy( + RateLimitRuleSetProvider provider, RuleCache cache) { + if (cache == null) { + log.warn( + "RuleCache is not available. " + + "Falling back to NoOpReloadStrategy. " + + "Check if Caffeine is on the classpath or cache is enabled."); + return new NoOpReloadStrategy(); + } + + ReloadProperties.PollingProperties pollingProps = properties.getReload().getPolling(); + log.info( + "Creating PollingReloadStrategy with interval={}, initialDelay={}", + pollingProps.getInterval(), + pollingProps.getInitialDelay()); + return new PollingReloadStrategy( + provider, cache, pollingProps.getInterval(), pollingProps.getInitialDelay()); + } + + private RedisPubSubReloadStrategy createPubSubStrategy() { + ReloadProperties.PubSubProperties pubsubProps = properties.getReload().getPubsub(); + String redisUri = properties.getRedis().getUri(); + Duration timeout = Duration.ofMillis(properties.getRedis().getTimeoutMs()); + + log.info("Creating RedisPubSubReloadStrategy on channel={}", pubsubProps.getChannel()); + + return new RedisPubSubReloadStrategy( + redisUri, + pubsubProps.getChannel(), + pubsubProps.isRetryOnFailure(), + pubsubProps.getRetryInterval(), + timeout); + } + + /** + * Creates the Redis bucket reset handler. + * + *

This handler automatically resets token buckets when rules change, ensuring that the new + * rate limits take effect immediately. + * + *

Only created when Redis token bucket store is available. + */ + @Bean + @ConditionalOnMissingBean(BucketResetHandler.class) + @ConditionalOnBean(RedisTokenBucketStore.class) + public BucketResetHandler bucketResetHandler(RedisTokenBucketStore tokenBucketStore) { + log.info("Creating RedisBucketResetHandler for automatic bucket reset on rule changes"); + return new RedisBucketResetHandler(tokenBucketStore); + } + + /** + * Creates the caching rule set provider that wraps the delegate provider. + * + *

This is marked as @Primary so it takes precedence over the delegate provider when autowiring + * RateLimitRuleSetProvider. + */ + @Bean(name = "cachingRuleSetProvider") + @Primary + @ConditionalOnBean({RuleCache.class, RuleReloadStrategy.class}) + public CachingRuleSetProvider cachingRuleSetProvider( + @Qualifier("delegateRuleSetProvider") RateLimitRuleSetProvider ruleSetProvider, + RuleCache ruleCache, + RuleReloadStrategy reloadStrategy, + ObjectProvider bucketResetHandlerProvider) { + + // Avoid wrapping if already a caching provider + if (ruleSetProvider instanceof CachingRuleSetProvider) { + log.warn("RuleSetProvider is already a CachingRuleSetProvider, skipping wrap"); + return (CachingRuleSetProvider) ruleSetProvider; + } + + log.info( + "Creating CachingRuleSetProvider wrapping {}", ruleSetProvider.getClass().getSimpleName()); + + CachingRuleSetProvider cachingProvider = new CachingRuleSetProvider(ruleSetProvider, ruleCache); + + // Register the caching provider as a reload listener + reloadStrategy.addListener(cachingProvider); + + // Register bucket reset handler as a reload listener if available + BucketResetHandler bucketResetHandler = bucketResetHandlerProvider.getIfAvailable(); + if (bucketResetHandler instanceof RedisBucketResetHandler redisBucketResetHandler) { + log.info("Registering RedisBucketResetHandler as reload listener"); + reloadStrategy.addListener(redisBucketResetHandler); + } + + return cachingProvider; + } + + /** Lifecycle bean to start and stop the reload strategy. */ + @Bean + @ConditionalOnBean(RuleReloadStrategy.class) + public SmartLifecycle reloadStrategyLifecycle(RuleReloadStrategy reloadStrategy) { + return new SmartLifecycle() { + private volatile boolean running = false; + + @Override + public void start() { + log.info("Starting rule reload strategy: {}", reloadStrategy.getClass().getSimpleName()); + reloadStrategy.start(); + running = true; + } + + @Override + public void stop() { + log.info("Stopping rule reload strategy: {}", reloadStrategy.getClass().getSimpleName()); + reloadStrategy.stop(); + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + public int getPhase() { + // Start after other FluxGate components + return Integer.MAX_VALUE - 100; + } + + @Override + public boolean isAutoStartup() { + return true; + } + }; + } +} diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java index fc44a2d..b6d46ea 100644 --- a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java @@ -1,5 +1,6 @@ package org.fluxgate.spring.properties; +import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.NestedConfigurationProperty; @@ -46,6 +47,9 @@ public class FluxgateProperties { /** Actuator configuration. */ @NestedConfigurationProperty private ActuatorProperties actuator = new ActuatorProperties(); + /** Rule reload configuration for hot reload support. */ + @NestedConfigurationProperty private ReloadProperties reload = new ReloadProperties(); + public MongoProperties getMongo() { return mongo; } @@ -86,6 +90,14 @@ public void setActuator(ActuatorProperties actuator) { this.actuator = actuator; } + public ReloadProperties getReload() { + return reload; + } + + public void setReload(ReloadProperties reload) { + this.reload = reload; + } + // ========================================================================= // Nested Configuration Classes // ========================================================================= @@ -458,4 +470,210 @@ public void setEnabled(boolean enabled) { } } } + + /** + * Rule reload configuration for hot reload support. + * + *

Supports multiple strategies: + * + *

    + *
  • AUTO - Automatically select best strategy (Pub/Sub if Redis available, else Polling) + *
  • PUBSUB - Use Redis Pub/Sub for real-time notifications + *
  • POLLING - Periodically check for changes + *
  • NONE - Disable hot reload (always fetch fresh from provider) + *
+ * + *
+   * fluxgate:
+   *   reload:
+   *     enabled: true
+   *     strategy: AUTO
+   *     cache:
+   *       enabled: true
+   *       ttl: 5m
+   *       max-size: 1000
+   *     polling:
+   *       interval: 30s
+   *     pubsub:
+   *       channel: fluxgate:rule-reload
+   * 
+ */ + public static class ReloadProperties { + + /** Enable rule hot reload feature. */ + private boolean enabled = true; + + /** + * Reload strategy to use. + * + *
    + *
  • AUTO - Use Pub/Sub if Redis is available, otherwise use Polling + *
  • PUBSUB - Use Redis Pub/Sub only + *
  • POLLING - Use periodic polling only + *
  • NONE - Disable caching and always fetch fresh rules + *
+ */ + private ReloadStrategy strategy = ReloadStrategy.AUTO; + + /** Cache configuration. */ + private CacheProperties cache = new CacheProperties(); + + /** Polling strategy configuration. */ + private PollingProperties polling = new PollingProperties(); + + /** Pub/Sub strategy configuration. */ + private PubSubProperties pubsub = new PubSubProperties(); + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public ReloadStrategy getStrategy() { + return strategy; + } + + public void setStrategy(ReloadStrategy strategy) { + this.strategy = strategy; + } + + public CacheProperties getCache() { + return cache; + } + + public void setCache(CacheProperties cache) { + this.cache = cache; + } + + public PollingProperties getPolling() { + return polling; + } + + public void setPolling(PollingProperties polling) { + this.polling = polling; + } + + public PubSubProperties getPubsub() { + return pubsub; + } + + public void setPubsub(PubSubProperties pubsub) { + this.pubsub = pubsub; + } + + /** Rule cache configuration. */ + public static class CacheProperties { + + /** Enable local caching of rules. */ + private boolean enabled = true; + + /** Time-to-live for cached rules. Rules will be refetched after this duration. */ + private Duration ttl = Duration.ofMinutes(5); + + /** Maximum number of rules to cache. */ + private int maxSize = 1000; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; + } + + public int getMaxSize() { + return maxSize; + } + + public void setMaxSize(int maxSize) { + this.maxSize = maxSize; + } + } + + /** Polling strategy configuration. */ + public static class PollingProperties { + + /** Interval between polling checks. */ + private Duration interval = Duration.ofSeconds(30); + + /** Initial delay before first poll. */ + private Duration initialDelay = Duration.ofSeconds(10); + + public Duration getInterval() { + return interval; + } + + public void setInterval(Duration interval) { + this.interval = interval; + } + + public Duration getInitialDelay() { + return initialDelay; + } + + public void setInitialDelay(Duration initialDelay) { + this.initialDelay = initialDelay; + } + } + + /** Pub/Sub strategy configuration. */ + public static class PubSubProperties { + + /** Redis channel name for reload notifications. */ + private String channel = "fluxgate:rule-reload"; + + /** Retry subscription on failure. */ + private boolean retryOnFailure = true; + + /** Interval between retry attempts. */ + private Duration retryInterval = Duration.ofSeconds(5); + + public String getChannel() { + return channel; + } + + public void setChannel(String channel) { + this.channel = channel; + } + + public boolean isRetryOnFailure() { + return retryOnFailure; + } + + public void setRetryOnFailure(boolean retryOnFailure) { + this.retryOnFailure = retryOnFailure; + } + + public Duration getRetryInterval() { + return retryInterval; + } + + public void setRetryInterval(Duration retryInterval) { + this.retryInterval = retryInterval; + } + } + } + + /** Reload strategy options. */ + public enum ReloadStrategy { + /** Automatically select best strategy based on available infrastructure. */ + AUTO, + /** Use Redis Pub/Sub for real-time reload notifications. */ + PUBSUB, + /** Use periodic polling to check for changes. */ + POLLING, + /** Disable hot reload - always fetch fresh rules from provider. */ + NONE + } } diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java new file mode 100644 index 0000000..bb74959 --- /dev/null +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/cache/CaffeineRuleCache.java @@ -0,0 +1,135 @@ +package org.fluxgate.spring.reload.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.time.Duration; +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import org.fluxgate.core.ratelimiter.RateLimitRuleSet; +import org.fluxgate.core.reload.RuleCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Caffeine-based implementation of {@link RuleCache}. + * + *

Provides high-performance, thread-safe local caching of rate limit rule sets with configurable + * TTL and maximum size. + * + *

Example usage: + * + *

{@code
+ * RuleCache cache = new CaffeineRuleCache(
+ *     Duration.ofMinutes(5),  // TTL
+ *     1000                    // max size
+ * );
+ * }
+ */ +public class CaffeineRuleCache implements RuleCache { + + private static final Logger log = LoggerFactory.getLogger(CaffeineRuleCache.class); + + private final Cache cache; + private final Duration ttl; + private final int maxSize; + + /** + * Creates a new Caffeine-based rule cache. + * + * @param ttl time-to-live for cached entries + * @param maxSize maximum number of entries to cache + */ + public CaffeineRuleCache(Duration ttl, int maxSize) { + this.ttl = Objects.requireNonNull(ttl, "ttl must not be null"); + this.maxSize = maxSize; + + this.cache = + Caffeine.newBuilder() + .expireAfterWrite(ttl) + .maximumSize(maxSize) + .recordStats() + .removalListener( + (key, value, cause) -> { + if (cause.wasEvicted()) { + log.debug("Rule set evicted from cache: {} (cause: {})", key, cause); + } + }) + .build(); + + log.info("CaffeineRuleCache initialized with ttl={}, maxSize={}", ttl, maxSize); + } + + @Override + public Optional get(String ruleSetId) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + return Optional.ofNullable(cache.getIfPresent(ruleSetId)); + } + + @Override + public void put(String ruleSetId, RateLimitRuleSet ruleSet) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + Objects.requireNonNull(ruleSet, "ruleSet must not be null"); + cache.put(ruleSetId, ruleSet); + log.trace("Cached rule set: {}", ruleSetId); + } + + @Override + public void invalidate(String ruleSetId) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + cache.invalidate(ruleSetId); + log.debug("Invalidated rule set from cache: {}", ruleSetId); + } + + @Override + public void invalidateAll() { + cache.invalidateAll(); + log.info("Invalidated all cached rule sets"); + } + + @Override + public Set getCachedRuleSetIds() { + return new HashSet<>(cache.asMap().keySet()); + } + + @Override + public int size() { + return (int) cache.estimatedSize(); + } + + @Override + public Optional getStats() { + com.github.benmanes.caffeine.cache.stats.CacheStats stats = cache.stats(); + return Optional.of( + CacheStats.of( + stats.hitCount(), stats.missCount(), stats.evictionCount(), cache.estimatedSize())); + } + + /** + * Returns the configured TTL. + * + * @return the cache TTL + */ + public Duration getTtl() { + return ttl; + } + + /** + * Returns the configured maximum size. + * + * @return the maximum cache size + */ + public int getMaxSize() { + return maxSize; + } + + /** + * Performs cache maintenance (cleanup expired entries). + * + *

This is typically called automatically by Caffeine, but can be invoked manually if needed. + */ + public void cleanUp() { + cache.cleanUp(); + } +} diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java new file mode 100644 index 0000000..b67d418 --- /dev/null +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/handler/RedisBucketResetHandler.java @@ -0,0 +1,62 @@ +package org.fluxgate.spring.reload.handler; + +import java.util.Objects; +import org.fluxgate.core.reload.BucketResetHandler; +import org.fluxgate.core.reload.RuleReloadEvent; +import org.fluxgate.core.reload.RuleReloadListener; +import org.fluxgate.redis.store.RedisTokenBucketStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Redis implementation of {@link BucketResetHandler}. + * + *

This handler deletes token buckets from Redis when rules are changed, ensuring that the new + * rules take effect immediately. + * + *

It also implements {@link RuleReloadListener} to automatically reset buckets when reload + * events are received via Pub/Sub or polling. + */ +public class RedisBucketResetHandler implements BucketResetHandler, RuleReloadListener { + + private static final Logger log = LoggerFactory.getLogger(RedisBucketResetHandler.class); + + private final RedisTokenBucketStore tokenBucketStore; + + /** + * Creates a new RedisBucketResetHandler. + * + * @param tokenBucketStore the Redis token bucket store + */ + public RedisBucketResetHandler(RedisTokenBucketStore tokenBucketStore) { + this.tokenBucketStore = + Objects.requireNonNull(tokenBucketStore, "tokenBucketStore must not be null"); + } + + @Override + public void resetBuckets(String ruleSetId) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + log.info("Resetting token buckets for ruleSetId: {}", ruleSetId); + long deleted = tokenBucketStore.deleteBucketsByRuleSetId(ruleSetId); + log.info("Reset complete: {} buckets deleted for ruleSetId: {}", deleted, ruleSetId); + } + + @Override + public void resetAllBuckets() { + log.info("Resetting all token buckets (full reset)"); + long deleted = tokenBucketStore.deleteAllBuckets(); + log.info("Full reset complete: {} buckets deleted", deleted); + } + + @Override + public void onReload(RuleReloadEvent event) { + if (event.isFullReload()) { + log.info("Full reload event received, resetting all buckets"); + resetAllBuckets(); + } else { + String ruleSetId = event.getRuleSetId(); + log.info("Reload event received for ruleSetId: {}, resetting buckets", ruleSetId); + resetBuckets(ruleSetId); + } + } +} diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java new file mode 100644 index 0000000..e0432ac --- /dev/null +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/AbstractReloadStrategy.java @@ -0,0 +1,117 @@ +package org.fluxgate.spring.reload.strategy; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import org.fluxgate.core.reload.ReloadSource; +import org.fluxgate.core.reload.RuleReloadEvent; +import org.fluxgate.core.reload.RuleReloadListener; +import org.fluxgate.core.reload.RuleReloadStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base class for reload strategies. + * + *

Provides common functionality for managing listeners and lifecycle state. + */ +public abstract class AbstractReloadStrategy implements RuleReloadStrategy { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private final List listeners = new CopyOnWriteArrayList<>(); + private final AtomicBoolean running = new AtomicBoolean(false); + + /** The reload source for events generated by this strategy. */ + protected abstract ReloadSource getReloadSource(); + + @Override + public void start() { + if (running.compareAndSet(false, true)) { + log.info("Starting {} reload strategy", getClass().getSimpleName()); + doStart(); + } else { + log.debug("{} reload strategy already running", getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (running.compareAndSet(true, false)) { + log.info("Stopping {} reload strategy", getClass().getSimpleName()); + doStop(); + } else { + log.debug("{} reload strategy already stopped", getClass().getSimpleName()); + } + } + + @Override + public boolean isRunning() { + return running.get(); + } + + @Override + public void triggerReload(String ruleSetId) { + Objects.requireNonNull(ruleSetId, "ruleSetId must not be null"); + log.debug("Triggering reload for ruleSetId: {}", ruleSetId); + notifyListeners(RuleReloadEvent.forRuleSet(ruleSetId, ReloadSource.MANUAL)); + } + + @Override + public void triggerReloadAll() { + log.debug("Triggering full reload"); + notifyListeners(RuleReloadEvent.fullReload(ReloadSource.MANUAL)); + } + + @Override + public void addListener(RuleReloadListener listener) { + Objects.requireNonNull(listener, "listener must not be null"); + listeners.add(listener); + log.debug("Added reload listener: {}", listener.getClass().getSimpleName()); + } + + @Override + public void removeListener(RuleReloadListener listener) { + Objects.requireNonNull(listener, "listener must not be null"); + listeners.remove(listener); + log.debug("Removed reload listener: {}", listener.getClass().getSimpleName()); + } + + /** + * Notifies all registered listeners of a reload event. + * + * @param event the reload event + */ + protected void notifyListeners(RuleReloadEvent event) { + if (listeners.isEmpty()) { + log.trace("No listeners registered, skipping notification"); + return; + } + + log.debug("Notifying {} listeners of reload event: {}", listeners.size(), event); + + for (RuleReloadListener listener : listeners) { + try { + listener.onReload(event); + } catch (Exception e) { + log.error("Error notifying listener {} of reload event", listener.getClass().getName(), e); + } + } + } + + /** Called when the strategy is started. Subclasses should implement their startup logic here. */ + protected abstract void doStart(); + + /** Called when the strategy is stopped. Subclasses should implement their cleanup logic here. */ + protected abstract void doStop(); + + /** + * Returns the number of registered listeners. + * + * @return listener count + */ + protected int getListenerCount() { + return listeners.size(); + } +} diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/NoOpReloadStrategy.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/NoOpReloadStrategy.java new file mode 100644 index 0000000..f44d97a --- /dev/null +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/NoOpReloadStrategy.java @@ -0,0 +1,37 @@ +package org.fluxgate.spring.reload.strategy; + +import org.fluxgate.core.reload.ReloadSource; + +/** + * No-operation reload strategy that does nothing. + * + *

Used when hot reload is disabled (strategy = NONE). Rules are always fetched fresh from the + * provider without caching. + */ +public class NoOpReloadStrategy extends AbstractReloadStrategy { + + @Override + protected ReloadSource getReloadSource() { + return ReloadSource.MANUAL; + } + + @Override + protected void doStart() { + log.info("NoOp reload strategy active - hot reload disabled"); + } + + @Override + protected void doStop() { + // Nothing to clean up + } + + @Override + public void triggerReload(String ruleSetId) { + log.debug("NoOp reload triggered for ruleSetId: {} (ignored)", ruleSetId); + } + + @Override + public void triggerReloadAll() { + log.debug("NoOp full reload triggered (ignored)"); + } +} diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java new file mode 100644 index 0000000..6e2c160 --- /dev/null +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/PollingReloadStrategy.java @@ -0,0 +1,234 @@ +package org.fluxgate.spring.reload.strategy; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.fluxgate.core.ratelimiter.RateLimitRuleSet; +import org.fluxgate.core.reload.ReloadSource; +import org.fluxgate.core.reload.RuleCache; +import org.fluxgate.core.reload.RuleReloadEvent; +import org.fluxgate.core.spi.RateLimitRuleSetProvider; + +/** + * Polling-based reload strategy that periodically checks for rule changes. + * + *

This strategy maintains a version map (using hashCode) of cached rule sets and compares them + * against the source provider at regular intervals. + * + *

Configuration example: + * + *

+ * fluxgate:
+ *   reload:
+ *     strategy: POLLING
+ *     polling:
+ *       interval: 30s
+ *       initial-delay: 10s
+ * 
+ */ +public class PollingReloadStrategy extends AbstractReloadStrategy { + + private final RateLimitRuleSetProvider provider; + private final RuleCache cache; + private final Duration pollInterval; + private final Duration initialDelay; + + private ScheduledExecutorService scheduler; + private ScheduledFuture pollTask; + + // Track rule set versions using hash codes + private final Map versionMap = new ConcurrentHashMap<>(); + + /** + * Creates a new polling reload strategy. + * + * @param provider the rule set provider to poll for changes + * @param cache the cache to check for known rule set IDs + * @param pollInterval interval between polls + * @param initialDelay initial delay before first poll + */ + public PollingReloadStrategy( + RateLimitRuleSetProvider provider, + RuleCache cache, + Duration pollInterval, + Duration initialDelay) { + this.provider = Objects.requireNonNull(provider, "provider must not be null"); + this.cache = Objects.requireNonNull(cache, "cache must not be null"); + this.pollInterval = Objects.requireNonNull(pollInterval, "pollInterval must not be null"); + this.initialDelay = Objects.requireNonNull(initialDelay, "initialDelay must not be null"); + } + + @Override + protected ReloadSource getReloadSource() { + return ReloadSource.POLLING; + } + + @Override + protected void doStart() { + scheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "fluxgate-rule-poller"); + t.setDaemon(true); + return t; + }); + + pollTask = + scheduler.scheduleWithFixedDelay( + this::pollForChanges, + initialDelay.toMillis(), + pollInterval.toMillis(), + TimeUnit.MILLISECONDS); + + log.info( + "Polling reload strategy started: interval={}, initialDelay={}", + pollInterval, + initialDelay); + } + + @Override + protected void doStop() { + if (pollTask != null) { + pollTask.cancel(false); + pollTask = null; + } + + if (scheduler != null) { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + scheduler = null; + } + + versionMap.clear(); + log.info("Polling reload strategy stopped"); + } + + /** Polls for changes in all cached rule sets. */ + private void pollForChanges() { + try { + Set cachedIds = cache.getCachedRuleSetIds(); + + if (cachedIds.isEmpty()) { + log.trace("No cached rule sets to poll"); + return; + } + + log.trace("Polling {} cached rule sets for changes", cachedIds.size()); + + for (String ruleSetId : cachedIds) { + checkForChange(ruleSetId); + } + } catch (Exception e) { + log.error("Error during polling cycle", e); + } + } + + /** + * Checks if a specific rule set has changed. + * + * @param ruleSetId the rule set ID to check + */ + private void checkForChange(String ruleSetId) { + try { + Optional currentOpt = provider.findById(ruleSetId); + + if (currentOpt.isEmpty()) { + // Rule set was deleted + Integer previousVersion = versionMap.remove(ruleSetId); + if (previousVersion != null) { + log.info("Rule set deleted: {}", ruleSetId); + notifyListeners(RuleReloadEvent.forRuleSet(ruleSetId, ReloadSource.POLLING)); + } + return; + } + + RateLimitRuleSet current = currentOpt.get(); + int currentVersion = computeVersion(current); + Integer previousVersion = versionMap.get(ruleSetId); + + if (previousVersion == null) { + // First time seeing this rule set + versionMap.put(ruleSetId, currentVersion); + log.debug("Tracking new rule set: {} (version: {})", ruleSetId, currentVersion); + } else if (!previousVersion.equals(currentVersion)) { + // Rule set changed + versionMap.put(ruleSetId, currentVersion); + log.info( + "Rule set changed: {} (version: {} -> {})", ruleSetId, previousVersion, currentVersion); + notifyListeners(RuleReloadEvent.forRuleSet(ruleSetId, ReloadSource.POLLING)); + } else { + log.trace("Rule set unchanged: {}", ruleSetId); + } + } catch (Exception e) { + log.warn("Error checking rule set for changes: {}", ruleSetId, e); + } + } + + /** + * Computes a version hash for a rule set. + * + *

Uses the rule set's content to generate a hash that changes when the rules change. + * + * @param ruleSet the rule set + * @return version hash + */ + private int computeVersion(RateLimitRuleSet ruleSet) { + // Use a combination of ID, description, and rules hash + int hash = ruleSet.getId().hashCode(); + if (ruleSet.getDescription() != null) { + hash = 31 * hash + ruleSet.getDescription().hashCode(); + } + hash = 31 * hash + ruleSet.getRules().hashCode(); + return hash; + } + + /** + * Manually triggers a version check for a specific rule set. + * + * @param ruleSetId the rule set ID to check + */ + public void forceCheck(String ruleSetId) { + checkForChange(ruleSetId); + } + + /** + * Returns the configured poll interval. + * + * @return poll interval + */ + public Duration getPollInterval() { + return pollInterval; + } + + /** + * Returns the configured initial delay. + * + * @return initial delay + */ + public Duration getInitialDelay() { + return initialDelay; + } + + /** + * Returns the current version map for debugging. + * + * @return unmodifiable copy of version map + */ + public Map getVersionMap() { + return Map.copyOf(versionMap); + } +} diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/RedisPubSubReloadStrategy.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/RedisPubSubReloadStrategy.java new file mode 100644 index 0000000..615ada1 --- /dev/null +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/RedisPubSubReloadStrategy.java @@ -0,0 +1,367 @@ +package org.fluxgate.spring.reload.strategy; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.fluxgate.core.reload.ReloadSource; +import org.fluxgate.core.reload.RuleReloadEvent; + +/** + * Redis Pub/Sub based reload strategy for real-time rule change notifications. + * + *

This strategy subscribes to a Redis channel and listens for rule change messages. When a + * message is received, it triggers a reload event to invalidate cached rules. + * + *

Message format: + * + *

    + *
  • "*" or empty - Full reload (all rules) + *
  • "ruleSetId" - Reload specific rule set + *
+ * + *

Configuration example: + * + *

+ * fluxgate:
+ *   reload:
+ *     strategy: PUBSUB
+ *     pubsub:
+ *       channel: fluxgate:rule-reload
+ *       retry-on-failure: true
+ *       retry-interval: 5s
+ * 
+ */ +public class RedisPubSubReloadStrategy extends AbstractReloadStrategy { + + /** Message indicating a full reload should occur. */ + public static final String FULL_RELOAD_MESSAGE = "*"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final String redisUri; + private final String channel; + private final boolean retryOnFailure; + private final Duration retryInterval; + private final boolean isCluster; + private final Duration timeout; + + private Object redisClient; // Created lazily + private final AtomicReference> connectionRef = + new AtomicReference<>(); + private ScheduledExecutorService retryScheduler; + + /** + * Creates a new Redis Pub/Sub reload strategy using URI. + * + * @param redisUri the Redis URI (single for standalone, comma-separated for cluster) + * @param channel the channel to subscribe to + * @param retryOnFailure whether to retry subscription on failure + * @param retryInterval interval between retry attempts + * @param timeout connection timeout + */ + public RedisPubSubReloadStrategy( + String redisUri, + String channel, + boolean retryOnFailure, + Duration retryInterval, + Duration timeout) { + this.redisUri = Objects.requireNonNull(redisUri, "redisUri must not be null"); + this.channel = Objects.requireNonNull(channel, "channel must not be null"); + this.retryOnFailure = retryOnFailure; + this.retryInterval = retryInterval != null ? retryInterval : Duration.ofSeconds(5); + this.timeout = timeout != null ? timeout : Duration.ofSeconds(5); + this.isCluster = redisUri.contains(","); + } + + /** + * Creates a new Redis Pub/Sub reload strategy for standalone Redis. + * + * @param redisClient the Lettuce Redis client + * @param channel the channel to subscribe to + * @param retryOnFailure whether to retry subscription on failure + * @param retryInterval interval between retry attempts + */ + public RedisPubSubReloadStrategy( + RedisClient redisClient, String channel, boolean retryOnFailure, Duration retryInterval) { + this.redisClient = Objects.requireNonNull(redisClient, "redisClient must not be null"); + this.redisUri = null; + this.channel = Objects.requireNonNull(channel, "channel must not be null"); + this.retryOnFailure = retryOnFailure; + this.retryInterval = retryInterval != null ? retryInterval : Duration.ofSeconds(5); + this.timeout = Duration.ofSeconds(5); + this.isCluster = false; + } + + /** + * Creates a new Redis Pub/Sub reload strategy for Redis Cluster. + * + * @param clusterClient the Lettuce Redis cluster client + * @param channel the channel to subscribe to + * @param retryOnFailure whether to retry subscription on failure + * @param retryInterval interval between retry attempts + */ + public RedisPubSubReloadStrategy( + RedisClusterClient clusterClient, + String channel, + boolean retryOnFailure, + Duration retryInterval) { + this.redisClient = Objects.requireNonNull(clusterClient, "clusterClient must not be null"); + this.redisUri = null; + this.channel = Objects.requireNonNull(channel, "channel must not be null"); + this.retryOnFailure = retryOnFailure; + this.retryInterval = retryInterval != null ? retryInterval : Duration.ofSeconds(5); + this.timeout = Duration.ofSeconds(5); + this.isCluster = true; + } + + @Override + protected ReloadSource getReloadSource() { + return ReloadSource.PUBSUB; + } + + @Override + protected void doStart() { + if (retryOnFailure) { + retryScheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "fluxgate-pubsub-retry"); + t.setDaemon(true); + return t; + }); + } + + subscribe(); + log.info("Redis Pub/Sub reload strategy started on channel: {}", channel); + } + + @Override + protected void doStop() { + StatefulRedisPubSubConnection connection = connectionRef.getAndSet(null); + if (connection != null) { + try { + connection.sync().unsubscribe(channel); + connection.close(); + } catch (Exception e) { + log.warn("Error closing Pub/Sub connection", e); + } + } + + if (retryScheduler != null) { + retryScheduler.shutdown(); + try { + if (!retryScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + retryScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + retryScheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + retryScheduler = null; + } + + log.info("Redis Pub/Sub reload strategy stopped"); + } + + /** Creates the Redis client if not already created. */ + private void ensureClient() { + if (redisClient != null) { + return; + } + + if (redisUri == null) { + throw new IllegalStateException("No Redis URI or client provided"); + } + + if (isCluster) { + List uris = + Arrays.stream(redisUri.split(",")).map(String::trim).map(this::createRedisUri).toList(); + redisClient = RedisClusterClient.create(uris); + log.info("Created Redis Cluster client for Pub/Sub with {} nodes", uris.size()); + } else { + RedisURI uri = createRedisUri(redisUri); + redisClient = RedisClient.create(uri); + log.info("Created Redis Standalone client for Pub/Sub"); + } + } + + private RedisURI createRedisUri(String uri) { + RedisURI redisURI = RedisURI.create(uri); + redisURI.setTimeout(timeout); + return redisURI; + } + + /** Establishes the Pub/Sub subscription. */ + @SuppressWarnings("unchecked") + private void subscribe() { + try { + ensureClient(); + + StatefulRedisPubSubConnection connection; + if (isCluster) { + connection = ((RedisClusterClient) redisClient).connectPubSub(); + } else { + connection = ((RedisClient) redisClient).connectPubSub(); + } + + connection.addListener( + new RedisPubSubAdapter() { + @Override + public void message(String channel, String message) { + handleMessage(message); + } + + @Override + public void subscribed(String channel, long count) { + log.info("Subscribed to channel: {} (active subscriptions: {})", channel, count); + } + + @Override + public void unsubscribed(String channel, long count) { + log.info("Unsubscribed from channel: {} (active subscriptions: {})", channel, count); + if (isRunning() && retryOnFailure && count == 0) { + scheduleRetry(); + } + } + }); + + RedisPubSubCommands sync = connection.sync(); + sync.subscribe(channel); + + connectionRef.set(connection); + } catch (Exception e) { + log.error("Failed to subscribe to Redis channel: {}", channel, e); + if (retryOnFailure) { + scheduleRetry(); + } + } + } + + /** Schedules a retry attempt for subscription. */ + private void scheduleRetry() { + if (retryScheduler != null && !retryScheduler.isShutdown()) { + log.info("Scheduling Pub/Sub subscription retry in {}", retryInterval); + retryScheduler.schedule( + () -> { + if (isRunning()) { + log.info("Retrying Pub/Sub subscription..."); + subscribe(); + } + }, + retryInterval.toMillis(), + TimeUnit.MILLISECONDS); + } + } + + /** + * Handles an incoming Pub/Sub message. + * + *

Supports two message formats: + * + *

    + *
  • JSON: {"ruleSetId": "xxx", "fullReload": false, ...} + *
  • Plain text: "*" for full reload, or ruleSetId directly + *
+ * + * @param message the message received + */ + private void handleMessage(String message) { + log.debug("Received Pub/Sub message: {}", message); + + RuleReloadEvent event; + if (message == null || message.isEmpty() || FULL_RELOAD_MESSAGE.equals(message)) { + event = RuleReloadEvent.fullReload(ReloadSource.PUBSUB); + log.info("Full reload triggered via Pub/Sub"); + } else if (message.trim().startsWith("{")) { + // JSON format message + event = parseJsonMessage(message); + } else { + // Plain text ruleSetId + event = RuleReloadEvent.forRuleSet(message, ReloadSource.PUBSUB); + log.info("Reload triggered via Pub/Sub for ruleSetId: {}", message); + } + + notifyListeners(event); + } + + /** + * Parses a JSON format message. + * + * @param message the JSON message + * @return the parsed reload event + */ + private RuleReloadEvent parseJsonMessage(String message) { + try { + JsonNode root = OBJECT_MAPPER.readTree(message); + + boolean fullReload = root.path("fullReload").asBoolean(false); + if (fullReload) { + log.info("Full reload triggered via Pub/Sub (JSON)"); + return RuleReloadEvent.fullReload(ReloadSource.PUBSUB); + } + + String ruleSetId = root.path("ruleSetId").asText(null); + if (ruleSetId != null && !ruleSetId.isEmpty()) { + log.info("Reload triggered via Pub/Sub for ruleSetId: {}", ruleSetId); + return RuleReloadEvent.forRuleSet(ruleSetId, ReloadSource.PUBSUB); + } + + log.warn("Invalid JSON message, triggering full reload: {}", message); + return RuleReloadEvent.fullReload(ReloadSource.PUBSUB); + } catch (JsonProcessingException e) { + log.warn("Failed to parse JSON message, treating as ruleSetId: {}", message, e); + return RuleReloadEvent.forRuleSet(message, ReloadSource.PUBSUB); + } + } + + /** + * Returns the configured channel name. + * + * @return channel name + */ + public String getChannel() { + return channel; + } + + /** + * Returns whether retry on failure is enabled. + * + * @return true if retry is enabled + */ + public boolean isRetryOnFailure() { + return retryOnFailure; + } + + /** + * Returns the retry interval. + * + * @return retry interval + */ + public Duration getRetryInterval() { + return retryInterval; + } + + /** + * Checks if currently connected to Redis Pub/Sub. + * + * @return true if connected + */ + public boolean isConnected() { + StatefulRedisPubSubConnection connection = connectionRef.get(); + return connection != null && connection.isOpen(); + } +} diff --git a/fluxgate-spring-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/fluxgate-spring-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index fa9edca..f79274a 100644 --- a/fluxgate-spring-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/fluxgate-spring-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,5 +1,6 @@ org.fluxgate.spring.autoconfigure.FluxgateMongoAutoConfiguration org.fluxgate.spring.autoconfigure.FluxgateRedisAutoConfiguration +org.fluxgate.spring.autoconfigure.FluxgateReloadAutoConfiguration org.fluxgate.spring.autoconfigure.FluxgateMetricsAutoConfiguration org.fluxgate.spring.autoconfigure.FluxgateMetricsCompositeAutoConfiguration org.fluxgate.spring.autoconfigure.FluxgateActuatorAutoConfiguration diff --git a/pom.xml b/pom.xml index f165033..1311226 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,7 @@ fluxgate-mongo-adapter fluxgate-redis-ratelimiter fluxgate-spring-boot-starter + fluxgate-control-support fluxgate-testkit fluxgate-samples From 9e921446a2e05d5cd8319e98edec0311255da52c Mon Sep 17 00:00:00 2001 From: rojae Date: Sun, 14 Dec 2025 20:26:41 +0900 Subject: [PATCH 2/5] feat: implement properties `missing-rule-behavior` using when non-existence of ruleSetId (Default setting is `ALLOW`) --- .../handler/StandaloneRateLimitHandler.java | 27 +++++++++++-- .../src/main/resources/application.yml | 8 ++++ .../spring/properties/FluxgateProperties.java | 38 +++++++++++++++++++ 3 files changed, 69 insertions(+), 4 deletions(-) diff --git a/fluxgate-samples/fluxgate-sample-standalone/src/main/java/org/fluxgate/sample/standalone/handler/StandaloneRateLimitHandler.java b/fluxgate-samples/fluxgate-sample-standalone/src/main/java/org/fluxgate/sample/standalone/handler/StandaloneRateLimitHandler.java index a74c7a9..62bb609 100644 --- a/fluxgate-samples/fluxgate-sample-standalone/src/main/java/org/fluxgate/sample/standalone/handler/StandaloneRateLimitHandler.java +++ b/fluxgate-samples/fluxgate-sample-standalone/src/main/java/org/fluxgate/sample/standalone/handler/StandaloneRateLimitHandler.java @@ -8,6 +8,7 @@ import org.fluxgate.core.ratelimiter.RateLimitRuleSet; import org.fluxgate.core.spi.RateLimitRuleSetProvider; import org.fluxgate.redis.RedisRateLimiter; +import org.fluxgate.spring.properties.FluxgateProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -17,6 +18,14 @@ * *

This handler: 1. Looks up rules from MongoDB via RateLimitRuleSetProvider 2. Applies rate * limiting via Redis using RedisRateLimiter 3. Returns rate limit response to the filter + * + *

The behavior when no rule is found can be configured via: + * + *

+ * fluxgate:
+ *   ratelimit:
+ *     missing-rule-behavior: ALLOW  # or DENY
+ * 
*/ @Component public class StandaloneRateLimitHandler implements FluxgateRateLimitHandler { @@ -25,12 +34,17 @@ public class StandaloneRateLimitHandler implements FluxgateRateLimitHandler { private final RateLimitRuleSetProvider ruleSetProvider; private final RedisRateLimiter rateLimiter; + private final boolean denyWhenRuleMissing; public StandaloneRateLimitHandler( - RateLimitRuleSetProvider ruleSetProvider, RedisRateLimiter rateLimiter) { + RateLimitRuleSetProvider ruleSetProvider, + RedisRateLimiter rateLimiter, + FluxgateProperties properties) { this.ruleSetProvider = ruleSetProvider; this.rateLimiter = rateLimiter; - log.info("StandaloneRateLimitHandler initialized"); + this.denyWhenRuleMissing = properties.getRatelimit().isDenyWhenRuleMissing(); + log.info( + "StandaloneRateLimitHandler initialized (denyWhenRuleMissing={})", denyWhenRuleMissing); } @Override @@ -41,8 +55,13 @@ public RateLimitResponse tryConsume(RequestContext context, String ruleSetId) { // Look up the ruleset from MongoDB Optional ruleSetOpt = ruleSetProvider.findById(ruleSetId); if (ruleSetOpt.isEmpty()) { - log.warn("RuleSet not found: {}, allowing request", ruleSetId); - return RateLimitResponse.allowed(-1, 0); + if (denyWhenRuleMissing) { + log.warn("RuleSet not found: {}, denying request (missing-rule-behavior=DENY)", ruleSetId); + return RateLimitResponse.rejected(0); + } else { + log.warn("RuleSet not found: {}, allowing request (missing-rule-behavior=ALLOW)", ruleSetId); + return RateLimitResponse.allowed(-1, 0); + } } RateLimitRuleSet ruleSet = ruleSetOpt.get(); diff --git a/fluxgate-samples/fluxgate-sample-standalone/src/main/resources/application.yml b/fluxgate-samples/fluxgate-sample-standalone/src/main/resources/application.yml index d9d5daf..32141f2 100644 --- a/fluxgate-samples/fluxgate-sample-standalone/src/main/resources/application.yml +++ b/fluxgate-samples/fluxgate-sample-standalone/src/main/resources/application.yml @@ -26,6 +26,14 @@ fluxgate: metrics: enabled: true + # Rate limiting behavior configuration + ratelimit: + enabled: true + # Behavior when no matching rule is found: ALLOW (default) or DENY + # DENY: Strict mode - reject requests if no rule exists (fail-closed) + # ALLOW: Permissive mode - allow requests if no rule exists + missing-rule-behavior: ALLOW + # Hot reload configuration reload: enabled: true diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java index b6d46ea..bd81ee4 100644 --- a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/properties/FluxgateProperties.java @@ -316,6 +316,19 @@ public static class RateLimitProperties { /** Default rule set ID to use when no specific rule set is matched. */ private String defaultRuleSetId; + /** + * Behavior when no matching rule set is found. + * + *
    + *
  • ALLOW - Allow the request (default, permissive mode) + *
  • DENY - Deny the request (strict mode, fail-closed) + *
+ * + *

In production environments with strict security requirements, consider setting this to + * DENY to ensure all requests are rate-limited. + */ + private MissingRuleBehavior missingRuleBehavior = MissingRuleBehavior.ALLOW; + /** * Filter order (lower = higher priority). Default is high priority to run before other filters. */ @@ -410,6 +423,31 @@ public boolean isIncludeHeaders() { public void setIncludeHeaders(boolean includeHeaders) { this.includeHeaders = includeHeaders; } + + public MissingRuleBehavior getMissingRuleBehavior() { + return missingRuleBehavior; + } + + public void setMissingRuleBehavior(MissingRuleBehavior missingRuleBehavior) { + this.missingRuleBehavior = missingRuleBehavior; + } + + /** + * Check if requests should be denied when no rule is found. + * + * @return true if missing rules should result in denial + */ + public boolean isDenyWhenRuleMissing() { + return missingRuleBehavior == MissingRuleBehavior.DENY; + } + } + + /** Behavior when no matching rate limit rule is found. */ + public enum MissingRuleBehavior { + /** Allow the request to proceed (permissive mode). */ + ALLOW, + /** Deny the request (strict mode, fail-closed). */ + DENY } /** Metrics configuration for Prometheus/Micrometer integration. */ From bfae83fcde188610fc2276392feb403634f0211b Mon Sep 17 00:00:00 2001 From: rojae Date: Sun, 14 Dec 2025 20:37:03 +0900 Subject: [PATCH 3/5] fix: Added coverage test --- fluxgate-spring-boot-starter/pom.xml | 3 + .../properties/FluxgatePropertiesTest.java | 58 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/fluxgate-spring-boot-starter/pom.xml b/fluxgate-spring-boot-starter/pom.xml index 48650e7..3bf9d9a 100644 --- a/fluxgate-spring-boot-starter/pom.xml +++ b/fluxgate-spring-boot-starter/pom.xml @@ -180,9 +180,12 @@ **/reload/cache/** **/reload/strategy/** + **/reload/handler/** **/FluxgateProperties$ReloadProperties.class **/FluxgateProperties$ReloadProperties$*.class + + **/FluxgateProperties$MissingRuleBehavior.class diff --git a/fluxgate-spring-boot-starter/src/test/java/org/fluxgate/spring/properties/FluxgatePropertiesTest.java b/fluxgate-spring-boot-starter/src/test/java/org/fluxgate/spring/properties/FluxgatePropertiesTest.java index 544c1c4..d8d1023 100644 --- a/fluxgate-spring-boot-starter/src/test/java/org/fluxgate/spring/properties/FluxgatePropertiesTest.java +++ b/fluxgate-spring-boot-starter/src/test/java/org/fluxgate/spring/properties/FluxgatePropertiesTest.java @@ -160,6 +160,64 @@ void shouldSetRateLimitProperties() { assertThat(rateLimit.isIncludeHeaders()).isFalse(); } + @Test + void shouldHaveDefaultMissingRuleBehavior() { + FluxgateProperties properties = new FluxgateProperties(); + FluxgateProperties.RateLimitProperties rateLimit = properties.getRatelimit(); + + // Default should be ALLOW + assertThat(rateLimit.getMissingRuleBehavior()) + .isEqualTo(FluxgateProperties.MissingRuleBehavior.ALLOW); + assertThat(rateLimit.isDenyWhenRuleMissing()).isFalse(); + } + + @Test + void shouldSetMissingRuleBehaviorToDeny() { + FluxgateProperties properties = new FluxgateProperties(); + FluxgateProperties.RateLimitProperties rateLimit = properties.getRatelimit(); + + rateLimit.setMissingRuleBehavior(FluxgateProperties.MissingRuleBehavior.DENY); + + assertThat(rateLimit.getMissingRuleBehavior()) + .isEqualTo(FluxgateProperties.MissingRuleBehavior.DENY); + assertThat(rateLimit.isDenyWhenRuleMissing()).isTrue(); + } + + @Test + void shouldSetMissingRuleBehaviorToAllow() { + FluxgateProperties properties = new FluxgateProperties(); + FluxgateProperties.RateLimitProperties rateLimit = properties.getRatelimit(); + + // First set to DENY + rateLimit.setMissingRuleBehavior(FluxgateProperties.MissingRuleBehavior.DENY); + assertThat(rateLimit.isDenyWhenRuleMissing()).isTrue(); + + // Then set back to ALLOW + rateLimit.setMissingRuleBehavior(FluxgateProperties.MissingRuleBehavior.ALLOW); + assertThat(rateLimit.getMissingRuleBehavior()) + .isEqualTo(FluxgateProperties.MissingRuleBehavior.ALLOW); + assertThat(rateLimit.isDenyWhenRuleMissing()).isFalse(); + } + + @Test + void missingRuleBehaviorEnumShouldHaveCorrectValues() { + FluxgateProperties.MissingRuleBehavior[] values = + FluxgateProperties.MissingRuleBehavior.values(); + + assertThat(values).hasSize(2); + assertThat(values).contains( + FluxgateProperties.MissingRuleBehavior.ALLOW, + FluxgateProperties.MissingRuleBehavior.DENY); + } + + @Test + void missingRuleBehaviorEnumValueOf() { + assertThat(FluxgateProperties.MissingRuleBehavior.valueOf("ALLOW")) + .isEqualTo(FluxgateProperties.MissingRuleBehavior.ALLOW); + assertThat(FluxgateProperties.MissingRuleBehavior.valueOf("DENY")) + .isEqualTo(FluxgateProperties.MissingRuleBehavior.DENY); + } + @Test void shouldSetMetricsProperties() { FluxgateProperties properties = new FluxgateProperties(); From d5db3b5f86232533d265b899af289399de7b081a Mon Sep 17 00:00:00 2001 From: rojae Date: Sun, 14 Dec 2025 20:38:47 +0900 Subject: [PATCH 4/5] fix: spotless:apply --- .../org/fluxgate/redis/store/RedisTokenBucketStore.java | 4 ++-- .../standalone/handler/StandaloneRateLimitHandler.java | 3 ++- .../spring/reload/strategy/RedisPubSubReloadStrategy.java | 6 +++--- .../fluxgate/spring/properties/FluxgatePropertiesTest.java | 7 ++++--- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java b/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java index fb1e8da..e2454b2 100644 --- a/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java +++ b/fluxgate-redis-ratelimiter/src/main/java/org/fluxgate/redis/store/RedisTokenBucketStore.java @@ -143,8 +143,8 @@ public RedisConnectionProvider.RedisMode getMode() { /** * Deletes all token buckets matching the given ruleSetId pattern. * - *

This is used when rules are changed to reset rate limit state. The pattern matches keys like: - * {@code fluxgate:{ruleSetId}:*} + *

This is used when rules are changed to reset rate limit state. The pattern matches keys + * like: {@code fluxgate:{ruleSetId}:*} * *

Warning: Uses KEYS command which can be slow on large databases. Consider using SCAN in * high-traffic production environments. diff --git a/fluxgate-samples/fluxgate-sample-standalone/src/main/java/org/fluxgate/sample/standalone/handler/StandaloneRateLimitHandler.java b/fluxgate-samples/fluxgate-sample-standalone/src/main/java/org/fluxgate/sample/standalone/handler/StandaloneRateLimitHandler.java index 62bb609..3fc4052 100644 --- a/fluxgate-samples/fluxgate-sample-standalone/src/main/java/org/fluxgate/sample/standalone/handler/StandaloneRateLimitHandler.java +++ b/fluxgate-samples/fluxgate-sample-standalone/src/main/java/org/fluxgate/sample/standalone/handler/StandaloneRateLimitHandler.java @@ -59,7 +59,8 @@ public RateLimitResponse tryConsume(RequestContext context, String ruleSetId) { log.warn("RuleSet not found: {}, denying request (missing-rule-behavior=DENY)", ruleSetId); return RateLimitResponse.rejected(0); } else { - log.warn("RuleSet not found: {}, allowing request (missing-rule-behavior=ALLOW)", ruleSetId); + log.warn( + "RuleSet not found: {}, allowing request (missing-rule-behavior=ALLOW)", ruleSetId); return RateLimitResponse.allowed(-1, 0); } } diff --git a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/RedisPubSubReloadStrategy.java b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/RedisPubSubReloadStrategy.java index 615ada1..8360c94 100644 --- a/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/RedisPubSubReloadStrategy.java +++ b/fluxgate-spring-boot-starter/src/main/java/org/fluxgate/spring/reload/strategy/RedisPubSubReloadStrategy.java @@ -1,14 +1,14 @@ package org.fluxgate.spring.reload.strategy; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.pubsub.RedisPubSubAdapter; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import java.time.Duration; import java.util.Arrays; import java.util.List; diff --git a/fluxgate-spring-boot-starter/src/test/java/org/fluxgate/spring/properties/FluxgatePropertiesTest.java b/fluxgate-spring-boot-starter/src/test/java/org/fluxgate/spring/properties/FluxgatePropertiesTest.java index d8d1023..54d5511 100644 --- a/fluxgate-spring-boot-starter/src/test/java/org/fluxgate/spring/properties/FluxgatePropertiesTest.java +++ b/fluxgate-spring-boot-starter/src/test/java/org/fluxgate/spring/properties/FluxgatePropertiesTest.java @@ -205,9 +205,10 @@ void missingRuleBehaviorEnumShouldHaveCorrectValues() { FluxgateProperties.MissingRuleBehavior.values(); assertThat(values).hasSize(2); - assertThat(values).contains( - FluxgateProperties.MissingRuleBehavior.ALLOW, - FluxgateProperties.MissingRuleBehavior.DENY); + assertThat(values) + .contains( + FluxgateProperties.MissingRuleBehavior.ALLOW, + FluxgateProperties.MissingRuleBehavior.DENY); } @Test From cf80080288e161b2bdf59617df53ead138d0cdc0 Mon Sep 17 00:00:00 2001 From: rojae Date: Sun, 14 Dec 2025 21:13:32 +0900 Subject: [PATCH 5/5] chore: Added `docker` directory for local(dev/test) setup - /docker/elk.yml - /docker/mongo.yml - /docker/redis-standalone.yml - /docker/redis-cluster.yml - /docker/full.yml (integrated docker-compose file `*.yml`) --- .../elk-docker-compose.yml => docker/elk.yml | 0 docker/full.yml | 96 +++++++++++++++++++ docker/mongo.yml | 17 ++++ docker/redis-cluster.yml | 11 +++ docker/redis-standalone.yml | 8 ++ 5 files changed, 132 insertions(+) rename fluxgate-samples/fluxgate-sample-standalone/elk-docker-compose.yml => docker/elk.yml (100%) create mode 100644 docker/full.yml create mode 100644 docker/mongo.yml create mode 100644 docker/redis-cluster.yml create mode 100644 docker/redis-standalone.yml diff --git a/fluxgate-samples/fluxgate-sample-standalone/elk-docker-compose.yml b/docker/elk.yml similarity index 100% rename from fluxgate-samples/fluxgate-sample-standalone/elk-docker-compose.yml rename to docker/elk.yml diff --git a/docker/full.yml b/docker/full.yml new file mode 100644 index 0000000..d71696d --- /dev/null +++ b/docker/full.yml @@ -0,0 +1,96 @@ +version: "3.8" + +# Full FluxGate Development Environment +# Includes: MongoDB, Redis (standalone), ELK Stack +# +# Usage: +# docker-compose -f docker/full.yml up -d +# docker-compose -f docker/full.yml down +# +# Services: +# - MongoDB: localhost:27017 (user: fluxgate, password: fluxgate123) +# - Redis: localhost:6379 +# - Elasticsearch: localhost:9200 +# - Kibana: localhost:5601 +# - Logstash: localhost:5044 (TCP input), localhost:9600 (monitoring) + +services: + # ============================================ + # MongoDB - Rule Storage + # ============================================ + mongo: + image: mongo:7.0 + container_name: fluxgate-mongo + ports: + - "27017:27017" + environment: + MONGO_INITDB_ROOT_USERNAME: fluxgate + MONGO_INITDB_ROOT_PASSWORD: fluxgate123 + MONGO_INITDB_DATABASE: fluxgate + volumes: + - mongo-data:/data/db + + # ============================================ + # Redis Standalone - Rate Limiting + # ============================================ + redis: + image: redis:7.2-alpine + container_name: fluxgate-redis + ports: + - "6379:6379" + + # ============================================ + # ELK Stack - Logging & Monitoring + # ============================================ + elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:8.12.2 + container_name: elk-elasticsearch + environment: + - discovery.type=single-node + - xpack.security.enabled=false + - ES_JAVA_OPTS=-Xms1g -Xmx1g + ports: + - "9200:9200" + volumes: + - es-data:/usr/share/elasticsearch/data + + kibana: + image: docker.elastic.co/kibana/kibana:8.12.2 + container_name: elk-kibana + depends_on: + - elasticsearch + environment: + - ELASTICSEARCH_HOSTS=http://elasticsearch:9200 + ports: + - "5601:5601" + + logstash: + image: docker.elastic.co/logstash/logstash:8.12.2 + container_name: elk-logstash + depends_on: + - elasticsearch + ports: + - "5044:5044" # TCP input + - "9600:9600" # Logstash monitoring API + environment: + - XPACK_MONITORING_ENABLED=false + - PIPELINE_WORKERS=1 + command: > + logstash -e ' + input { + tcp { + port => 5044 + codec => json_lines + } + } + output { + elasticsearch { + hosts => ["http://elasticsearch:9200"] + index => "fluxgate-logs-%{+YYYY.MM.dd}" + } + } + ' + +volumes: + mongo-data: + es-data: diff --git a/docker/mongo.yml b/docker/mongo.yml new file mode 100644 index 0000000..5e2f94e --- /dev/null +++ b/docker/mongo.yml @@ -0,0 +1,17 @@ +version: "3.8" + +services: + mongo: + image: mongo:7.0 + container_name: fluxgate-mongo + ports: + - "27017:27017" + environment: + MONGO_INITDB_ROOT_USERNAME: fluxgate + MONGO_INITDB_ROOT_PASSWORD: fluxgate123 + MONGO_INITDB_DATABASE: fluxgate + volumes: + - mongo-data:/data/db + +volumes: + mongo-data: diff --git a/docker/redis-cluster.yml b/docker/redis-cluster.yml new file mode 100644 index 0000000..453e07b --- /dev/null +++ b/docker/redis-cluster.yml @@ -0,0 +1,11 @@ +version: "3.8" + +services: + redis-cluster: + image: grokzen/redis-cluster:7.0.10 + container_name: redis-cluster-test + environment: + IP: 0.0.0.0 + INITIAL_PORT: 7100 + ports: + - "7100-7105:7100-7105" diff --git a/docker/redis-standalone.yml b/docker/redis-standalone.yml new file mode 100644 index 0000000..6557be6 --- /dev/null +++ b/docker/redis-standalone.yml @@ -0,0 +1,8 @@ +version: "3.8" + +services: + redis: + image: redis:7.2-alpine + container_name: fluxgate-redis + ports: + - "6379:6379"