Use local Redis instead of centralized Redis (#1979)
Earlier we used a local Caffeine cache and synced its changes to a
centralized Redis. Now we use only a local Redis.
The assumption is that all the data for a particular user key arrives on
one machine; this is handled by the producer.
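Local-only counters are correct only if the producer routes every event for a given actor key to the same consumer instance. A minimal sketch of that producer-side keying follows; it is hypothetical: the broker address and the choice of key are assumptions, and the topic name is taken from the subscribe call in the diff below.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedTrafficProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker, for illustration only
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      String actorKey = "10.0.0.42";            // e.g. the source IP used as the actor key
      String payload = "{\"api\":\"/login\"}";  // placeholder for the serialized traffic record

      // Keying the record by actor makes Kafka's default partitioner send every
      // record for this actor to the same partition, so a single consumer (and its
      // machine-local Redis) sees all counters for that key.
      producer.send(new ProducerRecord<>("akto.api.logs2", actorKey, payload));
    }
  }
}

Keying by actor is the standard Kafka way to get per-key locality; it is what makes the switch from a centralized Redis to a machine-local one safe.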
ag060 committed Jan 16, 2025
1 parent fe90061 commit 56ce22a
Showing 5 changed files with 72 additions and 137 deletions.
apps/threat-detection/pom.xml: 5 changes (0 additions, 5 deletions)
@@ -95,11 +95,6 @@
<version>6.4.0.RELEASE</version>
</dependency>

- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- <version>2.9.3</version>
- </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@@ -57,7 +57,9 @@ public static void main(String[] args) {
.setValueSerializer(Serializer.BYTE_ARRAY)
.build();

- new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run();
+ RedisClient localRedis = createLocalRedisClient();
+
+ new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, localRedis).run();
new FlushSampleDataTask(
sessionFactory, internalKafka, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS)
.run();
@@ -67,8 +69,8 @@ public static void main(String[] args) {
new CleanupTask(sessionFactory).run();
}

- public static RedisClient createRedisClient() {
- return RedisClient.create(System.getenv("AKTO_THREAT_DETECTION_REDIS_URI"));
+ public static RedisClient createLocalRedisClient() {
+ return RedisClient.create(System.getenv("AKTO_THREAT_DETECTION_LOCAL_REDIS_URI"));
}

public static void runMigrations() {
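Note: Lettuce's RedisClient.create accepts a standard Redis URI string, so the new AKTO_THREAT_DETECTION_LOCAL_REDIS_URI variable would hold something like redis://localhost:6379/0 (a hypothetical value pointing at the machine-local Redis instance).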

This file was deleted. (Its contents are not shown; from the import change further below, this is presumably the old RedisBackedCounterCache.)

@@ -0,0 +1,64 @@
package com.akto.threat.detection.cache;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

public class RedisCounterCache implements CounterCache {

static class Op {
private final String key;
private final long value;

public Op(String key, long value) {
this.key = key;
this.value = value;
}

public String getKey() {
return key;
}

public long getValue() {
return value;
}
}

private final StatefulRedisConnection<String, Long> redis;

private final String prefix;

public RedisCounterCache(RedisClient redisClient, String prefix) {
this.prefix = prefix;
this.redis = redisClient.connect(new LongValueCodec());
}

private String addPrefixToKey(String key) {
return new StringBuilder().append(prefix).append("|").append(key).toString();
}

@Override
public void increment(String key) {
incrementBy(key, 1);
}

@Override
public void incrementBy(String key, long val) {
redis.async().incrby(addPrefixToKey(key), val);
}

@Override
public long get(String key) {
return redis.sync().get(addPrefixToKey(key));
}

@Override
public boolean exists(String key) {
return redis.sync().exists(addPrefixToKey(key)) > 0;
}

@Override
public void clear(String key) {
redis.async().del(addPrefixToKey(key));
}

}
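LongValueCodec is referenced above but is not part of this diff. For context, here is a minimal sketch of what such a Lettuce codec could look like, assuming counter values are stored as ASCII numerals so that INCRBY (which requires integer-string values in Redis) and GET stay compatible; treat this as an illustration, not the repository's actual implementation.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import io.lettuce.core.codec.RedisCodec;

public class LongValueCodec implements RedisCodec<String, Long> {

  @Override
  public String decodeKey(ByteBuffer bytes) {
    return StandardCharsets.UTF_8.decode(bytes).toString();
  }

  @Override
  public Long decodeValue(ByteBuffer bytes) {
    // INCRBY stores the counter as a decimal string, so parse it back into a Long.
    return Long.parseLong(StandardCharsets.UTF_8.decode(bytes).toString());
  }

  @Override
  public ByteBuffer encodeKey(String key) {
    return StandardCharsets.UTF_8.encode(key);
  }

  @Override
  public ByteBuffer encodeValue(Long value) {
    return StandardCharsets.UTF_8.encode(Long.toString(value));
  }
}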
@@ -26,7 +26,7 @@
import com.akto.test_editor.execution.VariableResolver;
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
import com.akto.threat.detection.actor.SourceIPActorGenerator;
- import com.akto.threat.detection.cache.RedisBackedCounterCache;
+ import com.akto.threat.detection.cache.RedisCounterCache;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.threat.detection.kafka.KafkaProtoProducer;
import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier;
@@ -82,14 +82,14 @@ public MaliciousTrafficDetectorTask(

this.windowBasedThresholdNotifier =
new WindowBasedThresholdNotifier(
- new RedisBackedCounterCache(redisClient, "wbt"),
+ new RedisCounterCache(redisClient, "wbt"),
new WindowBasedThresholdNotifier.Config(100, 10 * 60));

this.internalKafka = new KafkaProtoProducer(internalConfig);
}

public void run() {
- this.kafkaConsumer.subscribe(Collections.singletonList("akto.api.logs"));
+ this.kafkaConsumer.subscribe(Collections.singletonList("akto.api.logs2"));
ExecutorService pollingExecutor = Executors.newSingleThreadExecutor();
pollingExecutor.execute(
() -> {
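WindowBasedThresholdNotifier itself is unchanged in this commit and its internals are not shown. As a rough illustration of how such a notifier might drive the CounterCache above, here is a sketch; the bucket-key scheme and the reading of Config(100, 10 * 60) as 100 events per 10-minute window are assumptions.

import com.akto.threat.detection.cache.CounterCache;

public class WindowedThresholdSketch {

  private final CounterCache cache;      // e.g. new RedisCounterCache(redisClient, "wbt")
  private final int threshold;           // e.g. 100 events
  private final int windowSizeSeconds;   // e.g. 10 * 60

  public WindowedThresholdSketch(CounterCache cache, int threshold, int windowSizeSeconds) {
    this.cache = cache;
    this.threshold = threshold;
    this.windowSizeSeconds = windowSizeSeconds;
  }

  // Returns true once the actor crosses the threshold inside the current window.
  public boolean record(String actor, long epochSeconds) {
    long windowStart = epochSeconds - (epochSeconds % windowSizeSeconds);
    // One counter per actor per window; because the producer keys records by actor,
    // all increments for this key hit the same machine-local Redis.
    String bucketKey = actor + "|" + windowStart;
    cache.increment(bucketKey);
    return cache.get(bucketKey) >= threshold;
  }
}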
