Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using redis local #1979

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions apps/threat-detection/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@
<version>6.4.0.RELEASE</version>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public static void main(String[] args) {
.setValueSerializer(Serializer.BYTE_ARRAY)
.build();

new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run();
RedisClient localRedis = createLocalRedisClient();

new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, localRedis).run();
new FlushSampleDataTask(
sessionFactory, internalKafka, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS)
.run();
Expand All @@ -67,8 +69,8 @@ public static void main(String[] args) {
new CleanupTask(sessionFactory).run();
}

public static RedisClient createRedisClient() {
return RedisClient.create(System.getenv("AKTO_THREAT_DETECTION_REDIS_URI"));
public static RedisClient createLocalRedisClient() {
return RedisClient.create("redis://127.0.0.1:6379");
}

public static void runMigrations() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.akto.threat.detection.cache;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

public class RedisCounterCache implements CounterCache {

static class Op {
private final String key;
private final long value;

public Op(String key, long value) {
this.key = key;
this.value = value;
}

public String getKey() {
return key;
}

public long getValue() {
return value;
}
}

private final StatefulRedisConnection<String, Long> redis;

private final String prefix;

public RedisCounterCache(RedisClient redisClient, String prefix) {
this.prefix = prefix;
this.redis = redisClient.connect(new LongValueCodec());
}

private String addPrefixToKey(String key) {
return new StringBuilder().append(prefix).append("|").append(key).toString();
}

@Override
public void increment(String key) {
incrementBy(key, 1);
}

@Override
public void incrementBy(String key, long val) {
redis.async().incrby(addPrefixToKey(key), val);
}

@Override
public long get(String key) {
return redis.sync().get(addPrefixToKey(key));
}

@Override
public boolean exists(String key) {
return redis.sync().exists(addPrefixToKey(key)) > 0;
}

@Override
public void clear(String key) {
redis.async().del(addPrefixToKey(key));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.akto.test_editor.execution.VariableResolver;
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
import com.akto.threat.detection.actor.SourceIPActorGenerator;
import com.akto.threat.detection.cache.RedisBackedCounterCache;
import com.akto.threat.detection.cache.RedisCounterCache;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.threat.detection.kafka.KafkaProtoProducer;
import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier;
Expand Down Expand Up @@ -82,7 +82,7 @@ public MaliciousTrafficDetectorTask(

this.windowBasedThresholdNotifier =
new WindowBasedThresholdNotifier(
new RedisBackedCounterCache(redisClient, "wbt"),
new RedisCounterCache(redisClient, "wbt"),
new WindowBasedThresholdNotifier.Config(100, 10 * 60));

this.internalKafka = new KafkaProtoProducer(internalConfig);
Expand Down
Loading