From aa4e4a19206f95bf36341ba61ede8df7cf4319a9 Mon Sep 17 00:00:00 2001 From: Ankush Jain Date: Fri, 18 Apr 2025 12:20:35 -0700 Subject: [PATCH 1/7] add service to service graph --- .../java/com/akto/runtime/APICatalogSync.java | 18 ++ .../akto/hybrid_runtime/APICatalogSync.java | 12 + libs/dao/pom.xml | 6 + .../akto/dao/graph/SvcToSvcGraphEdgesDao.java | 22 ++ .../akto/dao/graph/SvcToSvcGraphNodesDao.java | 22 ++ .../java/com/akto/dto/HttpResponseParams.java | 17 +- .../dto/graph/K8sDaemonsetGraphParams.java | 26 ++ .../com/akto/dto/graph/SvcToSvcGraph.java | 14 ++ .../com/akto/dto/graph/SvcToSvcGraphEdge.java | 36 +++ .../com/akto/dto/graph/SvcToSvcGraphNode.java | 32 +++ .../akto/dto/graph/SvcToSvcGraphParams.java | 10 + libs/utils/pom.xml | 6 + .../java/com/akto/data_actor/ClientActor.java | 113 +++++++++ .../java/com/akto/data_actor/DataActor.java | 64 +++++ .../java/com/akto/data_actor/DbActor.java | 22 ++ .../java/com/akto/data_actor/DbLayer.java | 55 ++++- .../akto/runtime/SvcToSvcGraphManager.java | 229 ++++++++++++++++++ .../com/akto/runtime/parser/SampleParser.java | 17 +- 18 files changed, 713 insertions(+), 8 deletions(-) create mode 100644 libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphEdgesDao.java create mode 100644 libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphNodesDao.java create mode 100644 libs/dao/src/main/java/com/akto/dto/graph/K8sDaemonsetGraphParams.java create mode 100644 libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraph.java create mode 100644 libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphEdge.java create mode 100644 libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphNode.java create mode 100644 libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphParams.java create mode 100644 libs/utils/src/main/java/com/akto/runtime/SvcToSvcGraphManager.java diff --git a/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java b/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java index 83203ddef8..dcf77f75d1 100644 --- a/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java +++ b/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java @@ -8,6 +8,9 @@ import com.akto.dao.context.Context; import com.akto.dao.monitoring.FilterYamlTemplateDao; import com.akto.dao.runtime_filters.AdvancedTrafficFiltersDao; +import com.akto.data_actor.DataActor; +import com.akto.data_actor.DataActorFactory; +import com.akto.data_actor.DbActor; import com.akto.dao.filter.MergedUrlsDao; import com.akto.dto.*; import com.akto.dto.billing.SyncLimit; @@ -27,6 +30,8 @@ import com.akto.log.LoggerMaker; import com.akto.log.LoggerMaker.LogDb; import com.akto.util.filter.DictionaryFilter; +import com.akto.runtime.APICatalogSync.ApiMergerResult; +import com.akto.runtime.APICatalogSync.DbUpdateReturn; import com.akto.runtime.merge.MergeOnHostOnly; import com.akto.runtime.policies.AktoPolicyNew; import com.akto.task.Cluster; @@ -66,6 +71,8 @@ public class APICatalogSync { public Map dbState; public Map delta; public AktoPolicyNew aktoPolicyNew; + public SvcToSvcGraphManager svcToSvcGraphManager = null; + public DataActor dataActor = DataActorFactory.fetchInstance(); public Map sensitiveParamInfoBooleanMap; public static boolean mergeAsyncOutside = true; public BloomFilter existingAPIsInDb = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1_000_000, 0.001 ); @@ -88,6 +95,7 @@ public APICatalogSync(String userIdentifier, int thresh, boolean fetchAllSTI, bo mergedUrls = new HashSet<>(); if (buildFromDb) { buildFromDB(false, fetchAllSTI); + this.svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); } } @@ -214,6 +222,9 @@ public void computeDelta(URLAggregator origAggregator, boolean triggerTemplateGe for (HttpResponseParams responseParams: value) { try { aktoPolicyNew.process(responseParams); + if (svcToSvcGraphManager != null) { + svcToSvcGraphManager.processRecord(responseParams.getSvcToSvcGraphParams()); + } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); @@ -1890,7 +1901,14 @@ public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI, SyncLimit s } loggerMaker.infoAndAddToDb("starting build from db inside syncWithDb", LogDb.RUNTIME); + buildFromDB(true, fetchAllSTI); + if (svcToSvcGraphManager != null){ + svcToSvcGraphManager.updateWithNewDataAndReturnDelta(dataActor); + if (svcToSvcGraphManager.getLastFetchFromDb() < Context.now() - 6 * 60 * 60) { + svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); + } + } loggerMaker.infoAndAddToDb("Finished syncing with db", LogDb.RUNTIME); } diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java index b82a5b8e93..8ea7785691 100644 --- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java +++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java @@ -29,6 +29,7 @@ import com.akto.log.LoggerMaker; import com.akto.log.LoggerMaker.LogDb; import com.akto.metrics.AllMetrics; +import com.akto.runtime.SvcToSvcGraphManager; import com.akto.data_actor.DataActor; import com.akto.data_actor.DataActorFactory; import com.akto.hybrid_runtime.policies.AktoPolicyNew; @@ -61,6 +62,7 @@ public class APICatalogSync { public Map dbState; public Map delta; public AktoPolicyNew aktoPolicyNew; + public SvcToSvcGraphManager svcToSvcGraphManager = null; public Map sensitiveParamInfoBooleanMap; public static boolean mergeAsyncOutside = true; public int lastStiFetchTs = 0; @@ -86,6 +88,7 @@ public APICatalogSync(String userIdentifier, int thresh, boolean fetchAllSTI, bo this.mergedUrls = new HashSet<>(); if (buildFromDb) { buildFromDB(false, fetchAllSTI); + this.svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); } } @@ -212,6 +215,9 @@ public void computeDelta(URLAggregator origAggregator, boolean triggerTemplateGe for (HttpResponseParams responseParams: value) { try { aktoPolicyNew.process(responseParams); + if (svcToSvcGraphManager != null) { + svcToSvcGraphManager.processRecord(responseParams.getSvcToSvcGraphParams()); + } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); @@ -1501,6 +1507,12 @@ public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI, SyncLimit s now = Context.now(); loggerMaker.infoAndAddToDb("Finished syncing with db at : " + now, LogDb.RUNTIME); lastBuildFromDb = now; + if (svcToSvcGraphManager != null){ + svcToSvcGraphManager.updateWithNewDataAndReturnDelta(dataActor); + if (svcToSvcGraphManager.getLastFetchFromDb() < Context.now() - 6 * 60 * 60) { + svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); + } + } } } diff --git a/libs/dao/pom.xml b/libs/dao/pom.xml index b01cd1c927..0a85016abc 100644 --- a/libs/dao/pom.xml +++ b/libs/dao/pom.xml @@ -182,6 +182,12 @@ fastjson2 2.0.51 + + org.projectlombok + lombok + 1.18.36 + provided + diff --git a/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphEdgesDao.java b/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphEdgesDao.java new file mode 100644 index 0000000000..10f7e88c09 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphEdgesDao.java @@ -0,0 +1,22 @@ +package com.akto.dao.graph; + +import com.akto.dao.AccountsContextDao; +import com.akto.dto.graph.SvcToSvcGraphEdge; + +public class SvcToSvcGraphEdgesDao extends AccountsContextDao { + + public static final SvcToSvcGraphEdgesDao instance = new SvcToSvcGraphEdgesDao(); + + private SvcToSvcGraphEdgesDao() {} + + @Override + public String getCollName() { + return "svc_to_svc_graph_edges"; + } + + @Override + public Class getClassT() { + return SvcToSvcGraphEdge.class; + } + +} diff --git a/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphNodesDao.java b/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphNodesDao.java new file mode 100644 index 0000000000..26dfdd73f9 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphNodesDao.java @@ -0,0 +1,22 @@ +package com.akto.dao.graph; + +import com.akto.dao.AccountsContextDao; +import com.akto.dto.graph.SvcToSvcGraphNode; + +public class SvcToSvcGraphNodesDao extends AccountsContextDao { + + public static final SvcToSvcGraphNodesDao instance = new SvcToSvcGraphNodesDao(); + + private SvcToSvcGraphNodesDao() {} + + @Override + public String getCollName() { + return "svc_to_svc_graph_nodes"; + } + + @Override + public Class getClassT() { + return SvcToSvcGraphNode.class; + } + +} diff --git a/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java b/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java index b5b96db20b..f6da76e133 100644 --- a/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java +++ b/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java @@ -2,6 +2,7 @@ import com.akto.dao.context.Context; +import com.akto.dto.graph.SvcToSvcGraphParams; import java.util.HashMap; import java.util.List; @@ -27,6 +28,8 @@ public enum Source { String sourceIP; String destIP; String direction; + SvcToSvcGraphParams svcToSvcGraphParams; + public HttpResponseParams() {} @@ -34,12 +37,12 @@ public HttpResponseParams(String type, int statusCode, String status, Map> headers, String payload, HttpRequestParams requestParams, int time, String accountId, boolean isPending, Source source, - String orig, String sourceIP, String destIP, String direction) { + String orig, String sourceIP, String destIP, String direction, SvcToSvcGraphParams svcToSvcGraphParams) { this.type = type; this.statusCode = statusCode; this.status = status; @@ -54,6 +57,7 @@ public HttpResponseParams(String type, int statusCode, String status, Map edges; + private List nodes; + private int lastFetchFromDb; +} diff --git a/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphEdge.java b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphEdge.java new file mode 100644 index 0000000000..753c0ff284 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphEdge.java @@ -0,0 +1,36 @@ +package com.akto.dto.graph; + +import org.bson.codecs.pojo.annotations.BsonId; + +import com.akto.dao.context.Context; + +import lombok.*; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@ToString +@EqualsAndHashCode +public class SvcToSvcGraphEdge { + @BsonId + private String id; + + private String source; + + private String target; + + public static final String CREATTION_EPOCH = "creationEpoch"; + private int creationEpoch; + + private SvcToSvcGraphParams.Type type; + + private int lastSeenEpoch; + + private int counter; + + public static SvcToSvcGraphEdge createFromK8s(String source, String target) { + int ts = Context.now(); + return new SvcToSvcGraphEdge(source + "_" + target, source, target, ts, SvcToSvcGraphParams.Type.K8S, ts, 0); + } +} diff --git a/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphNode.java b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphNode.java new file mode 100644 index 0000000000..2f1ae7bb82 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphNode.java @@ -0,0 +1,32 @@ +package com.akto.dto.graph; + +import org.bson.codecs.pojo.annotations.BsonId; + +import com.akto.dao.context.Context; + +import lombok.*; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@ToString +@EqualsAndHashCode + +public class SvcToSvcGraphNode { + @BsonId + private String id; + + private int creationEpoch; + + private SvcToSvcGraphParams.Type type; + + private int lastSeenEpoch; + + private int counter; + + public static SvcToSvcGraphNode createFromK8s(String name) { + int ts = Context.now(); + return new SvcToSvcGraphNode(name, ts, SvcToSvcGraphParams.Type.K8S, ts, 0); + } +} diff --git a/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphParams.java b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphParams.java new file mode 100644 index 0000000000..4ae64032e1 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphParams.java @@ -0,0 +1,10 @@ +package com.akto.dto.graph; + +public abstract class SvcToSvcGraphParams { + + public enum Type { + K8S + } + + public abstract Type getType(); +} diff --git a/libs/utils/pom.xml b/libs/utils/pom.xml index a36c242a08..f95b2269c4 100644 --- a/libs/utils/pom.xml +++ b/libs/utils/pom.xml @@ -117,6 +117,12 @@ graphql-java 20.0 + + org.projectlombok + lombok + 1.18.36 + provided + diff --git a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java index c5c2f16f90..c86d2399e2 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java +++ b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java @@ -2,6 +2,8 @@ import com.akto.DaoInit; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.settings.DataControlSettings; import com.akto.testing.ApiExecutor; import com.auth0.jwt.JWT; @@ -54,6 +56,7 @@ import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; +import org.apache.kafka.common.protocol.types.Field.Str; import org.bson.BsonReader; import org.bson.Document; import org.bson.codecs.Codec; @@ -3540,4 +3543,114 @@ public TestingRunResultSummary findLatestTestingRunResultSummary(Bson filter) { return null; } } + + + public List findSvcToSvcGraphEdges(int startTs, int endTs, int skip, int limit) { + Map> headers = buildHeaders(); + BasicDBObject obj = new BasicDBObject(); + obj.put("startTs", startTs); + obj.put("endTs", endTs); + obj.put("skip", skip); + obj.put("limit", limit); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/findAllSvcToSvcGraphEdges", "", "POST", obj.toString(), headers, ""); + try { + OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); + String responsePayload = response.getBody(); + if (response.getStatusCode() != 200 || responsePayload == null) { + loggerMaker.errorAndAddToDb("non 2xx response in findAllSvcToSvcGraphEdges", LoggerMaker.LogDb.RUNTIME); + return null; + } + BasicDBObject payloadObj; + try { + payloadObj = BasicDBObject.parse(responsePayload); + BasicDBList edges = (BasicDBList) payloadObj.get("edges"); + List edgesList = new ArrayList<>(); + for (Object edge: edges) { + BasicDBObject edgeObj = (BasicDBObject) edge; + SvcToSvcGraphEdge svcToSvcGraphEdge = objectMapper.readValue(edgeObj.toJson(), SvcToSvcGraphEdge.class); + edgesList.add(svcToSvcGraphEdge); + } + return edgesList; + } catch(Exception e) { + return null; + } + } catch (Exception e) { + loggerMaker.errorAndAddToDb("error in findAllSvcToSvcGraphEdges" + e, LoggerMaker.LogDb.RUNTIME); + return null; + } + } + + public List findSvcToSvcGraphNodes(int startTs, int endTs, int skip, int limit) { + Map> headers = buildHeaders(); + BasicDBObject obj = new BasicDBObject(); + obj.put("startTs", startTs); + obj.put("endTs", endTs); + obj.put("skip", skip); + obj.put("limit", limit); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/findAllSvcToSvcGraphNodes", "", "POST", obj.toString(), headers, ""); + try { + OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); + String responsePayload = response.getBody(); + if (response.getStatusCode() != 200 || responsePayload == null) { + loggerMaker.errorAndAddToDb("non 2xx response in findAllSvcToSvcGraphNodes", LoggerMaker.LogDb.RUNTIME); + return null; + } + BasicDBObject payloadObj; + try { + payloadObj = BasicDBObject.parse(responsePayload); + BasicDBList nodes = (BasicDBList) payloadObj.get("nodes"); + List nodesList = new ArrayList<>(); + for (Object node: nodes) { + BasicDBObject nodeObj = (BasicDBObject) node; + SvcToSvcGraphNode svcToSvcGraphNode = objectMapper.readValue(nodeObj.toJson(), SvcToSvcGraphNode.class); + nodesList.add(svcToSvcGraphNode); + } + return nodesList; + } catch(Exception e) { + return null; + } + } catch (Exception e) { + loggerMaker.errorAndAddToDb("error in findAllSvcToSvcGraphNodes" + e, LoggerMaker.LogDb.RUNTIME); + return null; + } + } + + @Override + public void updateSvcToSvcGraphEdges(List edges) { + Map> headers = buildHeaders(); + BasicDBObject obj = new BasicDBObject(); + obj.put("edges", edges); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/updateSvcToSvcGraphEdges", "", "POST", obj.toString(), headers, ""); + try { + OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); + String responsePayload = response.getBody(); + if (response.getStatusCode() != 200 || responsePayload == null) { + loggerMaker.errorAndAddToDb("non 2xx response in updateSvcToSvcGraphEdges", LoggerMaker.LogDb.RUNTIME); + return; + } + } catch (Exception e) { + loggerMaker.errorAndAddToDb("error in updateSvcToSvcGraphEdges" + e, LoggerMaker.LogDb.RUNTIME); + return; + } + } + + @Override + public void updateSvcToSvcGraphNodes(List nodes) { + Map> headers = buildHeaders(); + BasicDBObject obj = new BasicDBObject(); + obj.put("nodes", nodes); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/updateSvcToSvcGraphNodes", "", "POST", obj.toString(), headers, ""); + + try { + OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); + String responsePayload = response.getBody(); + if (response.getStatusCode() != 200 || responsePayload == null) { + loggerMaker.errorAndAddToDb("non 2xx response in updateSvcToSvcGraphNodes", LoggerMaker.LogDb.RUNTIME); + return; + } + } catch (Exception e) { + loggerMaker.errorAndAddToDb("error in updateSvcToSvcGraphNodes" + e, LoggerMaker.LogDb.RUNTIME); + return; + } + } } diff --git a/libs/utils/src/main/java/com/akto/data_actor/DataActor.java b/libs/utils/src/main/java/com/akto/data_actor/DataActor.java index 08aa9dfbec..e6577665a5 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/DataActor.java +++ b/libs/utils/src/main/java/com/akto/data_actor/DataActor.java @@ -5,6 +5,8 @@ import com.akto.dto.billing.Tokens; import com.akto.dto.dependency_flow.Node; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.runtime_filters.RuntimeFilter; import com.akto.dto.settings.DataControlSettings; import com.akto.dto.test_editor.YamlTemplate; @@ -30,6 +32,7 @@ import com.akto.dto.usage.MetricTypes; import com.mongodb.BasicDBObject; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -270,4 +273,65 @@ public abstract class DataActor { public abstract TestingRunResultSummary findLatestTestingRunResultSummary(Bson filter); + + public List findAllSvcToSvcGraphEdges(int startTs, int endTs) { + List ret = new ArrayList<>(); + int skip = 0; + int limit = 1000; + while (true) { + List newList = findSvcToSvcGraphEdges(startTs, endTs, skip, limit); + + ret.addAll(newList); + skip += limit; + if (newList.size() < limit) { + return ret; + } + } + } + + protected abstract List findSvcToSvcGraphEdges(int startTs, int endTs, int skip, int limit); + + public List findAllSvcToSvcGraphNodes(int startTs, int endTs) { + List ret = new ArrayList<>(); + int skip = 0; + int limit = 1000; + while (true) { + List newList = findSvcToSvcGraphNodes(startTs, endTs, skip, limit); + + ret.addAll(newList); + skip += limit; + if (newList.size() < limit) { + return ret; + } + } + } + + protected abstract List findSvcToSvcGraphNodes(int startTs, int endTs, int skip, int limit); + + public abstract void updateSvcToSvcGraphEdges(List edges); + public abstract void updateSvcToSvcGraphNodes(List nodes); + + public void updateNewEdgesInBatches(List updateEdges) { + if (updateEdges.isEmpty()) return; + + int start = 0; + + do { + updateSvcToSvcGraphEdges(updateEdges.subList(start, Math.min(start + 1000, updateEdges.size()))); + start += 1000; + } while (start < updateEdges.size()); + } + + + public void updateNewNodesInBatches(List updateNodes) { + if (updateNodes.isEmpty()) return; + + int start = 0; + + do { + updateSvcToSvcGraphNodes(updateNodes.subList(start, Math.min(start + 1000, updateNodes.size()))); + start += 1000; + } while (start < updateNodes.size()); + } + } diff --git a/libs/utils/src/main/java/com/akto/data_actor/DbActor.java b/libs/utils/src/main/java/com/akto/data_actor/DbActor.java index a2f174bf9e..466163b278 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/DbActor.java +++ b/libs/utils/src/main/java/com/akto/data_actor/DbActor.java @@ -9,6 +9,8 @@ import com.akto.dto.billing.Tokens; import com.akto.dto.dependency_flow.Node; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.runtime_filters.RuntimeFilter; import com.akto.dto.settings.DataControlSettings; import com.akto.dto.test_editor.YamlTemplate; @@ -560,4 +562,24 @@ public TestingRunResultSummary findLatestTestingRunResultSummary(Bson filter){ return DbLayer.findLatestTestingRunResultSummary(filter); } + @Override + protected List findSvcToSvcGraphEdges(int startTs, int endTs, int skip, int limit) { + return DbLayer.findSvcToSvcGraphEdges(startTs, endTs, skip, limit); + } + + @Override + protected List findSvcToSvcGraphNodes(int startTs, int endTs, int skip, int limit) { + return DbLayer.findSvcToSvcGraphNodes(startTs, endTs, skip, limit); + } + + @Override + public void updateSvcToSvcGraphEdges(List edges) { + DbLayer.updateSvcToSvcGraphEdges(edges); + } + + @Override + public void updateSvcToSvcGraphNodes(List nodes) { + DbLayer.updateSvcToSvcGraphNodes(nodes); + } + } \ No newline at end of file diff --git a/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java b/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java index a9649f77c3..71e5bc295f 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java +++ b/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java @@ -13,10 +13,14 @@ import com.akto.bulk_update_util.ApiInfoBulkUpdate; import com.akto.dao.*; import com.akto.dao.filter.MergedUrlsDao; +import com.akto.dao.graph.SvcToSvcGraphEdgesDao; +import com.akto.dao.graph.SvcToSvcGraphNodesDao; import com.akto.dao.settings.DataControlSettingsDao; import com.akto.dependency_analyser.DependencyAnalyserUtils; import com.akto.dto.*; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.settings.DataControlSettings; import com.mongodb.client.model.*; import org.bson.conversions.Bson; @@ -308,9 +312,9 @@ public static List fetchEndpointsInCollection() { int apiCollectionId = -1; List pipeline = new ArrayList<>(); BasicDBObject groupedId = - new BasicDBObject("apiCollectionId", "$apiCollectionId") - .append("url", "$url") - .append("method", "$method"); + new BasicDBObject("apiCollectionId", "") + .append("url", "") + .append("method", ""); if (apiCollectionId != -1) { pipeline.add(Aggregates.match(Filters.eq("apiCollectionId", apiCollectionId))); @@ -748,7 +752,7 @@ public static List fetchSampleData(Set apiCollectionIds, in * sending only the last sample data to send minimal data. */ Bson projection = Projections.computed(SampleData.SAMPLES, - Projections.computed("$slice", Arrays.asList("$" + SampleData.SAMPLES, -1))); + Projections.computed("", Arrays.asList("$" + SampleData.SAMPLES, -1))); return SampleDataDao.instance.findAll(filterQ, skip, SAMPLE_DATA_LIMIT, Sorts.descending(Constants.ID), projection); } @@ -1054,4 +1058,47 @@ public static TestingRunResultSummary findLatestTestingRunResultSummary(Bson fil return TestingRunResultSummariesDao.instance.findLatestOne(filter); } + public static List findSvcToSvcGraphEdges(int startTs, int endTs, int skip, int limit) { + return SvcToSvcGraphEdgesDao.instance.findAll(Filters.and( + Filters.gte(SvcToSvcGraphEdge.CREATTION_EPOCH, startTs), + Filters.lte(SvcToSvcGraphEdge.CREATTION_EPOCH, endTs) + ), skip, limit, Sorts.ascending(SvcToSvcGraphEdge.CREATTION_EPOCH)); + } + + public static List findSvcToSvcGraphNodes(int startTs, int endTs, int skip, int limit) { + return SvcToSvcGraphNodesDao.instance.findAll(Filters.and( + Filters.gte(SvcToSvcGraphEdge.CREATTION_EPOCH, startTs), + Filters.lte(SvcToSvcGraphEdge.CREATTION_EPOCH, endTs) + ), skip, limit, Sorts.ascending(SvcToSvcGraphEdge.CREATTION_EPOCH)); + } + + public static void updateSvcToSvcGraphEdges(List edges) { + if (edges == null || edges.isEmpty()) { + return; + } + + BulkWriteOptions options = new BulkWriteOptions().ordered(false).bypassDocumentValidation(true); + List> bulkList = new ArrayList<>(); + for(SvcToSvcGraphEdge edge: edges) { + bulkList.add(new InsertOneModel(edge)); + } + + SvcToSvcGraphEdgesDao.instance.bulkWrite(bulkList, options); + } + + public static void updateSvcToSvcGraphNodes(List nodes) { + + if (nodes == null || nodes.isEmpty()) { + return; + } + + BulkWriteOptions options = new BulkWriteOptions().ordered(false).bypassDocumentValidation(true); + List> bulkList = new ArrayList<>(); + for(SvcToSvcGraphNode node: nodes) { + bulkList.add(new InsertOneModel(node)); + } + + SvcToSvcGraphNodesDao.instance.bulkWrite(bulkList, options); + } + } diff --git a/libs/utils/src/main/java/com/akto/runtime/SvcToSvcGraphManager.java b/libs/utils/src/main/java/com/akto/runtime/SvcToSvcGraphManager.java new file mode 100644 index 0000000000..753a0caf38 --- /dev/null +++ b/libs/utils/src/main/java/com/akto/runtime/SvcToSvcGraphManager.java @@ -0,0 +1,229 @@ +package com.akto.runtime; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.akto.dao.context.Context; +import com.akto.data_actor.DataActor; +import com.akto.dto.graph.K8sDaemonsetGraphParams; +import com.akto.dto.graph.SvcToSvcGraph; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; +import com.akto.dto.graph.SvcToSvcGraphParams; + +import lombok.*; + +@Setter +@Getter +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class SvcToSvcGraphManager { + + Map> edges; + Set nodes; + + Map> newEdges; + Set newNodes; + + Map mapCompleteProcessIdToServiceNames; + int lastFetchFromDb; + + public static SvcToSvcGraphManager createFromEdgesAndNodes(DataActor dataActor) { + int now = Context.now(); + List edges = dataActor.findAllSvcToSvcGraphEdges(0, now); + List nodes = dataActor.findAllSvcToSvcGraphNodes(0, now); + SvcToSvcGraphManager instance = new SvcToSvcGraphManager(createEdgesMap(edges), createNodesSet(nodes), new HashMap<>(), new HashSet<>(), new HashMap<>(), now); + return instance; + + } + + private static Map> createEdgesMap(List edges) { + Map> ret = new HashMap<>(); + for (SvcToSvcGraphEdge svcToSvcGraphEdge : edges) { + String source = svcToSvcGraphEdge.getSource(); + String target = svcToSvcGraphEdge.getTarget(); + Set currEdges = ret.get(source); + + if (currEdges == null) { + currEdges = new HashSet<>(); + ret.put(source, currEdges); + } + + currEdges.add(target); + } + + return ret; + + } + + private static Set createNodesSet(List nodes) { + Set ret = new HashSet<>(); + for (SvcToSvcGraphNode svcToSvcGraphNode : nodes) { + + ret.add(svcToSvcGraphNode.getId()); + } + + return ret; + + } + + private boolean addToMap(Map> edges, String source, String target) { + Set currEdges = edges.get(source); + + if (currEdges == null) { + currEdges = new HashSet<>(); + edges.put(source, currEdges); + } + + if (currEdges.contains(target)) return false; + + currEdges.add(target); + + return true; + } + + public void processRecord(SvcToSvcGraphParams svcToSvcGraphParams) { + if (svcToSvcGraphParams == null) { + return; + } + + switch (svcToSvcGraphParams.getType()) { + case K8S: + K8sDaemonsetGraphParams k8sDaemonsetGraphParams = (K8sDaemonsetGraphParams) svcToSvcGraphParams; + + String completeProcessId = k8sDaemonsetGraphParams.getDaemonsetId() + "_" + k8sDaemonsetGraphParams.getProcessId(); + String serviceName = k8sDaemonsetGraphParams.getHostInApiRequest(); + + addNode(serviceName); + + switch (k8sDaemonsetGraphParams.getDirection()) { + case K8sDaemonsetGraphParams.DIRECTION_INCOMING: + addCompleteProcessId(completeProcessId, serviceName); + // addEdge (from other tools or from other services using ip address) + break; + case K8sDaemonsetGraphParams.DIRECTION_OUTGOING: + addEdge(completeProcessId, serviceName); + break; + default: + break; + } + + break; + default: + throw new RuntimeException("Unknown type: " + svcToSvcGraphParams.getType()); + } + } + + private void addCompleteProcessId(String completeProcessId, String serviceName) { + if (mapCompleteProcessIdToServiceNames.containsKey(completeProcessId)) { + String existingServiceName = mapCompleteProcessIdToServiceNames.get(completeProcessId); + if (existingServiceName.equalsIgnoreCase(serviceName)) { + return; + } else { + // this is strange. Process id is same but service name is different + } + } + + mapCompleteProcessIdToServiceNames.put(completeProcessId, serviceName); + + } + + private void addEdge(String completeProcessId, String serviceName) { + String source = mapCompleteProcessIdToServiceNames.get(completeProcessId); + if (source == null) return; + + String target = serviceName; + + boolean isNewEdge = addToMap(edges, source, target); + + if (isNewEdge) { + addToMap(newEdges, source, target); + } + } + + private boolean addNode(String serviceName) { + boolean isNewNode = nodes.add(serviceName); + + if (isNewNode) { + newNodes.add(serviceName); + return true; + } + + return false; + } + + private List updateWithNewEdgesAndReturnDelta(List incrEdges) { + List updates = new ArrayList<>(); + for (SvcToSvcGraphEdge svcToSvcGraphEdge : incrEdges) { + String newSrc = svcToSvcGraphEdge.getSource(); + String newTgt = svcToSvcGraphEdge.getTarget(); + boolean isNewEdge = addToMap(edges, newSrc, newTgt); + + if (!isNewEdge) { + continue; + } + + if (newEdges.containsKey(newSrc)) { + newEdges.get(newSrc).remove(newTgt); + continue; + } + + } + + for (String src: newEdges.keySet()) { + Set targets = newEdges.get(src); + for (String tgt: targets) { + SvcToSvcGraphEdge svcToSvcGraphEdge = SvcToSvcGraphEdge.createFromK8s(src, tgt); + updates.add(svcToSvcGraphEdge); + } + } + + return updates; + } + + private List updateWithNewNodesAndReturnDelta(List incrNodes) { + List updates = new ArrayList<>(); + for (String node: newNodes) { + boolean isNewNode = addNode(node); + + if (!isNewNode) { + continue; + } + + newNodes.remove(node); + + } + + for (String node: newNodes) { + SvcToSvcGraphNode svcToSvcGraphNode = SvcToSvcGraphNode.createFromK8s(node); + updates.add(svcToSvcGraphNode); + } + + return updates; + } + + private void resetWithNewTs(int fetchFromDbTs) { + newEdges.clear(); + newNodes.clear(); + this.lastFetchFromDb = fetchFromDbTs; + } + + public SvcToSvcGraph updateWithNewDataAndReturnDelta(DataActor dataActor) { + int now = Context.now(); + + + List updateEdges = updateWithNewEdgesAndReturnDelta(dataActor.findAllSvcToSvcGraphEdges(lastFetchFromDb, now)); + List updateNodes = updateWithNewNodesAndReturnDelta(dataActor.findAllSvcToSvcGraphNodes(lastFetchFromDb, now)); + resetWithNewTs(now); + + dataActor.updateNewEdgesInBatches(updateEdges); + dataActor.updateNewNodesInBatches(updateNodes); + + return new SvcToSvcGraph(updateEdges, updateNodes, now); + } + +} diff --git a/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java b/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java index 7464225b64..f299d9b85e 100644 --- a/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java +++ b/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java @@ -1,5 +1,6 @@ package com.akto.runtime.parser; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -8,6 +9,8 @@ import com.akto.dto.HttpRequestParams; import com.akto.dto.HttpResponseParams; import com.akto.dto.OriginalHttpRequest; +import com.akto.dto.graph.K8sDaemonsetGraphParams; +import com.akto.dto.graph.SvcToSvcGraphParams; import com.akto.util.HttpRequestResponseUtils; import com.akto.util.JSONUtils; import com.google.gson.Gson; @@ -58,8 +61,20 @@ public static HttpResponseParams parseSampleMessage(String message) throws Excep String sourceStr = (String) json.getOrDefault("source", HttpResponseParams.Source.OTHER.name()); HttpResponseParams.Source source = HttpResponseParams.Source.valueOf(sourceStr); + String enableGraph = (String) json.getOrDefault("enable_graph", "false"); + SvcToSvcGraphParams graphParams = null; + if (enableGraph.equals("true")) { + List hostNameList = requestHeaders.getOrDefault("host", requestHeaders.getOrDefault(":authority", new ArrayList<>())); + if (hostNameList != null && hostNameList.size()>0) { + String processId = (String) json.get("process_id"); + String socketId = (String) json.get("socket_id"); + String daemonsetId = (String) json.get("daemonset_id"); + graphParams = new K8sDaemonsetGraphParams(hostNameList.get(0), processId, socketId, daemonsetId, direction); + } + + } return new HttpResponseParams( - type,statusCode, status, responseHeaders, payload, requestParams, time, accountId, isPending, source, message, sourceIP, destIP, direction + type,statusCode, status, responseHeaders, payload, requestParams, time, accountId, isPending, source, message, sourceIP, destIP, direction, graphParams ); } From e04526ea906424bff6bebac76507dd70ca178b36 Mon Sep 17 00:00:00 2001 From: Ankush Jain <91221068+ankush-jain-akto@users.noreply.github.com> Date: Fri, 18 Apr 2025 13:08:31 -0700 Subject: [PATCH 2/7] Update DbAction.java with all APIs --- .../main/java/com/akto/action/DbAction.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java b/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java index 2c95b43312..8f0083a419 100644 --- a/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java +++ b/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java @@ -11,6 +11,9 @@ import com.akto.dto.bulk_updates.BulkUpdates; import com.akto.dto.bulk_updates.UpdatePayload; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraph; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.runtime_filters.RuntimeFilter; import com.akto.dto.settings.DataControlSettings; import com.akto.dto.test_editor.YamlTemplate; @@ -1740,6 +1743,54 @@ public String findLatestTestingRunResultSummary(){ return Action.SUCCESS.toUpperCase(); } + public List svcToSvcGraphEdges; + public List svcToSvcGraphNodes; + + public String findSvcToSvcGraphNodes() { + try { + this.svcToSvcGraphNodes = DbLayer.findSvcToSvcGraphNodes(startTimestamp, endTimestamp, skip, limit); + } catch (Exception e) { + System.out.println("Error in findSvcToSvcGraphNodes " + e.toString()); + return Action.ERROR.toUpperCase(); + } + + return Action.SUCCESS.toUpperCase(); + + } + + public String findSvcToSvcGraphEdges() { + try { + this.svcToSvcGraphEdges = DbLayer.findSvcToSvcGraphEdges(startTimestamp, endTimestamp, skip, limit); + } catch (Exception e) { + System.out.println("Error in findSvcToSvcGraphEdges " + e.toString()); + return Action.ERROR.toUpperCase(); + } + + return Action.SUCCESS.toUpperCase(); + } + + public String updateSvcToSvcGraphEdges() { + try { + DbLayer.updateSvcToSvcGraphEdges(this.svcToSvcGraphEdges); + } catch (Exception e) { + System.out.println("Error in updateSvcToSvcGraphEdges " + e.toString()); + return Action.ERROR.toUpperCase(); + } + + return Action.SUCCESS.toUpperCase(); + } + + public String updateSvcToSvcGraphNodes() { + try { + DbLayer.updateSvcToSvcGraphNodes(this.svcToSvcGraphNodes); + } catch (Exception e) { + System.out.println("Error in updateSvcToSvcGraphNodes " + e.toString()); + return Action.ERROR.toUpperCase(); + } + + return Action.SUCCESS.toUpperCase(); + } + public List getCustomDataTypes() { return customDataTypes; } From a9f7500e8b9096fb6ea8579b2a0af659f2d4cf70 Mon Sep 17 00:00:00 2001 From: Ankush Jain <91221068+ankush-jain-akto@users.noreply.github.com> Date: Fri, 18 Apr 2025 13:09:14 -0700 Subject: [PATCH 3/7] Update struts.xml with graph APIs --- .../src/main/resources/struts.xml | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/apps/database-abstractor/src/main/resources/struts.xml b/apps/database-abstractor/src/main/resources/struts.xml index b8cedaeca5..fb2a0a9b75 100644 --- a/apps/database-abstractor/src/main/resources/struts.xml +++ b/apps/database-abstractor/src/main/resources/struts.xml @@ -1233,6 +1233,50 @@ ^actionErrors.* + + + + + + 422 + false + ^actionErrors.* + + + + + + + + + 422 + false + ^actionErrors.* + + + + + + + + + 422 + false + ^actionErrors.* + + + + + + + + + 422 + false + ^actionErrors.* + + + From 6ffb893930484f2efc19c1b11f4570929daed9b9 Mon Sep 17 00:00:00 2001 From: Ankush Jain Date: Sat, 19 Apr 2025 02:24:14 -0700 Subject: [PATCH 4/7] use correct name --- .../src/main/java/com/akto/data_actor/ClientActor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java index c86d2399e2..34a3fdec60 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java +++ b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java @@ -3548,8 +3548,8 @@ public TestingRunResultSummary findLatestTestingRunResultSummary(Bson filter) { public List findSvcToSvcGraphEdges(int startTs, int endTs, int skip, int limit) { Map> headers = buildHeaders(); BasicDBObject obj = new BasicDBObject(); - obj.put("startTs", startTs); - obj.put("endTs", endTs); + obj.put("startTimestamp", startTs); + obj.put("endTimestamp", endTs); obj.put("skip", skip); obj.put("limit", limit); OriginalHttpRequest request = new OriginalHttpRequest(url + "/findAllSvcToSvcGraphEdges", "", "POST", obj.toString(), headers, ""); @@ -3583,8 +3583,8 @@ public List findSvcToSvcGraphEdges(int startTs, int endTs, in public List findSvcToSvcGraphNodes(int startTs, int endTs, int skip, int limit) { Map> headers = buildHeaders(); BasicDBObject obj = new BasicDBObject(); - obj.put("startTs", startTs); - obj.put("endTs", endTs); + obj.put("startTimestamp", startTs); + obj.put("endTimestamp", endTs); obj.put("skip", skip); obj.put("limit", limit); OriginalHttpRequest request = new OriginalHttpRequest(url + "/findAllSvcToSvcGraphNodes", "", "POST", obj.toString(), headers, ""); From 5cd061cd010f6f3fff9a27b89b286cd539d9613a Mon Sep 17 00:00:00 2001 From: Ankush Jain Date: Sat, 19 Apr 2025 06:03:29 -0700 Subject: [PATCH 5/7] fix api name and parameter name --- .../java/com/akto/data_actor/ClientActor.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java index 34a3fdec60..b981f5053c 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java +++ b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java @@ -3552,18 +3552,18 @@ public List findSvcToSvcGraphEdges(int startTs, int endTs, in obj.put("endTimestamp", endTs); obj.put("skip", skip); obj.put("limit", limit); - OriginalHttpRequest request = new OriginalHttpRequest(url + "/findAllSvcToSvcGraphEdges", "", "POST", obj.toString(), headers, ""); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/findSvcToSvcGraphEdges", "", "POST", obj.toString(), headers, ""); try { OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); String responsePayload = response.getBody(); if (response.getStatusCode() != 200 || responsePayload == null) { - loggerMaker.errorAndAddToDb("non 2xx response in findAllSvcToSvcGraphEdges", LoggerMaker.LogDb.RUNTIME); + loggerMaker.errorAndAddToDb("non 2xx response in findSvcToSvcGraphEdges", LoggerMaker.LogDb.RUNTIME); return null; } BasicDBObject payloadObj; try { payloadObj = BasicDBObject.parse(responsePayload); - BasicDBList edges = (BasicDBList) payloadObj.get("edges"); + BasicDBList edges = (BasicDBList) payloadObj.get("svcToSvcGraphEdges"); List edgesList = new ArrayList<>(); for (Object edge: edges) { BasicDBObject edgeObj = (BasicDBObject) edge; @@ -3575,7 +3575,7 @@ public List findSvcToSvcGraphEdges(int startTs, int endTs, in return null; } } catch (Exception e) { - loggerMaker.errorAndAddToDb("error in findAllSvcToSvcGraphEdges" + e, LoggerMaker.LogDb.RUNTIME); + loggerMaker.errorAndAddToDb("error in findSvcToSvcGraphEdges" + e, LoggerMaker.LogDb.RUNTIME); return null; } } @@ -3587,18 +3587,18 @@ public List findSvcToSvcGraphNodes(int startTs, int endTs, in obj.put("endTimestamp", endTs); obj.put("skip", skip); obj.put("limit", limit); - OriginalHttpRequest request = new OriginalHttpRequest(url + "/findAllSvcToSvcGraphNodes", "", "POST", obj.toString(), headers, ""); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/findSvcToSvcGraphNodes", "", "POST", obj.toString(), headers, ""); try { OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); String responsePayload = response.getBody(); if (response.getStatusCode() != 200 || responsePayload == null) { - loggerMaker.errorAndAddToDb("non 2xx response in findAllSvcToSvcGraphNodes", LoggerMaker.LogDb.RUNTIME); + loggerMaker.errorAndAddToDb("non 2xx response in findSvcToSvcGraphNodes", LoggerMaker.LogDb.RUNTIME); return null; } BasicDBObject payloadObj; try { payloadObj = BasicDBObject.parse(responsePayload); - BasicDBList nodes = (BasicDBList) payloadObj.get("nodes"); + BasicDBList nodes = (BasicDBList) payloadObj.get("svcToSvcGraphNodes"); List nodesList = new ArrayList<>(); for (Object node: nodes) { BasicDBObject nodeObj = (BasicDBObject) node; @@ -3610,7 +3610,7 @@ public List findSvcToSvcGraphNodes(int startTs, int endTs, in return null; } } catch (Exception e) { - loggerMaker.errorAndAddToDb("error in findAllSvcToSvcGraphNodes" + e, LoggerMaker.LogDb.RUNTIME); + loggerMaker.errorAndAddToDb("error in findSvcToSvcGraphNodes" + e, LoggerMaker.LogDb.RUNTIME); return null; } } @@ -3619,8 +3619,8 @@ public List findSvcToSvcGraphNodes(int startTs, int endTs, in public void updateSvcToSvcGraphEdges(List edges) { Map> headers = buildHeaders(); BasicDBObject obj = new BasicDBObject(); - obj.put("edges", edges); - OriginalHttpRequest request = new OriginalHttpRequest(url + "/updateSvcToSvcGraphEdges", "", "POST", obj.toString(), headers, ""); + obj.put("svcToSvcGraphEdges", edges); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/updateSvcToSvcGraphEdges", "", "POST", gson.toJson(obj), headers, ""); try { OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); String responsePayload = response.getBody(); @@ -3638,8 +3638,8 @@ public void updateSvcToSvcGraphEdges(List edges) { public void updateSvcToSvcGraphNodes(List nodes) { Map> headers = buildHeaders(); BasicDBObject obj = new BasicDBObject(); - obj.put("nodes", nodes); - OriginalHttpRequest request = new OriginalHttpRequest(url + "/updateSvcToSvcGraphNodes", "", "POST", obj.toString(), headers, ""); + obj.put("svcToSvcGraphNodes", nodes); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/updateSvcToSvcGraphNodes", "", "POST", gson.toJson(obj), headers, ""); try { OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); From 837e39fc1432f3fcf690f2966eaae77b5a563066 Mon Sep 17 00:00:00 2001 From: Ankush Jain Date: Sat, 19 Apr 2025 06:45:02 -0700 Subject: [PATCH 6/7] fix pom.xml and add test --- libs/dao/pom.xml | 1 - .../java/com/akto/dto/HttpResponseParams.java | 4 ++ libs/utils/pom.xml | 1 - .../com/akto/runtime/TestSvcToSvcGraph.java | 53 +++++++++++++++++++ 4 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 libs/utils/src/test/java/com/akto/runtime/TestSvcToSvcGraph.java diff --git a/libs/dao/pom.xml b/libs/dao/pom.xml index 0a85016abc..0a66ff73ab 100644 --- a/libs/dao/pom.xml +++ b/libs/dao/pom.xml @@ -186,7 +186,6 @@ org.projectlombok lombok 1.18.36 - provided diff --git a/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java b/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java index f6da76e133..2534fda3cd 100644 --- a/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java +++ b/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java @@ -172,4 +172,8 @@ public void setRequestParams(HttpRequestParams requestParams) { public SvcToSvcGraphParams getSvcToSvcGraphParams() { return svcToSvcGraphParams; } + + public void setSvcToSvcGraphParams(SvcToSvcGraphParams svcToSvcGraphParams) { + this.svcToSvcGraphParams = svcToSvcGraphParams; + } } diff --git a/libs/utils/pom.xml b/libs/utils/pom.xml index f95b2269c4..6a6b290364 100644 --- a/libs/utils/pom.xml +++ b/libs/utils/pom.xml @@ -121,7 +121,6 @@ org.projectlombok lombok 1.18.36 - provided diff --git a/libs/utils/src/test/java/com/akto/runtime/TestSvcToSvcGraph.java b/libs/utils/src/test/java/com/akto/runtime/TestSvcToSvcGraph.java new file mode 100644 index 0000000000..c25032cfaa --- /dev/null +++ b/libs/utils/src/test/java/com/akto/runtime/TestSvcToSvcGraph.java @@ -0,0 +1,53 @@ +package com.akto.runtime; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.Test; + +import com.akto.MongoBasedTest; +import com.akto.data_actor.DataActor; +import com.akto.data_actor.DataActorFactory; +import com.akto.dto.graph.K8sDaemonsetGraphParams; +import com.akto.dto.graph.SvcToSvcGraph; +import com.akto.dto.graph.SvcToSvcGraphParams; + +public class TestSvcToSvcGraph extends MongoBasedTest { + + + private SvcToSvcGraphParams in(String serviceName) { + return new K8sDaemonsetGraphParams(serviceName, serviceName, "1324", "some_id", "1"); + } + + private SvcToSvcGraphParams out(String sourceServiceName, String targetServiceName) { + return new K8sDaemonsetGraphParams(targetServiceName, sourceServiceName, "1324", "some_id", "2"); + } + + @Test + public void testAlgo() { + DataActor dataActor = DataActorFactory.fetchInstance(); + SvcToSvcGraphManager svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); + + + svcToSvcGraphManager.processRecord(in("api.gateway.com")); + svcToSvcGraphManager.processRecord(in("api.details.com")); + svcToSvcGraphManager.processRecord(in("api.reviews.com")); + svcToSvcGraphManager.processRecord(in("api.google.com")); + svcToSvcGraphManager.processRecord(out("api.gateway.com", "api.details.com")); + svcToSvcGraphManager.processRecord(out("api.gateway.com", "api.reviews.com")); + svcToSvcGraphManager.processRecord(out("api.reviews.com", "api.google.com")); + + SvcToSvcGraph changes = svcToSvcGraphManager.updateWithNewDataAndReturnDelta(dataActor); + + changes.getNodes().forEach(node -> { + System.out.println("Node: " + node.getId()); + }); + + changes.getEdges().forEach(edge -> { + System.out.println("Edge: " + edge.getSource() + " -> " + edge.getTarget()); + }); + + assertEquals(4, changes.getNodes().size()); + assertEquals(3, changes.getEdges().size()); + } + +} From 4bfb68b0a295987897592bcce0768a469e09d61d Mon Sep 17 00:00:00 2001 From: Ankush Jain Date: Sat, 19 Apr 2025 11:52:15 -0700 Subject: [PATCH 7/7] draw graph for non-ip hosts only --- .../src/main/java/com/akto/runtime/parser/SampleParser.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java b/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java index f299d9b85e..c38833a6ab 100644 --- a/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java +++ b/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java @@ -69,7 +69,10 @@ public static HttpResponseParams parseSampleMessage(String message) throws Excep String processId = (String) json.get("process_id"); String socketId = (String) json.get("socket_id"); String daemonsetId = (String) json.get("daemonset_id"); - graphParams = new K8sDaemonsetGraphParams(hostNameList.get(0), processId, socketId, daemonsetId, direction); + String hostname = hostNameList.get(0); + if (hostname.charAt(0) >= 'a' && hostname.charAt(0) <= 'z') { + graphParams = new K8sDaemonsetGraphParams(hostNameList.get(0), processId, socketId, daemonsetId, direction); + } } }