From b95bed630064b6678328bdfb61c067819b7ca5df Mon Sep 17 00:00:00 2001
From: Ajinkya <109141486+ag060@users.noreply.github.com>
Date: Thu, 16 Jan 2025 12:03:52 +0530
Subject: [PATCH] using local redis instead of centralized redis (#1979)
Earlier we were using local caffine cache and syncing the changes to
centralized redis.
Now we are using local redis only
Assumption is now that we get all the data for particular user key on
one machine.
This will be handled by producer
---
apps/threat-detection/pom.xml | 5 -
.../java/com/akto/threat/detection/Main.java | 8 +-
.../cache/RedisBackedCounterCache.java | 126 ------------------
.../detection/cache/RedisCounterCache.java | 64 +++++++++
.../tasks/MaliciousTrafficDetectorTask.java | 4 +-
5 files changed, 71 insertions(+), 136 deletions(-)
delete mode 100644 apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java
create mode 100644 apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java
diff --git a/apps/threat-detection/pom.xml b/apps/threat-detection/pom.xml
index bd143c2dc5..e5e5cd3e2c 100644
--- a/apps/threat-detection/pom.xml
+++ b/apps/threat-detection/pom.xml
@@ -95,11 +95,6 @@
6.4.0.RELEASE
-
- com.github.ben-manes.caffeine
- caffeine
- 2.9.3
-
com.fasterxml.jackson.core
jackson-databind
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java
index 0ea77e9470..c5012eaec3 100644
--- a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java
+++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java
@@ -57,7 +57,9 @@ public static void main(String[] args) {
.setValueSerializer(Serializer.BYTE_ARRAY)
.build();
- new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run();
+ RedisClient localRedis = createLocalRedisClient();
+
+ new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, localRedis).run();
new FlushSampleDataTask(
sessionFactory, internalKafka, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS)
.run();
@@ -67,8 +69,8 @@ public static void main(String[] args) {
new CleanupTask(sessionFactory).run();
}
- public static RedisClient createRedisClient() {
- return RedisClient.create(System.getenv("AKTO_THREAT_DETECTION_REDIS_URI"));
+ public static RedisClient createLocalRedisClient() {
+ return RedisClient.create("redis://127.0.0.1:6379");
}
public static void runMigrations() {
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java
deleted file mode 100644
index 2182be09c6..0000000000
--- a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java
+++ /dev/null
@@ -1,126 +0,0 @@
-package com.akto.threat.detection.cache;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import io.lettuce.core.ExpireArgs;
-import io.lettuce.core.RedisClient;
-import io.lettuce.core.api.StatefulRedisConnection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.*;
-
-public class RedisBackedCounterCache implements CounterCache {
-
- static class Op {
- private final String key;
- private final long value;
-
- public Op(String key, long value) {
- this.key = key;
- this.value = value;
- }
-
- public String getKey() {
- return key;
- }
-
- public long getValue() {
- return value;
- }
- }
-
- private final StatefulRedisConnection redis;
-
- private final Cache localCache;
-
- private final ConcurrentLinkedQueue pendingIncOps;
- private final ConcurrentMap deletedKeys;
- private final String prefix;
-
- public RedisBackedCounterCache(RedisClient redisClient, String prefix) {
- this.prefix = prefix;
- this.redis = redisClient.connect(new LongValueCodec());
- this.localCache = Caffeine.newBuilder().maximumSize(100000).expireAfterWrite(3, TimeUnit.HOURS).build();
-
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- executor.scheduleAtFixedRate(this::syncToRedis, 60, 5, TimeUnit.SECONDS);
-
- this.pendingIncOps = new ConcurrentLinkedQueue<>();
- this.deletedKeys = new ConcurrentHashMap<>();
- }
-
- private String addPrefixToKey(String key) {
- return new StringBuilder().append(prefix).append("|").append(key).toString();
- }
-
- @Override
- public void increment(String key) {
- incrementBy(key, 1);
- }
-
- @Override
- public void incrementBy(String key, long val) {
- String _key = addPrefixToKey(key);
- localCache.asMap().merge(_key, val, Long::sum);
- pendingIncOps.add(new Op(_key, val));
- }
-
- @Override
- public long get(String key) {
- return Optional.ofNullable(this.localCache.getIfPresent(addPrefixToKey(key))).orElse(0L);
- }
-
- @Override
- public boolean exists(String key) {
- return localCache.asMap().containsKey(addPrefixToKey(key));
- }
-
- @Override
- public void clear(String key) {
- String _key = addPrefixToKey(key);
- localCache.invalidate(_key);
- this.deletedKeys.put(_key, true);
- redis.async().del(_key);
- }
-
- private void setExpiryIfNotSet(String key, long seconds) {
- // We only set expiry for redis entry. For local cache we have lower expiry for
- // all entries.
- ExpireArgs args = ExpireArgs.Builder.nx();
- redis.async().expire(addPrefixToKey(key), seconds, args);
- }
-
- private void syncToRedis() {
- Set _keys = new HashSet<>();
- while (!pendingIncOps.isEmpty()) {
- Op op = pendingIncOps.poll();
- String key = op.getKey();
- long val = op.getValue();
-
- if (this.deletedKeys.containsKey(key)) {
- continue;
- }
-
- redis
- .async()
- .incrby(key, val)
- .whenComplete(
- (result, ex) -> {
- if (ex != null) {
- ex.printStackTrace();
- }
-
- _keys.add(key);
-
- if (result != null) {
- localCache.asMap().put(key, result);
- }
- });
- }
-
- _keys.forEach(key -> setExpiryIfNotSet(key, 3 * 60 * 60));
-
- this.deletedKeys.clear();
- }
-}
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java
new file mode 100644
index 0000000000..63d20312d8
--- /dev/null
+++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java
@@ -0,0 +1,64 @@
+package com.akto.threat.detection.cache;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.api.StatefulRedisConnection;
+
+public class RedisCounterCache implements CounterCache {
+
+ static class Op {
+ private final String key;
+ private final long value;
+
+ public Op(String key, long value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public long getValue() {
+ return value;
+ }
+ }
+
+ private final StatefulRedisConnection redis;
+
+ private final String prefix;
+
+ public RedisCounterCache(RedisClient redisClient, String prefix) {
+ this.prefix = prefix;
+ this.redis = redisClient.connect(new LongValueCodec());
+ }
+
+ private String addPrefixToKey(String key) {
+ return new StringBuilder().append(prefix).append("|").append(key).toString();
+ }
+
+ @Override
+ public void increment(String key) {
+ incrementBy(key, 1);
+ }
+
+ @Override
+ public void incrementBy(String key, long val) {
+ redis.async().incrby(addPrefixToKey(key), val);
+ }
+
+ @Override
+ public long get(String key) {
+ return redis.sync().get(addPrefixToKey(key));
+ }
+
+ @Override
+ public boolean exists(String key) {
+ return redis.sync().exists(addPrefixToKey(key)) > 0;
+ }
+
+ @Override
+ public void clear(String key) {
+ redis.async().del(addPrefixToKey(key));
+ }
+
+}
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java
index 37672bd831..61e6feaefb 100644
--- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java
+++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java
@@ -26,7 +26,7 @@
import com.akto.test_editor.execution.VariableResolver;
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
import com.akto.threat.detection.actor.SourceIPActorGenerator;
-import com.akto.threat.detection.cache.RedisBackedCounterCache;
+import com.akto.threat.detection.cache.RedisCounterCache;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.threat.detection.kafka.KafkaProtoProducer;
import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier;
@@ -82,7 +82,7 @@ public MaliciousTrafficDetectorTask(
this.windowBasedThresholdNotifier =
new WindowBasedThresholdNotifier(
- new RedisBackedCounterCache(redisClient, "wbt"),
+ new RedisCounterCache(redisClient, "wbt"),
new WindowBasedThresholdNotifier.Config(100, 10 * 60));
this.internalKafka = new KafkaProtoProducer(internalConfig);