From 0656aa57300ef720ea5a82069a6d33466e13210b Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 30 Sep 2025 17:03:36 +0200 Subject: [PATCH 1/8] Add max iter to cli args --- malsim/__main__.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/malsim/__main__.py b/malsim/__main__.py index 406ece09..1a5df9de 100644 --- a/malsim/__main__.py +++ b/malsim/__main__.py @@ -8,9 +8,9 @@ MalSimulator, MalSimulatorSettings, run_simulation, - load_scenario, + load_scenario ) -from .mal_simulator import TTCMode +from .mal_simulator import TTCMode, ITERATIONS_LIMIT logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -33,6 +33,10 @@ def main() -> None: '-s', '--seed', type=int, help="If set to a seed, simulator will use it as setting", ) + parser.add_argument( + '-m', '--max-iters', type=int, default=None, + help="Max number of steps in the simulation", + ) parser.add_argument( '-t', '--ttc-mode', type=int, help=( @@ -48,8 +52,9 @@ def main() -> None: scenario, MalSimulatorSettings( seed=args.seed, ttc_mode=TTCMode(args.ttc_mode), - attack_surface_skip_unnecessary=False - ) + attack_surface_skip_unnecessary=False, + ), + max_iter=args.max_iters or ITERATIONS_LIMIT ) if args.output_attack_graph: From 529b8e0766543903133b6c63d451f49b95652f07 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 30 Sep 2025 17:04:27 +0200 Subject: [PATCH 2/8] Better log messages --- malsim/mal_simulator.py | 4 ++-- malsim/scenario.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/malsim/mal_simulator.py b/malsim/mal_simulator.py index 305af678..65e0451f 100644 --- a/malsim/mal_simulator.py +++ b/malsim/mal_simulator.py @@ -1004,7 +1004,7 @@ def _attacker_step( else: logger.warning( - "Attacker could not compromise %s", node.full_name + "Attacker could not compromise untraversable %s", node.full_name ) return successful_compromises, attempted_compromises @@ -1286,7 +1286,7 @@ def run_simulation( actions[agent_name] = [agent_action] print( f'Agent {agent_name} chose action: ' - f'{agent_action.full_name}' + f'{agent_action.full_name} ({agent_action.type})' ) # Store agent action diff --git a/malsim/scenario.py b/malsim/scenario.py index a899e90d..d386f8fb 100644 --- a/malsim/scenario.py +++ b/malsim/scenario.py @@ -464,7 +464,7 @@ def load_simulator_agents( if agent_class_name and agent_class_name not in agent_class_name_to_class: raise LookupError( f"Agent class '{agent_class_name}' not supported.\n" - f"Must be one of: {agent_class_name_to_class.values()}" + f"Must be one of: {list(agent_class_name_to_class.keys())}" ) if agent_type == AgentType.ATTACKER: From ca80e9ce60d813a110a370b106c1f503f29ae61c Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 30 Sep 2025 17:04:55 +0200 Subject: [PATCH 3/8] Add test for sandor path finding --- tests/agents/test_path_finders.py | 36 +++++++++++++++++++++++++------ tests/agents/test_ttc_avoider.py | 3 +-- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/tests/agents/test_path_finders.py b/tests/agents/test_path_finders.py index 7434f0f4..3cf1092c 100644 --- a/tests/agents/test_path_finders.py +++ b/tests/agents/test_path_finders.py @@ -2,6 +2,7 @@ MalSimulator, MalSimulatorSettings, TTCMode ) from malsim.agents import get_shortest_path_to +from malsim.agents.utils.greedy_a_star.algo import greedy_a_star_attack from malsim.scenario import load_scenario def test_path_finding() -> None: @@ -32,9 +33,6 @@ def test_path_finding() -> None: def test_path_finding_ttc_lang() -> None: - r""" - - """ scenario_file = ( "tests/testdata/scenarios/ttc_lang_scenario.yml" ) @@ -56,8 +54,6 @@ def test_path_finding_ttc_lang() -> None: if n.type in ('or', 'and') } - assert sum(ttc_values.values()) == 2021 - sim.register_attacker('path_finder', {entry_point}, {goal}) path = get_shortest_path_to( @@ -70,4 +66,32 @@ def test_path_finding_ttc_lang() -> None: assert path for node in path: # Should only have picked the low ttc steps - assert ('easy' in node.full_name or node == goal) + assert ('hard' not in node.full_name or node == goal) + + +def test_sandor_path_finding_ttc_lang() -> None: + scenario_file = ( + "tests/testdata/scenarios/ttc_lang_scenario.yml" + ) + scenario = load_scenario(scenario_file) + + entry_point = scenario.attack_graph.get_node_by_full_name('Net1:easyAccess') + goal = scenario.attack_graph.get_node_by_full_name('DataD:read') + + # Run the greedy a star + path = greedy_a_star_attack(scenario.attack_graph, entry_point, goal) + + # Validate path + visited = {entry_point} + curr_node = None + for curr_node in path: + if curr_node.type == 'or' and curr_node not in visited: + assert any(p in visited for p in curr_node.parents), ( + f"Node {curr_node} was reached before any of its parents were" + ) + elif curr_node.type == 'and' and curr_node not in visited: + assert all(p in visited for p in curr_node.parents), ( + f"Node {curr_node} was reached before all of its parents" + ) + visited.add(curr_node) + assert curr_node == goal diff --git a/tests/agents/test_ttc_avoider.py b/tests/agents/test_ttc_avoider.py index 95e709f3..d1fa720d 100644 --- a/tests/agents/test_ttc_avoider.py +++ b/tests/agents/test_ttc_avoider.py @@ -34,8 +34,7 @@ def test_ttc_avoider() -> None: assert attacker_node # Should always pick the easy path or the goal - assert 'easy' in attacker_node.name or attacker_node == goal - + assert 'hard' not in attacker_node.name # Step actions = { attacker_agent_name: [attacker_node] if attacker_node else [] From cf91444f5caf2cb5bdb94b740ddea410d87ba675 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 30 Sep 2025 17:05:29 +0200 Subject: [PATCH 4/8] Upgrade ttc lang --- tests/testdata/langs/ttc_lang.mal | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/tests/testdata/langs/ttc_lang.mal b/tests/testdata/langs/ttc_lang.mal index 113f2a00..6cba3c0a 100644 --- a/tests/testdata/langs/ttc_lang.mal +++ b/tests/testdata/langs/ttc_lang.mal @@ -24,15 +24,9 @@ category TestAssets{ asset Computer { # shutDown - -> easyAccess, - hardAccess + -> access - & easyAccess - -> datas.easyRead, - datas.hardRead, - networks.easyAccess, - networks.hardAccess - & hardAccess [VeryHardAndCertain] + & access -> datas.easyRead, datas.hardRead, networks.easyAccess, @@ -58,11 +52,9 @@ category TestAssets{ hardScan | easyAccess - -> computer.easyAccess, - computer.hardAccess + -> computer.access | hardAccess [VeryHardAndCertain] - -> computer.easyAccess, - computer.hardAccess + -> computer.access | easyScan -> vulns.easyExploit, vulns.hardExploit @@ -77,13 +69,10 @@ category TestAssets{ hardAssume | easyAssume - -> computer.easyAccess, - computer.hardAccess + -> computer.access | hardAssume [VeryHardAndCertain] - -> computer.easyAccess, - computer.hardAccess - + -> computer.access } asset SoftwareVuln { From 26dd34976e9a313921f7aabc5c7f3849b31ad690 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 30 Sep 2025 17:05:55 +0200 Subject: [PATCH 5/8] Add sandor greedy a star --- malsim/agents/utils/greedy_a_star/__init__,py | 0 malsim/agents/utils/greedy_a_star/algo.py | 229 ++++++++++++++++++ malsim/agents/utils/greedy_a_star/utils.py | 65 +++++ 3 files changed, 294 insertions(+) create mode 100644 malsim/agents/utils/greedy_a_star/__init__,py create mode 100644 malsim/agents/utils/greedy_a_star/algo.py create mode 100644 malsim/agents/utils/greedy_a_star/utils.py diff --git a/malsim/agents/utils/greedy_a_star/__init__,py b/malsim/agents/utils/greedy_a_star/__init__,py new file mode 100644 index 00000000..e69de29b diff --git a/malsim/agents/utils/greedy_a_star/algo.py b/malsim/agents/utils/greedy_a_star/algo.py new file mode 100644 index 00000000..6fda7fd1 --- /dev/null +++ b/malsim/agents/utils/greedy_a_star/algo.py @@ -0,0 +1,229 @@ +import heapq +import logging +from copy import copy +from itertools import count +from typing import Callable, Iterable, List, Tuple + +from maltoolbox.attackgraph import AttackGraph, AttackGraphNode + +from .utils import ttc_map, filter_defense, NoPath, merge_paths + + +def naive_a_star( + attack_graph: AttackGraph, + source: AttackGraphNode, + target: AttackGraphNode, + heuristic: Callable[[AttackGraphNode, AttackGraphNode], float] = lambda u, v: 0, + unusable_nodes: List[AttackGraphNode] = [], +) -> Tuple[List[AttackGraphNode], float]: + """Returns a list of nodes in a heuristically shortest path between source and target using the A* ("A-star") algorithm. + + Parameters + ---------- + attack_graph : AttackGraph + + source : node + Starting node for path. + + target : int + Target node for path. + + heuristic : function + A function to evaluate the estimate of the distance from the a node to the target. The function takes two nodes arguments and must return a number. + The default heuristic is h=0 which is same as Dijkstra's algorithm. + + Raises + ------ + ValueError + If no path exists between source and target. + + Adapted from NetworkX and Sandor Berglund's thesis. + """ + if source.id not in attack_graph.nodes or target.id not in attack_graph.nodes: + raise ValueError(f"Either source {source} or target {target} is not in attack graph") + + def cost(u, v): + return ttc_map(v.ttc) + 1 + + unusable_ids = {node.id for node in unusable_nodes} + + # g_score[node.id] = the best-known cost from source to node + g_score = {} + # f_score[node.id] = g_score[node.id] + heuristic(node, target) + f_score = {} + # came_from[node.id] = the node we came from on the best path from source + came_from = {} + + for node_id in attack_graph.nodes: + g_score[node_id] = float("inf") + f_score[node_id] = float("inf") + g_score[source.id] = 0 + f_score[source.id] = heuristic(source, target) + + # Priority queue of (f_score, tie-break counter, node) + # Tie-break counter ensures no direct comparison of nodes + queue = [] + c = count() + heapq.heappush(queue, (f_score[source.id], next(c), source)) + + closed_set = set() + + while queue: + _, __, current = heapq.heappop(queue) + + # If we reached the target, reconstruct path + if current == target: + return _reconstruct_path(came_from, current), g_score[current.id] + + if current.id in closed_set or current.id in unusable_ids: + continue + closed_set.add(current.id) + + for neighbor in current.children: + if neighbor.id in closed_set or neighbor.id in unusable_ids: + continue + tentative_g = g_score[current.id] + cost(current, neighbor) + if tentative_g < g_score[neighbor.id]: + came_from[neighbor.id] = current + g_score[neighbor.id] = tentative_g + f_score[neighbor.id] = tentative_g + heuristic(neighbor, target) + heapq.heappush(queue, (f_score[neighbor.id], next(c), neighbor)) + + raise NoPath(target, f"Node {target.full_name} not reachable from {source.full_name}") + +def _reconstruct_path(came_from, current): + """Reconstructs path using came_from after we pop target from the queue.""" + path = [current] + while current.id in came_from: + current = came_from[current.id] + path.append(current) + path.reverse() + return path + + + +def correct_and_steps( + attack_graph: AttackGraph, + source: AttackGraphNode, + naive_path: List[AttackGraphNode], + unused_entry_points: List[AttackGraphNode], + unusable_nodes: List[AttackGraphNode], +): + new_path = naive_path + + def node_collection_difference( + a: Iterable[AttackGraphNode], b: Iterable[AttackGraphNode] + ) -> List[AttackGraphNode]: + ids = set(node.id for node in a).difference(node.id for node in b) + return [node for node in a if node.id in ids] + + for i, node in enumerate(new_path): + if node.type == "and" and node.id != source.id: + # For each AND-step parent, not in the path + for and_parent in node_collection_difference( + filter_defense(node.parents), new_path[:i] + ): + min_path, min_ttc = None, float("inf") + for sub_source in new_path[:i] + unused_entry_points: + try: + p, c = naive_a_star(attack_graph, sub_source, and_parent, unusable_nodes=unusable_nodes) + if c < min_ttc: + min_path, min_ttc = p, c + except NoPath: + pass + if not min_path: + raise NoPath( + node, f"No path to AND-step parent: {and_parent.full_name} for {node.full_name}" + ) + new_path = merge_paths(new_path, min_path, i) + + return new_path + + +def single_source_a_star( + attack_graph: AttackGraph, + source: AttackGraphNode, + target: AttackGraphNode, + other_sources: List[AttackGraphNode], +) -> List[AttackGraphNode]: + path, _ = naive_a_star(attack_graph, source, target) + unusable_nodes = [] + + def check_and_steps(path: List[AttackGraphNode]) -> bool: + and_steps = [ + node for node in path + if node.type == "and" + and node.id != source.id + and node.id not in { + node.id for node in other_sources + } + ] + return all( + parent in path + for node in and_steps + for parent in filter_defense(node.parents) + ) + + while not check_and_steps(other_sources + path): + try: + path = correct_and_steps( + attack_graph, source, path, other_sources, unusable_nodes + ) + except NoPath as unreachable: + logging.debug( + unreachable.args[0] + ) + unusable_nodes.append(unreachable.node) + old_path = copy(path) + path, _ = naive_a_star( + attack_graph, source, target, unusable_nodes=unusable_nodes + ) + if path == old_path: + raise unreachable + return path + + +def single_target_a_star( + attack_graph: AttackGraph, + sources: AttackGraphNode | List[AttackGraphNode], + target: AttackGraphNode, +) -> List[AttackGraphNode]: + if isinstance(sources, AttackGraphNode): + sources = [sources] + + ret_path, min_ttc = [], float("inf") + + for i, source in enumerate(sources): + try: + path = single_source_a_star( + attack_graph, source, target, sources[:i] + sources[i + 1 :] + ) + path_ttc = sum(ttc_map(node.ttc) for node in path) + if path_ttc < min_ttc: + ret_path, min_ttc = path, path_ttc + except NoPath: + pass + + if len(ret_path) == 0: + raise NoPath(target, f"No path to target: {target}") + + return ret_path + + +def greedy_a_star_attack( + attack_graph: AttackGraph, + sources: AttackGraphNode | List[AttackGraphNode], + targets: AttackGraphNode | List[AttackGraphNode], +) -> List[AttackGraphNode]: + if isinstance(sources, AttackGraphNode): + sources = [sources] + if isinstance(targets, AttackGraphNode): + targets = [targets] + + ret_path = [] + + for target in targets: + new_path = single_target_a_star(attack_graph, sources, target) + ret_path = merge_paths(ret_path, new_path, len(ret_path)) + + return ret_path diff --git a/malsim/agents/utils/greedy_a_star/utils.py b/malsim/agents/utils/greedy_a_star/utils.py new file mode 100644 index 00000000..c37e86e2 --- /dev/null +++ b/malsim/agents/utils/greedy_a_star/utils.py @@ -0,0 +1,65 @@ +from typing import Collection, Dict, Iterable, List, Optional +from maltoolbox.attackgraph import AttackGraphNode, AttackGraph +from maltoolbox.model import Model +import networkx as nx + +def ttc_map(ttc: Optional[Dict]): + if not ttc: + return 0 + elif ttc["name"] == "VeryHardAndUncertain": + return 50 + elif ttc["name"] == "VeryHardAndCertain": + return 25 + elif ttc["name"] == "HardAndUncertain": + return 5 + elif ttc["name"] == "EasyAndCertain": + return 1 + elif ttc["name"] == "Exponential": + return 1 / ttc["arguments"][0] + +def filter_defense(c: Iterable[AttackGraphNode]) -> List[AttackGraphNode]: + return [node for node in c if node.type != "defense"] + + +class NoPath(Exception): + def __init__(self, node: AttackGraphNode, *args: object) -> None: + super().__init__(*args) + self.node = node + +def merge_paths( + a: List[AttackGraphNode], b: List[AttackGraphNode], index: int +) -> List[AttackGraphNode]: + path_extension = [node for node in b if node not in a] + return a[:index] + path_extension + a[index:] + +def to_nx(nodes: AttackGraph | Iterable[AttackGraphNode]): + if isinstance(nodes, AttackGraph): + nodes = list(nodes.nodes.values()) + G = nx.DiGraph() + + for node in nodes: + G.add_node(node.id, **node.to_dict()) + G.nodes[node.id]["full_name"] = node.full_name + + edges = [(node.id, child.id) for node in nodes for child in node.children] + edges += [(parent.id, node.id) for node in nodes for parent in node.parents] + G.add_edges_from(edges) + + return G + + +def model_to_nx(model: Model) -> nx.Graph: + d = model._to_dict() + assets = d["assets"] + + G = nx.Graph() + + for id, vals in assets.items(): + G.add_node(id, **vals) + + for id, vals in assets.items(): + neighbour_ids = [id for d in vals["associated_assets"].values() for id in d.keys() ] + for neighbour_id in neighbour_ids: + G.add_edge(id, neighbour_id) + + return G \ No newline at end of file From 63d26149e7fbbbb42d6d58abc9c2da1884098809 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 30 Sep 2025 17:08:16 +0200 Subject: [PATCH 6/8] Update ttclang scenario --- tests/testdata/scenarios/ttc_lang_scenario.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testdata/scenarios/ttc_lang_scenario.yml b/tests/testdata/scenarios/ttc_lang_scenario.yml index 6660aeb7..b20e95b0 100644 --- a/tests/testdata/scenarios/ttc_lang_scenario.yml +++ b/tests/testdata/scenarios/ttc_lang_scenario.yml @@ -175,7 +175,7 @@ agents: Attacker: type: attacker entry_points: - - ComputerA:easyAccess + - ComputerA:access goals: - DataD:read agent_class: ShortestPathAttacker From 178aabe861170a057d514db7489dd999f7e7a071 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 30 Sep 2025 17:08:50 +0200 Subject: [PATCH 7/8] Add comment to failing test --- tests/agents/test_path_finders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/agents/test_path_finders.py b/tests/agents/test_path_finders.py index 3cf1092c..3a9d97fb 100644 --- a/tests/agents/test_path_finders.py +++ b/tests/agents/test_path_finders.py @@ -81,7 +81,7 @@ def test_sandor_path_finding_ttc_lang() -> None: # Run the greedy a star path = greedy_a_star_attack(scenario.attack_graph, entry_point, goal) - # Validate path + # Validate path - TODO: this fails! visited = {entry_point} curr_node = None for curr_node in path: From 28db997664e574c8a57bffbbc0c678c5a472704a Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Wed, 1 Oct 2025 14:40:25 +0200 Subject: [PATCH 8/8] Update ttc lang, prune before sandor path finding --- tests/agents/test_path_finders.py | 12 +++++++++-- tests/testdata/langs/ttc_lang.mal | 33 +++++++------------------------ 2 files changed, 17 insertions(+), 28 deletions(-) diff --git a/tests/agents/test_path_finders.py b/tests/agents/test_path_finders.py index 3a9d97fb..1d1e20db 100644 --- a/tests/agents/test_path_finders.py +++ b/tests/agents/test_path_finders.py @@ -4,6 +4,7 @@ from malsim.agents import get_shortest_path_to from malsim.agents.utils.greedy_a_star.algo import greedy_a_star_attack from malsim.scenario import load_scenario +from malsim.graph_processing import prune_unviable_and_unnecessary_nodes def test_path_finding() -> None: r""" @@ -74,12 +75,19 @@ def test_sandor_path_finding_ttc_lang() -> None: "tests/testdata/scenarios/ttc_lang_scenario.yml" ) scenario = load_scenario(scenario_file) + sim = MalSimulator.from_scenario(scenario) - entry_point = scenario.attack_graph.get_node_by_full_name('Net1:easyAccess') - goal = scenario.attack_graph.get_node_by_full_name('DataD:read') + prune_unviable_and_unnecessary_nodes( + scenario.attack_graph, + sim._viability_per_node, + sim._necessity_per_node + ) + entry_point = sim.get_node('Net1:easyAccess') + goal = sim.get_node('UserA:easyAssume') # Run the greedy a star path = greedy_a_star_attack(scenario.attack_graph, entry_point, goal) + assert path # Validate path - TODO: this fails! visited = {entry_point} diff --git a/tests/testdata/langs/ttc_lang.mal b/tests/testdata/langs/ttc_lang.mal index 6cba3c0a..13adfa02 100644 --- a/tests/testdata/langs/ttc_lang.mal +++ b/tests/testdata/langs/ttc_lang.mal @@ -4,9 +4,6 @@ category TestAssets{ asset Network { - # shutDown - -> easyAccess, - hardAccess | easyAccess -> toNets.easyAccess, @@ -23,10 +20,8 @@ category TestAssets{ } asset Computer { - # shutDown - -> access - & access + | access -> datas.easyRead, datas.hardRead, networks.easyAccess, @@ -45,28 +40,20 @@ category TestAssets{ } asset Software { - # remove - -> easyAccess, - hardAccess, - easyScan, - hardScan | easyAccess -> computer.access | hardAccess [VeryHardAndCertain] -> computer.access - | easyScan + & easyScan -> vulns.easyExploit, vulns.hardExploit - | hardScan [VeryHardAndCertain] + & hardScan [VeryHardAndCertain] -> vulns.easyExploit, vulns.hardExploit } asset User { - # lockout - -> easyAssume, - hardAssume | easyAssume -> computer.access @@ -76,26 +63,20 @@ category TestAssets{ } asset SoftwareVuln { - # patch - -> easyExploit, - hardExploit - | easyExploit + & easyExploit -> software.easyAccess, software.hardAccess - | hardExploit [VeryHardAndCertain] + & hardExploit [VeryHardAndCertain] -> software.easyAccess, software.hardAccess } asset Data { - # remove - -> easyRead, - hardRead - | easyRead + & easyRead -> read - | hardRead [VeryHardAndCertain] + & hardRead [VeryHardAndCertain] -> read | read }