diff --git a/apps/threat-detection/pom.xml b/apps/threat-detection/pom.xml index bd143c2dc5..e5e5cd3e2c 100644 --- a/apps/threat-detection/pom.xml +++ b/apps/threat-detection/pom.xml @@ -95,11 +95,6 @@ 6.4.0.RELEASE - - com.github.ben-manes.caffeine - caffeine - 2.9.3 - com.fasterxml.jackson.core jackson-databind diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java index 0ea77e9470..888b2eba3b 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/Main.java @@ -57,7 +57,9 @@ public static void main(String[] args) { .setValueSerializer(Serializer.BYTE_ARRAY) .build(); - new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run(); + RedisClient localRedis = createLocalRedisClient(); + + new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, localRedis).run(); new FlushSampleDataTask( sessionFactory, internalKafka, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS) .run(); @@ -67,8 +69,8 @@ public static void main(String[] args) { new CleanupTask(sessionFactory).run(); } - public static RedisClient createRedisClient() { - return RedisClient.create(System.getenv("AKTO_THREAT_DETECTION_REDIS_URI")); + public static RedisClient createLocalRedisClient() { + return RedisClient.create(System.getenv("AKTO_THREAT_DETECTION_LOCAL_REDIS_URI")); } public static void runMigrations() { diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java deleted file mode 100644 index 2182be09c6..0000000000 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java +++ /dev/null @@ -1,126 +0,0 @@ -package com.akto.threat.detection.cache; - -import 
com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import io.lettuce.core.ExpireArgs; -import io.lettuce.core.RedisClient; -import io.lettuce.core.api.StatefulRedisConnection; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.*; - -public class RedisBackedCounterCache implements CounterCache { - - static class Op { - private final String key; - private final long value; - - public Op(String key, long value) { - this.key = key; - this.value = value; - } - - public String getKey() { - return key; - } - - public long getValue() { - return value; - } - } - - private final StatefulRedisConnection redis; - - private final Cache localCache; - - private final ConcurrentLinkedQueue pendingIncOps; - private final ConcurrentMap deletedKeys; - private final String prefix; - - public RedisBackedCounterCache(RedisClient redisClient, String prefix) { - this.prefix = prefix; - this.redis = redisClient.connect(new LongValueCodec()); - this.localCache = Caffeine.newBuilder().maximumSize(100000).expireAfterWrite(3, TimeUnit.HOURS).build(); - - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - executor.scheduleAtFixedRate(this::syncToRedis, 60, 5, TimeUnit.SECONDS); - - this.pendingIncOps = new ConcurrentLinkedQueue<>(); - this.deletedKeys = new ConcurrentHashMap<>(); - } - - private String addPrefixToKey(String key) { - return new StringBuilder().append(prefix).append("|").append(key).toString(); - } - - @Override - public void increment(String key) { - incrementBy(key, 1); - } - - @Override - public void incrementBy(String key, long val) { - String _key = addPrefixToKey(key); - localCache.asMap().merge(_key, val, Long::sum); - pendingIncOps.add(new Op(_key, val)); - } - - @Override - public long get(String key) { - return Optional.ofNullable(this.localCache.getIfPresent(addPrefixToKey(key))).orElse(0L); - } - - @Override - public boolean 
exists(String key) { - return localCache.asMap().containsKey(addPrefixToKey(key)); - } - - @Override - public void clear(String key) { - String _key = addPrefixToKey(key); - localCache.invalidate(_key); - this.deletedKeys.put(_key, true); - redis.async().del(_key); - } - - private void setExpiryIfNotSet(String key, long seconds) { - // We only set expiry for redis entry. For local cache we have lower expiry for - // all entries. - ExpireArgs args = ExpireArgs.Builder.nx(); - redis.async().expire(addPrefixToKey(key), seconds, args); - } - - private void syncToRedis() { - Set _keys = new HashSet<>(); - while (!pendingIncOps.isEmpty()) { - Op op = pendingIncOps.poll(); - String key = op.getKey(); - long val = op.getValue(); - - if (this.deletedKeys.containsKey(key)) { - continue; - } - - redis - .async() - .incrby(key, val) - .whenComplete( - (result, ex) -> { - if (ex != null) { - ex.printStackTrace(); - } - - _keys.add(key); - - if (result != null) { - localCache.asMap().put(key, result); - } - }); - } - - _keys.forEach(key -> setExpiryIfNotSet(key, 3 * 60 * 60)); - - this.deletedKeys.clear(); - } -} diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java new file mode 100644 index 0000000000..63d20312d8 --- /dev/null +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java @@ -0,0 +1,64 @@ +package com.akto.threat.detection.cache; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; + +public class RedisCounterCache implements CounterCache { + + static class Op { + private final String key; + private final long value; + + public Op(String key, long value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public long getValue() { + return value; + } + } + + private final StatefulRedisConnection redis; + 
+ +    private final String prefix; + +    public RedisCounterCache(RedisClient redisClient, String prefix) { +        this.prefix = prefix; +        this.redis = redisClient.connect(new LongValueCodec()); +    } + +    private String addPrefixToKey(String key) { +        return new StringBuilder().append(prefix).append("|").append(key).toString(); +    } + +    @Override +    public void increment(String key) { +        incrementBy(key, 1); +    } + +    @Override +    public void incrementBy(String key, long val) { +        redis.async().incrby(addPrefixToKey(key), val); +    } + +    @Override +    public long get(String key) { +        Long value = redis.sync().get(addPrefixToKey(key)); return value == null ? 0L : value; +    } + +    @Override +    public boolean exists(String key) { +        return redis.sync().exists(addPrefixToKey(key)) > 0; +    } + +    @Override +    public void clear(String key) { +        redis.async().del(addPrefixToKey(key)); +    } + +} diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java index 37672bd831..8905338ab1 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java @@ -26,7 +26,7 @@ import com.akto.test_editor.execution.VariableResolver; import com.akto.test_editor.filter.data_operands_impl.ValidationResult; import com.akto.threat.detection.actor.SourceIPActorGenerator; -import com.akto.threat.detection.cache.RedisBackedCounterCache; +import com.akto.threat.detection.cache.RedisCounterCache; import com.akto.threat.detection.constants.KafkaTopic; import com.akto.threat.detection.kafka.KafkaProtoProducer; import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier; @@ -82,14 +82,14 @@ public MaliciousTrafficDetectorTask( this.windowBasedThresholdNotifier = new WindowBasedThresholdNotifier( -            new 
RedisBackedCounterCache(redisClient, "wbt"), +            new RedisCounterCache(redisClient, "wbt"), new WindowBasedThresholdNotifier.Config(100, 10 * 60)); this.internalKafka = new KafkaProtoProducer(internalConfig); } public void run() { -    this.kafkaConsumer.subscribe(Collections.singletonList("akto.api.logs")); +    this.kafkaConsumer.subscribe(Collections.singletonList("akto.api.logs")); ExecutorService pollingExecutor = Executors.newSingleThreadExecutor(); pollingExecutor.execute( () -> {