diff --git a/apps/threat-detection/pom.xml b/apps/threat-detection/pom.xml
index bd143c2dc5..e5e5cd3e2c 100644
--- a/apps/threat-detection/pom.xml
+++ b/apps/threat-detection/pom.xml
@@ -95,11 +95,6 @@
6.4.0.RELEASE
-
- com.github.ben-manes.caffeine
- caffeine
- 2.9.3
-
com.fasterxml.jackson.core
jackson-databind
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java
index 0ea77e9470..c5012eaec3 100644
--- a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java
+++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java
@@ -57,7 +57,9 @@ public static void main(String[] args) {
.setValueSerializer(Serializer.BYTE_ARRAY)
.build();
- new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run();
+ RedisClient localRedis = createLocalRedisClient();
+
+ new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, localRedis).run();
new FlushSampleDataTask(
sessionFactory, internalKafka, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS)
.run();
@@ -67,8 +69,8 @@ public static void main(String[] args) {
new CleanupTask(sessionFactory).run();
}
- public static RedisClient createRedisClient() {
- return RedisClient.create(System.getenv("AKTO_THREAT_DETECTION_REDIS_URI"));
+ public static RedisClient createLocalRedisClient() {
+ return RedisClient.create("redis://127.0.0.1:6379");
}
public static void runMigrations() {
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java
deleted file mode 100644
index 2182be09c6..0000000000
--- a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java
+++ /dev/null
@@ -1,126 +0,0 @@
-package com.akto.threat.detection.cache;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import io.lettuce.core.ExpireArgs;
-import io.lettuce.core.RedisClient;
-import io.lettuce.core.api.StatefulRedisConnection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.*;
-
-public class RedisBackedCounterCache implements CounterCache {
-
- static class Op {
- private final String key;
- private final long value;
-
- public Op(String key, long value) {
- this.key = key;
- this.value = value;
- }
-
- public String getKey() {
- return key;
- }
-
- public long getValue() {
- return value;
- }
- }
-
- private final StatefulRedisConnection redis;
-
- private final Cache localCache;
-
- private final ConcurrentLinkedQueue pendingIncOps;
- private final ConcurrentMap deletedKeys;
- private final String prefix;
-
- public RedisBackedCounterCache(RedisClient redisClient, String prefix) {
- this.prefix = prefix;
- this.redis = redisClient.connect(new LongValueCodec());
- this.localCache = Caffeine.newBuilder().maximumSize(100000).expireAfterWrite(3, TimeUnit.HOURS).build();
-
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- executor.scheduleAtFixedRate(this::syncToRedis, 60, 5, TimeUnit.SECONDS);
-
- this.pendingIncOps = new ConcurrentLinkedQueue<>();
- this.deletedKeys = new ConcurrentHashMap<>();
- }
-
- private String addPrefixToKey(String key) {
- return new StringBuilder().append(prefix).append("|").append(key).toString();
- }
-
- @Override
- public void increment(String key) {
- incrementBy(key, 1);
- }
-
- @Override
- public void incrementBy(String key, long val) {
- String _key = addPrefixToKey(key);
- localCache.asMap().merge(_key, val, Long::sum);
- pendingIncOps.add(new Op(_key, val));
- }
-
- @Override
- public long get(String key) {
- return Optional.ofNullable(this.localCache.getIfPresent(addPrefixToKey(key))).orElse(0L);
- }
-
- @Override
- public boolean exists(String key) {
- return localCache.asMap().containsKey(addPrefixToKey(key));
- }
-
- @Override
- public void clear(String key) {
- String _key = addPrefixToKey(key);
- localCache.invalidate(_key);
- this.deletedKeys.put(_key, true);
- redis.async().del(_key);
- }
-
- private void setExpiryIfNotSet(String key, long seconds) {
- // We only set expiry for redis entry. For local cache we have lower expiry for
- // all entries.
- ExpireArgs args = ExpireArgs.Builder.nx();
- redis.async().expire(addPrefixToKey(key), seconds, args);
- }
-
- private void syncToRedis() {
- Set _keys = new HashSet<>();
- while (!pendingIncOps.isEmpty()) {
- Op op = pendingIncOps.poll();
- String key = op.getKey();
- long val = op.getValue();
-
- if (this.deletedKeys.containsKey(key)) {
- continue;
- }
-
- redis
- .async()
- .incrby(key, val)
- .whenComplete(
- (result, ex) -> {
- if (ex != null) {
- ex.printStackTrace();
- }
-
- _keys.add(key);
-
- if (result != null) {
- localCache.asMap().put(key, result);
- }
- });
- }
-
- _keys.forEach(key -> setExpiryIfNotSet(key, 3 * 60 * 60));
-
- this.deletedKeys.clear();
- }
-}
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java
new file mode 100644
index 0000000000..63d20312d8
--- /dev/null
+++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java
@@ -0,0 +1,64 @@
+package com.akto.threat.detection.cache;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.api.StatefulRedisConnection;
+
+public class RedisCounterCache implements CounterCache {
+
+ static class Op {
+ private final String key;
+ private final long value;
+
+ public Op(String key, long value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public long getValue() {
+ return value;
+ }
+ }
+
+ private final StatefulRedisConnection redis;
+
+ private final String prefix;
+
+ public RedisCounterCache(RedisClient redisClient, String prefix) {
+ this.prefix = prefix;
+ this.redis = redisClient.connect(new LongValueCodec());
+ }
+
+ private String addPrefixToKey(String key) {
+ return new StringBuilder().append(prefix).append("|").append(key).toString();
+ }
+
+ @Override
+ public void increment(String key) {
+ incrementBy(key, 1);
+ }
+
+ @Override
+ public void incrementBy(String key, long val) {
+ redis.async().incrby(addPrefixToKey(key), val);
+ }
+
+ @Override
+ public long get(String key) {
+ return redis.sync().get(addPrefixToKey(key));
+ }
+
+ @Override
+ public boolean exists(String key) {
+ return redis.sync().exists(addPrefixToKey(key)) > 0;
+ }
+
+ @Override
+ public void clear(String key) {
+ redis.async().del(addPrefixToKey(key));
+ }
+
+}
diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java
index 37672bd831..61e6feaefb 100644
--- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java
+++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java
@@ -26,7 +26,7 @@
import com.akto.test_editor.execution.VariableResolver;
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
import com.akto.threat.detection.actor.SourceIPActorGenerator;
-import com.akto.threat.detection.cache.RedisBackedCounterCache;
+import com.akto.threat.detection.cache.RedisCounterCache;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.threat.detection.kafka.KafkaProtoProducer;
import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier;
@@ -82,7 +82,7 @@ public MaliciousTrafficDetectorTask(
this.windowBasedThresholdNotifier =
new WindowBasedThresholdNotifier(
- new RedisBackedCounterCache(redisClient, "wbt"),
+ new RedisCounterCache(redisClient, "wbt"),
new WindowBasedThresholdNotifier.Config(100, 10 * 60));
this.internalKafka = new KafkaProtoProducer(internalConfig);