diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cb298aad..bdf1b4b7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,7 +24,7 @@ jobs: - name: Lint with Black uses: rickstaa/action-black@v1 with: - black_args: ". --check" + black_args: ". --check --diff" - name: Check types run: | mypy biobalm @@ -52,12 +52,6 @@ jobs: run: | wget https://github.com/pauleve/pint/releases/download//2019-05-24/pint_2019-05-24_amd64.deb sudo apt install ./pint_2019-05-24_amd64.deb - - name: Fetch and install Mole - run: | - wget http://www.lsv.fr/~schwoon/tools/mole/mole-140428.tar.gz - tar -xvf mole-140428.tar.gz - (cd ./mole-140428 && make) - (cd ./mole-140428 && pwd >> $GITHUB_PATH) - name: Install repo dependencies run: pip install -r requirements.txt - name: Run doctests @@ -81,12 +75,6 @@ jobs: run: | wget https://github.com/pauleve/pint/releases/download//2019-05-24/pint_2019-05-24_amd64.deb sudo apt install ./pint_2019-05-24_amd64.deb - - name: Fetch and install Mole - run: | - wget http://www.lsv.fr/~schwoon/tools/mole/mole-140428.tar.gz - tar -xvf mole-140428.tar.gz - (cd ./mole-140428 && make) - (cd ./mole-140428 && pwd >> $GITHUB_PATH) - name: Install repo dependencies run: pip install -r requirements.txt - name: Install pytest @@ -117,12 +105,6 @@ jobs: run: | wget https://github.com/pauleve/pint/releases/download//2019-05-24/pint_2019-05-24_amd64.deb sudo apt install ./pint_2019-05-24_amd64.deb - - name: Fetch and install Mole - run: | - wget http://www.lsv.fr/~schwoon/tools/mole/mole-140428.tar.gz - tar -xvf mole-140428.tar.gz - (cd ./mole-140428 && make) - (cd ./mole-140428 && pwd >> $GITHUB_PATH) - name: Install repo dependencies run: pip install -r requirements.txt - name: Install pytest diff --git a/benchmark/.gitignore b/benchmark/.gitignore new file mode 100644 index 00000000..55f654d7 --- /dev/null +++ b/benchmark/.gitignore @@ -0,0 +1,6 @@ + +# Ignore benchmark runs. 
+_run_* + +# Ignore generated succession diagram files. +_sd_expand_* \ No newline at end of file diff --git a/benchmark/bench_sd_expand_bfs.py b/benchmark/bench_sd_expand_bfs.py index 1f2681ec..22d4bccb 100644 --- a/benchmark/bench_sd_expand_bfs.py +++ b/benchmark/bench_sd_expand_bfs.py @@ -1,10 +1,12 @@ from biodivine_aeon import BooleanNetwork -from biobalm.SuccessionDiagram import SuccessionDiagram +from biobalm import SuccessionDiagram import sys -import biobalm.SuccessionDiagram +import os +import pickle +import biobalm # Print progress and succession diagram size. -biobalm.SuccessionDiagram.DEBUG = True +biobalm.succession_diagram.DEBUG = True NODE_LIMIT = 1_000_000 DEPTH_LIMIT = 10_000 @@ -17,6 +19,11 @@ fully_expanded = sd.expand_bfs(bfs_level_limit=DEPTH_LIMIT, size_limit=NODE_LIMIT) assert fully_expanded +model_name = os.path.basename(sys.argv[1]) +sd_name = os.path.splitext(model_name)[0] + ".pickle" +with open(f"./_sd_expand_bfs/{sd_name}", "wb") as handle: + pickle.dump(sd, handle) + print(f"Succession diagram size:", len(sd)) print(f"Minimal traps:", len(sd.minimal_trap_spaces())) diff --git a/benchmark/bench_sd_expand_dfs.py b/benchmark/bench_sd_expand_dfs.py index 74578ccc..f2a5f6dc 100644 --- a/benchmark/bench_sd_expand_dfs.py +++ b/benchmark/bench_sd_expand_dfs.py @@ -1,10 +1,12 @@ from biodivine_aeon import BooleanNetwork -from biobalm.SuccessionDiagram import SuccessionDiagram +from biobalm import SuccessionDiagram import sys -import biobalm.SuccessionDiagram +import os +import pickle +import biobalm # Print progress and succession diagram size. 
-biobalm.SuccessionDiagram.DEBUG = True +biobalm.succession_diagram.DEBUG = True NODE_LIMIT = 1_000_000 DEPTH_LIMIT = 10_000 @@ -17,6 +19,11 @@ fully_expanded = sd.expand_dfs(dfs_stack_limit=DEPTH_LIMIT, size_limit=NODE_LIMIT) assert fully_expanded +model_name = os.path.basename(sys.argv[1]) +sd_name = os.path.splitext(model_name)[0] + ".pickle" +with open(f"./_sd_expand_dfs/{sd_name}", "wb") as handle: + pickle.dump(sd, handle) + print(f"Succession diagram size:", len(sd)) print(f"Minimal traps:", len(sd.minimal_trap_spaces())) diff --git a/benchmark/bench_sd_expand_min.py b/benchmark/bench_sd_expand_min.py index daa6ef7b..dc12f48f 100644 --- a/benchmark/bench_sd_expand_min.py +++ b/benchmark/bench_sd_expand_min.py @@ -1,10 +1,12 @@ from biodivine_aeon import BooleanNetwork -from biobalm.SuccessionDiagram import SuccessionDiagram +from biobalm import SuccessionDiagram import sys -import biobalm.SuccessionDiagram +import os +import pickle +import biobalm # Print progress and succession diagram size. 
-biobalm.SuccessionDiagram.DEBUG = True +biobalm.succession_diagram.DEBUG = True NODE_LIMIT = 1_000_000 DEPTH_LIMIT = 10_000 @@ -16,8 +18,13 @@ sd = SuccessionDiagram(bn) sd.expand_minimal_spaces() +model_name = os.path.basename(sys.argv[1]) +sd_name = os.path.splitext(model_name)[0] + ".pickle" +with open(f"./_sd_expand_min/{sd_name}", "wb") as handle: + pickle.dump(sd, handle) + print(f"Succession diagram size:", len(sd)) print(f"Minimal traps:", len(sd.minimal_trap_spaces())) print("size, expanded, minimal") -print(f"{len(sd)}, {len(list(sd.expanded_ids()))}, {len(sd.minimal_trap_spaces())}") +print(f"{len(sd)},{len(list(sd.expanded_ids()))},{len(sd.minimal_trap_spaces())}") diff --git a/benchmark/bench_sd_expand_scc.py b/benchmark/bench_sd_expand_scc.py index 7d69f9a4..a56cdc5a 100644 --- a/benchmark/bench_sd_expand_scc.py +++ b/benchmark/bench_sd_expand_scc.py @@ -1,11 +1,12 @@ -import sys from biodivine_aeon import BooleanNetwork -from biobalm.SuccessionDiagram import SuccessionDiagram -from biobalm._sd_algorithms.expand_source_SCCs import expand_source_SCCs -import biobalm.SuccessionDiagram +from biobalm import SuccessionDiagram +import sys +import os +import pickle +import biobalm # Print progress and succession diagram size. 
-biobalm.SuccessionDiagram.DEBUG = True +biobalm.succession_diagram.DEBUG = True NODE_LIMIT = 1_000_000 DEPTH_LIMIT = 10_000 @@ -18,8 +19,13 @@ fully_expanded = expand_source_SCCs(sd, check_maa=False) assert fully_expanded +model_name = os.path.basename(sys.argv[1]) +sd_name = os.path.splitext(model_name)[0] + ".pickle" +with open(f"./_sd_expand_scc/{sd_name}", "wb") as handle: + pickle.dump(sd, handle) + print(f"Succession diagram size:", len(sd)) print(f"Minimal traps:", len(sd.minimal_trap_spaces())) print("size, expanded, minimal") -print(f"{len(sd)}, {len(list(sd.expanded_ids()))}, {len(sd.minimal_trap_spaces())}") +print(f"{len(sd)},{len(list(sd.expanded_ids()))},{len(sd.minimal_trap_spaces())}") diff --git a/biobalm/_pint_reachability.py b/biobalm/_pint_reachability.py new file mode 100644 index 00000000..79732116 --- /dev/null +++ b/biobalm/_pint_reachability.py @@ -0,0 +1,146 @@ +""" +A module which provides utility methods for reachability analysis using Pint. + +Note that in biobalm, we try to treat pint as an *optional* dependency. Hence, if you +want to use any of these methods, make sure to import the module in a way that fails +safely when Pint is not installed. +""" + +from __future__ import annotations + +from functools import reduce +from typing import TYPE_CHECKING, cast, Iterable + +from networkx import DiGraph # type: ignore +from pypint import Goal, InMemoryModel # type:ignore + +from biobalm.petri_net_translation import ( + place_to_variable, + optimized_recursive_dnf_generator, +) +from biobalm.types import BooleanSpace, SuccessionDiagramConfiguration + +if TYPE_CHECKING: + from biodivine_aeon import Bdd + + +def pint_reachability( + petri_net: DiGraph, + initial_state: BooleanSpace, + target_states: Bdd, + config: SuccessionDiagramConfiguration, +) -> bool: + """ + Use Pint to check if a given `initial_state` can possibly reach some state + in the `target_states` BDD. 
+ + If the reachability analysis is inconclusive, the method + raises a `RuntimeError`. + """ + if target_states.is_false(): + return False # Cannot reach a stat in an empty set. + + # Build a Pint model through an automata network and copy + # over the initial condition. + pint_model = InMemoryModel(_petri_net_as_automata_network(petri_net)) + for var, level in initial_state.items(): + pint_model.initial_state[var] = level + + goal = _pint_build_symbolic_goal(target_states, config) + + def failed(*_): + raise RuntimeError("Cannot verify.") + + return pint_model.reachability(goal=goal, fallback=failed) # type: ignore + + +def _pint_build_symbolic_goal( + states: Bdd, config: SuccessionDiagramConfiguration +) -> Goal: + """ + A helper method which (very explicitly) converts a set of states + represented through a BDD into a Pint `Goal`. + + Note that if `GOAL_SIZE_LIMIT` is exceeded, a partial goal is returned + that may not cover all the states in the argument `Bdd`. This avoids + exceeding the argument list size limit, but introduces additional source + of incompleteness into the reachability process. + """ + assert not states.is_false() + + goals: list[Goal] = [] + limit = config["pint_goal_size_limit"] + for clause in optimized_recursive_dnf_generator(states): + named_clause = { + states.__ctx__().get_variable_name(var): int(value) + for var, value in clause.items() + } + + limit -= len(named_clause) + if limit < 0: + # If the goal is too large to be passed as a command line argument, + # break here and don't continue. This is not ideal but I'm not sure + # how to fix this other than modifying `pint` itself. + if config["debug"]: + print( + "WARNING: `pint` goal size limit exceeded. A partial goal is used." 
+ ) + break + + goal_atoms = [f"{var}={level}" for var, level in named_clause.items()] + goals.append(Goal(",".join(goal_atoms))) + + return reduce(lambda a, b: a | b, goals) + + +def _petri_net_as_automata_network(petri_net: DiGraph) -> str: + """ + Takes a Petri net which was created by implicant encoding from a Boolean network, + and builds an automata network file (`.an`) compatible with the Pint tool. + """ + automata_network = "" + + # Go through all PN places and save them as model variables. + variable_set: set[str] = set() + for place, kind in petri_net.nodes(data="kind"): # type: ignore + if kind != "place": + continue + variable_set.add(place_to_variable(place)[0]) # type: ignore[reportUnknownArgumentType] # noqa + variables = sorted(variable_set) + + # Declare all variables with 0/1 domains. + for var in variables: + automata_network += f'"{var}" [0, 1]\n' + + for transition, kind in petri_net.nodes(data="kind"): # type: ignore + if kind != "transition": + continue + + predecessors = set(cast(Iterable[str], petri_net.predecessors(transition))) # type: ignore + successors = set(cast(Iterable[str], petri_net.successors(transition))) # type: ignore + + # The value under modification is the only + # value that is different between successors and predecessors. + source_place = next(iter(predecessors - successors)) + target_place = next(iter(successors - predecessors)) + + (s_var, s_level) = place_to_variable(source_place) + (t_var, t_level) = place_to_variable(target_place) + assert s_var == t_var + + # The remaining places represent the necessary conditions. + # Here, we transform them into a text format. + condition_places = sorted(predecessors.intersection(successors)) + condition_tuples = [place_to_variable(p) for p in condition_places] + conditions = [f'"{var}"={int(level)}' for var, level in condition_tuples] + + # A pint rule consists of a variable name, value transition, + # and a list of necessary conditions for the transition (if any). 
+ if len(conditions) == 0: + rule = f'"{s_var}" {int(s_level)} -> {int(t_level)}\n' + else: + rule = f"\"{s_var}\" {int(s_level)} -> {int(t_level)} when {' and '.join(conditions)}\n" + + automata_network += rule + + return automata_network diff --git a/biobalm/_sd_algorithms/compute_attractor_seeds.py b/biobalm/_sd_algorithms/compute_attractor_seeds.py deleted file mode 100644 index 4853bb83..00000000 --- a/biobalm/_sd_algorithms/compute_attractor_seeds.py +++ /dev/null @@ -1,88 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from biobalm.symbolic_utils import state_list_to_bdd - -if TYPE_CHECKING: - from biobalm.succession_diagram import SuccessionDiagram - -import biobalm -import biobalm.succession_diagram -from biobalm.motif_avoidant import detect_motif_avoidant_attractors, make_retained_set -from biobalm.trappist_core import compute_fixed_point_reduced_STG -from biobalm.types import BooleanSpace - - -def compute_attractor_seeds( - sd: SuccessionDiagram, - node_id: int, -) -> list[BooleanSpace]: - """ - Compute the list of vertices such that each attractor within the subspace of - the given `node_id` is covered by exactly one vertex. - - If the node is a stub, the result covers the whole subspace. If the node is - expanded, the result only covers the "immediate" subspace without the - subspaces of the child nodes. - """ - - if biobalm.succession_diagram.DEBUG: - print(f"[{node_id}] Start computing attractor seeds.") - - node_space = sd.node_data(node_id)["space"] - - if len(node_space) == sd.network.variable_count(): - # This node is a fixed-point. - return [node_space] - - # Compute the list of child spaces if the node is expanded. Otherwise - # "pretend" that there are no children. - child_spaces = [] - if sd.node_data(node_id)["expanded"]: - child_spaces = [sd.node_data(s)["space"] for s in sd.node_successors(node_id)] - - # Fix everything in the NFVS to zero, as long as - # it isn't already fixed by our `node_space`. 
- # - # We add the whole node space to the retain set because we know - # the space is a trap and this will remove the corresponding unnecessary - # Petri net transitions. - retained_set = make_retained_set( - sd.symbolic, sd.node_nfvs(node_id), node_space, child_spaces - ) - - if len(retained_set) == sd.network.variable_count() and len(child_spaces) == 0: - # There is only a single attractor remaining here, - # and its "seed" is the retained set. - return [retained_set] - - terminal_restriction_space = state_list_to_bdd( - sd.symbolic.symbolic_context(), child_spaces - ).l_not() - - candidate_seeds = compute_fixed_point_reduced_STG( - sd.petri_net, - retained_set, - ensure_subspace=node_space, - avoid_subspaces=child_spaces, - ) - - if biobalm.succession_diagram.DEBUG: - print(f"[{node_id}] Found {len(candidate_seeds)} seed candidates.") - - if len(candidate_seeds) == 1 and len(child_spaces) == 0: - # If this is a (non-strict) minimal trap and there is only one seed, - # the seed must be valid. 
- return candidate_seeds - else: - attractors = detect_motif_avoidant_attractors( - sd.symbolic, - sd.petri_net, - candidate_seeds, - terminal_restriction_space, - max_iterations=1000, - is_in_an_mts=len(child_spaces) == 0, - ) - - return attractors diff --git a/biobalm/_sd_algorithms/expand_attractor_seeds.py b/biobalm/_sd_algorithms/expand_attractor_seeds.py index 25440f67..b0d85dc8 100644 --- a/biobalm/_sd_algorithms/expand_attractor_seeds.py +++ b/biobalm/_sd_algorithms/expand_attractor_seeds.py @@ -5,11 +5,11 @@ if TYPE_CHECKING: from biobalm.succession_diagram import SuccessionDiagram -import biobalm -import biobalm.succession_diagram -from biobalm.motif_avoidant import make_retained_set +from biobalm.types import BooleanSpace from biobalm.space_utils import intersect from biobalm.trappist_core import compute_fixed_point_reduced_STG +from biobalm._sd_attractors.attractor_candidates import make_heuristic_retained_set +from biodivine_aeon import AsynchronousGraph def expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None): @@ -24,7 +24,7 @@ def expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None) # motif-avoidant attractors. sd.expand_minimal_spaces(size_limit) - if biobalm.succession_diagram.DEBUG: + if sd.config["debug"]: print( "Minimal trap space expansion finished. Proceeding to attractor expansion." ) @@ -69,21 +69,33 @@ def expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None) # Now, we need to asses if the next successor has some candidate states which # are not covered by the already expanded children. 
- successor_space = sd.node_data(successors[-1])["space"] - retained_set = make_retained_set( - sd.symbolic, sd.node_nfvs(node), successor_space - ) + s = successors[-1] + + successor_space = sd.node_data(s)["space"] + successor_bn = sd.node_percolated_network(s, compute=True) + successor_nfvs = sd.node_percolated_nfvs(s, compute=True) + successor_pn = sd.node_percolated_petri_net(s, compute=True) + successor_graph = AsynchronousGraph(successor_bn) avoid_or_none = [ intersect(successor_space, child) for child in expanded_motifs ] avoid = [x for x in avoid_or_none if x is not None] + avoid_restricted: list[BooleanSpace] = [] + for x in avoid: + y: BooleanSpace = { + var: val for (var, val) in x.items() if var not in successor_space + } + avoid_restricted.append(y) + + retained_set = make_heuristic_retained_set( + successor_graph, successor_nfvs, avoid + ) successor_seeds = compute_fixed_point_reduced_STG( - sd.petri_net, + successor_pn, retained_set, - ensure_subspace=successor_space, - avoid_subspaces=avoid, + avoid_subspaces=avoid_restricted, solution_limit=1, ) @@ -94,7 +106,7 @@ def expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None) successors.pop() continue - if biobalm.succession_diagram.DEBUG: + if sd.config["debug"]: print( f"[{node}] Found successor with new attractor candidate seeds. Expand node {successors[-1]}." ) @@ -103,7 +115,7 @@ def expand_attractor_seeds(sd: SuccessionDiagram, size_limit: int | None = None) if len(successors) == 0: # Everything is done for this `node` and we can continue to the next one. 
- if biobalm.succession_diagram.DEBUG: + if sd.config["debug"]: print(f"[{node}] Finished node attractor expansion.") continue diff --git a/biobalm/_sd_algorithms/expand_source_SCCs.py b/biobalm/_sd_algorithms/expand_source_SCCs.py index f9025a6b..55ca8c00 100644 --- a/biobalm/_sd_algorithms/expand_source_SCCs.py +++ b/biobalm/_sd_algorithms/expand_source_SCCs.py @@ -77,7 +77,9 @@ def expand_source_SCCs( next_level.append(sd._ensure_node(root, sub_space)) # type: ignore sd.node_data(root)["expanded"] = True - sd.node_data(root)["attractors"] = [] # no need to look for attractors here + sd.node_data(root)[ + "attractor_seeds" + ] = [] # no need to look for attractors here current_level = next_level next_level = [] @@ -132,7 +134,9 @@ def expand_source_SCCs( for node_id in final_level: # These assertions should be unnecessary, but just to be sure. assert not sd.node_data(node_id)["expanded"] # expand nodes from here - assert sd.node_data(node_id)["attractors"] is None # check attractors from here + assert ( + sd.node_data(node_id)["attractor_seeds"] is None + ) # check attractors from here # restore this once we allow all expansion algorithms to expand from a node # expander(sd, node_id) diff --git a/biobalm/_sd_attractors/__init__.py b/biobalm/_sd_attractors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/biobalm/_sd_attractors/attractor_candidates.py b/biobalm/_sd_attractors/attractor_candidates.py new file mode 100644 index 00000000..a0de3c91 --- /dev/null +++ b/biobalm/_sd_attractors/attractor_candidates.py @@ -0,0 +1,755 @@ +""" +Provides functions that are used to identify and reduce the set of attractor candidate states. + +Note that most of these functions assume they are operating on the network or petri net +percolated to the subspace of the relevant node. 
+""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal, cast + +if TYPE_CHECKING: + from biobalm.succession_diagram import SuccessionDiagram + from biobalm.types import BooleanSpace + from networkx import DiGraph # type: ignore + +import random +import biobalm +from biodivine_aeon import Bdd, AsynchronousGraph, BddVariable +from biobalm.trappist_core import compute_fixed_point_reduced_STG +from biobalm.symbolic_utils import state_list_to_bdd, valuation_to_state, state_to_bdd + +try: + pint_available = True + import biobalm._pint_reachability +except ModuleNotFoundError: + pint_available = False + + +def compute_attractor_candidates( + sd: SuccessionDiagram, + node_id: int, + greedy_asp_minification: bool, + simulation_minification: bool, + pint_minification: bool, +) -> list[BooleanSpace]: + """ + Compute an optimized list of candidate states that is guaranteed + to cover every attractor in the specified SD node. + + It should hold that for every attractor in the node's sub-space outside + of the child sub-spaces (if known), the list contains at least one state + from this attractor. However, it is not guaranteed that each candidate + covers some attractor, so a non-empty list of candidates can + still correspond to an empty list of attractors. + + The method starts by computing an NFVS of the network percolated to the + node's space, and then assigns each NFVS node a value towards which + it tends to. This transformation satisfies that the fixed-points in the + resulting network cover the attractors in the original sub-space. + + Subsequently, the method can further optimize this set of candidates. + + Greedy ASP minification + ----------------------- + Depending on which values are chosen for the NFVS nodes, a different set of + candidates is obtained. This optimization method flips the initial NFVS node + values and retains modifications that lead to a smaller candidate set, until + a local minimum is achieved. 
+ + Simulation minification + ----------------------- + It is often possible to reduce the candidate set using a pseudo-random walk + which eventually leads to one of the child spaces, or to one of the existing + candidate states. + + Pint minification + ----------------- + The tool `pint` can perform (incomplete) reachability check for any network + state, hence we can also use it to reduce the list of candidate states. + This option is only available if pint is actually installed. + + Parameters + ---------- + sd : SuccessionDiagram + The succession diagram in question. + node_id : int + An ID of an SD node for which we are computing the candidates. + greedy_asp_minification: bool + Whether to enable the iterative ASP-based minification. + simulation_minification: bool + Whether to enable simulation minification. + pint_minification: bool + Whether to enable pint minification. + + Returns + ------- + list[BooleanSpace] + The list of attractor candidate states. + """ + + if sd.config["debug"]: + print(f"[{node_id}] Start computing attractor candidates.") + + node_data = sd.node_data(node_id) + + node_space = node_data["space"] + + if len(node_space) == sd.network.variable_count(): + if sd.config["debug"]: + print(f"[{node_id}] > Attractor candidates done: node is a fixed-point.") + return [node_space] + + node_nfvs = sd.node_percolated_nfvs(node_id, compute=True) + + if sd.config["debug"]: + root_nfvs = sd.node_percolated_nfvs(sd.root(), compute=True) + print( + f"[{node_id}] > Percolated node NFVS contains {len(node_nfvs)} nodes (vs {len(root_nfvs)} in the root)." + ) + + # Compute the list of child stable-motifs if the node is expanded. Otherwise + # "pretend" that there are no children. 
+ # + # (We can safely ignore anything that is in these stable motifs, not just the + # percolated child spaces, because it is either in the child space, or it percolates + # to the child space, so it does not contain attractors) + child_motifs_reduced = [] + if node_data["expanded"]: + children = sd.node_successors(node_id, compute=False) + child_motifs_reduced = [ + sd.edge_stable_motif(node_id, s, reduced=True) for s in children + ] + + # Indicates that this space is either minimal, or has no computed successors. + # In either case, the space must contain at least one attractor. + node_is_pseudo_minimal = len(child_motifs_reduced) == 0 + + if sd.config["debug"] and node_is_pseudo_minimal: + print( + f"[{node_id}] > The node has no children (i.e. it is minimal or unexpanded)." + ) + + if len(node_nfvs) == 0: + # If the NFVS is empty, it means this node space has no complex attractors. + # In a fully expanded SD, all fixed-points should be handled by the condition above, + # so only non-minimal spaces remain and these can be resolved as empty. + # + # However, if SD is not fully expanded, it can contain further "pseudo-minimal" + # spaces that would contain minimal traps if expanded. For those, there should + # only be fixed-point attractors, but we need to identify + # them, so we need to continue. + + # This tests the assumption that all fully expanded minimal + # traps are handled before, and the only trap spaces with + # an empty NFVS are unexpanded pseudo-minimal, + # or expanded non-minimal. + assert not sd.node_is_minimal(node_id) + + if not node_is_pseudo_minimal: + if sd.config["debug"]: + print( + f"[{node_id}] > Attractor candidates done: empty NFVS in a non-minimal space." + ) + return [] + + # First, we create a retained set and corresponding candidate set that is valid in the + # percolated Boolean network. This process is *usually* quite fast, but + # sometimes the candidate set explodes and is very large (even too large to enumerate). 
+ # In that case, we can try to simplify it greedily using the ASP solver (assuming this + # is enabled). + # + # Overall, our strategy is the following: + # 1. Compute a heuristic retained set based on the original NFVS method. + # 2. If the ASP optimization is disabled, we generate the candidates from this retained + # set in full. + # 3. If ASP optimization is enabled, we first test if there are at least 100 candidates + # for the initial retained set. + # 3a. If there are fewer than 100 candidates, we just run one optimization pass on + # the initial retained set and report the result. + # 3b. If there are more than 100 candidates, we probably need to optimize the retained + # set even further, in which case we start to recursively generate a new retained + # set that is optimized in each step to hopefully get a better result. + + pn_reduced = sd.node_percolated_petri_net(node_id, compute=True) + bn_reduced = sd.node_percolated_network(node_id, compute=True) + graph_reduced = AsynchronousGraph(bn_reduced) + + retained_set = make_heuristic_retained_set( + graph_reduced, node_nfvs, child_motifs_reduced + ) + + if len(retained_set) == sd.network.variable_count() and node_is_pseudo_minimal: + # If the retained set describes a fixed point, then only one attractor + # is present in this space and it must contain the state described by the retained set. + if sd.config["debug"]: + print( + f"[{node_id}] > Singular attractor found through fixed-point retained set. Done." 
+ ) + return [retained_set | node_space] + + if not greedy_asp_minification: + candidate_states = compute_fixed_point_reduced_STG( + pn_reduced, + retained_set, + avoid_subspaces=child_motifs_reduced, + solution_limit=sd.config["attractor_candidates_limit"], + ) + if len(candidate_states) == sd.config["attractor_candidates_limit"]: + raise RuntimeError( + f"Exceeded the maximum amount of attractor candidates ({sd.config['attractor_candidates_limit']}; see `SuccessionDiagramConfiguation.attractor_candidates_limit`)." + ) + if sd.config["debug"]: + print( + f"[{node_id}] Computed {len(candidate_states)} candidate states without retained set optimization." + ) + else: + candidate_states = compute_fixed_point_reduced_STG( + pn_reduced, + retained_set, + avoid_subspaces=child_motifs_reduced, + solution_limit=sd.config["retained_set_optimization_threshold"], + ) + + if len(candidate_states) < sd.config["retained_set_optimization_threshold"]: + # The candidate set is small. No need to overthink it. The original + # retained set is probably quite good. However, we can still try to + # optimize it further if it makes sense. + if len(candidate_states) > 1 or ( + not node_is_pseudo_minimal and len(candidate_states) > 0 + ): + if sd.config["debug"]: + print( + f"[{node_id}] Initial retained set generated {len(candidate_states)} candidates. Optimizing..." + ) + optimized = asp_greedy_retained_set_optimization( + sd, + node_id, + petri_net=pn_reduced, + retained_set=retained_set, + candidate_states=candidate_states, + avoid_dnf=child_motifs_reduced, + ) + retained_set = optimized[0] + candidate_states = optimized[1] + else: + # There seem to be many candidates, in which case it might be better + # to optimize the retained set dynamically. + if sd.config["debug"]: + print( + f"[{node_id}] Initial retained set generated >{sd.config['retained_set_optimization_threshold']} candidates. Regenerate retained set." 
+ ) + retained_set = {} + candidate_states = [] + for var in node_nfvs: + retained_set[var] = 0 + candidate_states_zero = compute_fixed_point_reduced_STG( + pn_reduced, + retained_set, + avoid_subspaces=child_motifs_reduced, + solution_limit=sd.config["attractor_candidates_limit"], + ) + + if len(candidate_states_zero) <= len(candidate_states): + if sd.config["debug"]: + print( + f"[{node_id}] Chosen {var}=0 without increasing candidate count ({len(candidate_states_zero)}). {len(retained_set)}/{len(node_nfvs)} variables chosen." + ) + candidate_states = candidate_states_zero + continue + + retained_set[var] = 1 + candidate_states_one = compute_fixed_point_reduced_STG( + pn_reduced, + retained_set, + avoid_subspaces=child_motifs_reduced, + solution_limit=len(candidate_states_zero), + ) + + if ( + len(candidate_states_zero) + == sd.config["attractor_candidates_limit"] + and len(candidate_states_one) + == sd.config["attractor_candidates_limit"] + ): + raise RuntimeError( + f"Exceeded the maximum amount of attractor candidates ({sd.config['attractor_candidates_limit']}; see `SuccessionDiagramConfiguation.attractor_candidates_limit`)." + ) + + if len(candidate_states_one) <= len(candidate_states): + if sd.config["debug"]: + print( + f"[{node_id}] Chosen {var}=1 without increasing candidate count ({len(candidate_states_one)}). {len(retained_set)}/{len(node_nfvs)} variables chosen." + ) + candidate_states = candidate_states_one + continue + + if len(candidate_states_zero) < len(candidate_states_one): + if sd.config["debug"]: + print( + f"[{node_id}] Chosen {var}=0 with better candidate count ({len(candidate_states_zero)}). {len(retained_set)}/{len(node_nfvs)} variables chosen." + ) + candidate_states = candidate_states_zero + retained_set[var] = 0 + else: + if sd.config["debug"]: + print( + f"[{node_id}] Chosen {var}=1 with better candidate count ({len(candidate_states_one)}). {len(retained_set)}/{len(node_nfvs)} variables chosen." 
+ ) + candidate_states = candidate_states_one + retained_set[var] = 1 + + # At this point, we know the candidate count increased and so we should + # try to bring it back down. + if sd.config["debug"]: + print(f"[{node_id}] Optimizing partial retained set...") + optimized = asp_greedy_retained_set_optimization( + sd, + node_id, + petri_net=pn_reduced, + retained_set=retained_set, + candidate_states=candidate_states, + avoid_dnf=child_motifs_reduced, + ) + retained_set = optimized[0] + candidate_states = optimized[1] + + # Terminate if done. + if len(candidate_states) == 0: + if sd.config["debug"]: + print(f"[{node_id}] > Initial candidate set empty. Done.") + return [] + if node_is_pseudo_minimal and len(candidate_states) == 1: + if sd.config["debug"]: + print( + f"[{node_id}] > Single candidate found in (pseudo) minimal trap space. Done." + ) + return [candidate_states[0] | node_space] + + if sd.config["debug"]: + print( + f"[{node_id}] > Attractor candidates from retained set: {len(candidate_states)}." + ) + + if simulation_minification: + if sd.config["debug"]: + print(f"[{node_id}] Start simulation minification...") + avoid_children_symbolic = state_list_to_bdd( + graph_reduced.symbolic_context(), child_motifs_reduced + ) + + # Here, we gradually increase the iteration count while + # the candidate set is being actively reduced. If the simulation + # cannot reduce any further states, we are done. + iterations = 1024 + while len(candidate_states) > 0: + if sd.config["debug"]: + print( + f"[{node_id}] > Start simulation with {len(candidate_states)} states and simulation limit {iterations}." 
+ ) + reduced = run_simulation_minification( + sd, + node_id, + graph_reduced, + candidate_states, + avoid_children_symbolic, + max_iterations=iterations, + simulation_seed=123, + ) + + if len(reduced) == len(candidate_states): + candidate_states = reduced + break + + iterations = 2 * iterations + candidate_states = reduced + + if len(candidate_states) == 1 and avoid_children_symbolic.is_false(): + break + + if sd.config["debug"]: + print(f"[{node_id}] > Candidates after simulation: {len(candidate_states)}") + + # Terminate if done. + if len(candidate_states) == 0: + if sd.config["debug"]: + print(f"[{node_id}] > Candidate set empty. Done.") + return [] + if node_is_pseudo_minimal and len(candidate_states) == 1: + if sd.config["debug"]: + print( + f"[{node_id}] > Single candidate found in (pseudo) minimal trap space. Done." + ) + return [candidate_states[0] | node_space] + + if pint_minification and not pint_available: + print("WARNING: Using `pint`, but `pint` is not installed. Skipping.") + elif pint_minification and pint_available: + if sd.config["debug"]: + print(f"[{node_id}] Start `pint` minification...") + + children_bdd = state_list_to_bdd( + graph_reduced.symbolic_context(), child_motifs_reduced + ) + candidates_bdd = state_list_to_bdd( + graph_reduced.symbolic_context(), candidate_states + ) + avoid_bdd = children_bdd.l_or(candidates_bdd) + + filtered_states: list[BooleanSpace] = [] + for i, state in enumerate(candidate_states): + state_bdd = state_to_bdd(graph_reduced.symbolic_context(), state) + + avoid_bdd = avoid_bdd.l_and_not(state_bdd) + + keep = True + try: + if biobalm._pint_reachability.pint_reachability( + pn_reduced, state, avoid_bdd, sd.config + ): + keep = False + except RuntimeError as e: + assert str(e) == "Cannot verify." 
+ + if keep: + avoid_bdd = avoid_bdd.l_or(state_bdd) + filtered_states.append(state) + + if sd.config["debug"]: + print( + f"[{node_id}] > `pint` {i + 1}/{len(candidate_states)}: eliminated: {not keep}, retained: {len(filtered_states)}." + ) + + candidate_states = filtered_states + + if sd.config["debug"]: + print(f"[{node_id}] > Candidates after `pint`: {len(candidate_states)}") + + # Finally, we need to augment each candidate state with the + # fixed values from the node space, because until now, we only considered + # the reduced/percolated network. + candidate_states = [x | node_space for x in candidate_states] + + return candidate_states + + +def run_simulation_minification( + sd: SuccessionDiagram, + node_id: int, + graph: AsynchronousGraph, + candidate_states: list[BooleanSpace], + avoid_bdd: Bdd, + max_iterations: int, + simulation_seed: int, +) -> list[BooleanSpace]: + """ + A fast but incomplete method for eliminating spurious attractor candidates + based on stochastic simulation. + + Parameters + ---------- + node_id : int + The ID of the associated SD node. This is only for logging progress. + graph : biodivine_aeon.AsynchronousGraph + The symbolic encoding of the *percolated* network dynamics. + candidate_states: list[BooleanSpace] + The list of currently considered candidates that is to be reduced. + avoid_bdd: biodivine_aeon.Bdd + A symbolic representation of the states/spaces that are to be ignored. + If any of these states is reachable by a candidate, that candidate is + safe to ignore as well. + max_iterations: int + The number of steps performed by the simulator. + simulation_seed: int + The seed value for the random walk simulator. + + Returns + ------- + list[BooleanSpace] + The optimized list of candidate states. + """ + + # A random generator initialized with a fixed seed. Ensures simulation + # is randomized but deterministic. 
+ generator = random.Random(simulation_seed) + + # Retrieve the symbolic variables that encode each network variable and + # associate them with the corresponding network's update functions. + symbolic_ctx = graph.symbolic_context() + network_vars = graph.network_variables() + symbolic_vars: list[BddVariable] = [] + for var in network_vars: + s_var = symbolic_ctx.find_network_bdd_variable(var) + assert s_var is not None + symbolic_vars.append(s_var) + update_functions = { + symbolic_vars[i]: graph.mk_update_function(network_vars[i]) + for i in range(len(network_vars)) + } + + # Use stochastic simulation to prune the set of candidate states. + # We use different simulation approach depending on whether this space + # is a minimal trap or not. + if not avoid_bdd.is_false(): + candidates_bdd = state_list_to_bdd(symbolic_ctx, candidate_states) + filtered_candidates: list[BooleanSpace] = [] + + for i, state in enumerate(candidate_states): + if i % 100 == 99 and sd.config["debug"]: + print( + f"[{node_id}] > Simulation progress: {i + 1}/{len(candidate_states)}" + ) + + # Remove the state from the candidates. If we can prove that is + # is not an attractor, we will put it back. + state_bdd = graph.mk_subspace(state).to_bdd() + candidates_bdd = candidates_bdd.l_and_not(state_bdd) + + # Convert the state BDD to a valuation object. This is like + # a dictionary, but updates much faster because it is managed + # by rust and indexed by symbolic variable IDs and not names. + # + # (The `state_bdd` should be a singleton, so it's ok to just + # take the first valuation) + simulation = state_bdd.valuation_first() + + is_valid_candidate = True + for _ in range(max_iterations): + # Advance all variables by one step in random order. + generator.shuffle(symbolic_vars) + for s_var in symbolic_vars: + # We can just "call" the BDD as a normal function with + # a valuation as the input. 
+ step = update_functions[s_var](simulation) + simulation[s_var] = step + + # To check for set inclusion, we can also just "eval" the + # BDD, since it evaluates to true for every state that is + # in the encoded set. + + if candidates_bdd(simulation): + # The state can reach some other state in the candidate + # set. This does not mean it cannot be an attractor, but + # it means it is sufficient to keep considering + # the remaining candidates. + is_valid_candidate = False + break + + if avoid_bdd(simulation): + # The state can reach some other state in the avoid set, + # which means it cannot be an attractor in this subspace. + is_valid_candidate = False + break + + if is_valid_candidate: + # If we cannot rule out the candidate, we have to put it back + # into the candidate set. + + # However, we want to use the new state (after simulation), + # because this one is probably "closer" to the goals that we + # want to eventually get to. + + candidates_bdd = candidates_bdd.l_or(Bdd(simulation)) + filtered_candidates.append(valuation_to_state(symbolic_ctx, simulation)) + + return filtered_candidates + else: + # If the avoid set is empty, it means that this is a pseudo-minimal space + # and hence we (a) don't have to check if we reached the avoid set, + # and (b) we can stop with one candidate instead of zero. + + candidates_bdd = state_list_to_bdd(symbolic_ctx, candidate_states) + printed: set[int] = set() + for i in range(max_iterations): + progress = int((i * len(candidate_states)) / max_iterations) + if ( + progress % 100 == 99 + and sd.config["debug"] + and (progress not in printed) + ): + printed.add(progress) + print( + f"[{node_id}] > Simulation progress: {progress + 1}/{len(candidate_states)}" + ) + + generator.shuffle(symbolic_vars) + new_candidates_bdd = symbolic_ctx.mk_constant(False) + + # Goes through all the states in the candidates BDD, updates + # them, and puts them in the new BDD assuming they are not there yet. 
+ for state_val in candidates_bdd.valuation_iterator(): + candidates_bdd = candidates_bdd.l_and_not(Bdd(state_val)) + + simulation = state_val + for s_var in symbolic_vars: + step = update_functions[s_var](simulation) + simulation[s_var] = step + + if candidates_bdd(simulation) or new_candidates_bdd(simulation): + # We have reached one of the other candidates. + continue + + new_candidates_bdd = new_candidates_bdd.l_or(Bdd(simulation)) + + candidates_bdd = new_candidates_bdd + + if candidates_bdd.cardinality() <= 1: + break + + # Finally, we convert the candidate states back into something that + # biobalm understands. + filtered_candidates = [] + for state_val in candidates_bdd.valuation_iterator(): + filtered_candidates.append(valuation_to_state(symbolic_ctx, state_val)) + + return filtered_candidates + + +def asp_greedy_retained_set_optimization( + sd: SuccessionDiagram, + node_id: int, + petri_net: DiGraph, + retained_set: BooleanSpace, + candidate_states: list[BooleanSpace], + avoid_dnf: list[BooleanSpace], +) -> tuple[BooleanSpace, list[BooleanSpace]]: + """ + Takes a Boolean network encoded as a Petri net and a candidate retained set. + Then, performs a greedy optimization process leading to a locally optimal + version of the original retained set that results in the least + amount of candidate states. + + + Parameters + ---------- + node_id : int + The ID of the associated SD node. This is only for logging progress. + petri_net : networkx.DiGraph + The Petri net encoding of the *percolated* network dynamics. + retained_set : BooleanSpace + The retained set that will be the initial point of the optimization. + candidate_states: list[BooleanSpace] + The list of candidates that are valid for the given `retained_set`, so + that they don't need to be recomputed. + avoid_dnf: list[BooleanSpace] + The list of subspaces in the given network in which candidate + states can be ignored. 
+ + Returns + ------- + tuple[BooleanSpace, list[BooleanSpace]] + The optimized reatined set, together with the list of candidates that + are valid for this retained set. + """ + done = False + while not done: + done = True + for var in retained_set: + # Standrad termination checks. + if len(candidate_states) == 0: + return (retained_set, []) + if len(avoid_dnf) == 0 and len(candidate_states) == 1: + # If we are not avoiding anything, this is a (pseudo) + # minimal space and a single candidate is good enough. + return (retained_set, candidate_states) + + # Flip the value of the specified variable to see if the resulting + # candidate set is smaller. + retained_set_2 = retained_set.copy() + retained_set_2[var] = cast(Literal[0, 1], 1 - retained_set_2[var]) + candidate_states_2 = compute_fixed_point_reduced_STG( + petri_net, + retained_set_2, + avoid_subspaces=avoid_dnf, + # We don't need all solutions if the result isn't smaller. + solution_limit=len(candidate_states), + ) + if len(candidate_states_2) < len(candidate_states): + retained_set = retained_set_2 + candidate_states = candidate_states_2 + done = False + if sd.config["debug"]: + print( + f"[{node_id}] > Candidate states optimized to {len(candidate_states)}." + ) + return (retained_set, candidate_states) + + +def make_heuristic_retained_set( + graph: AsynchronousGraph, nfvs: list[str], avoid_dnf: list[BooleanSpace] +) -> BooleanSpace: + """ + Calculate the retained set for a given `space` based on heuristic criteria. + + The retained set is technically a space-like object that describes the + variables which have to be fixed in order for the network to lose all + complex attractors. However, note that this really means changing the update + functions of these variables. This is not a trap space that only contains + fixed-points, but a description of how the network must be modified to + eliminate all complex attractors. 
+ + The construction guarantees that any complex attractor of the old + network will manifest as at least one fixed-point in the new network. But this + network modification is not implemented here. This method only generates + the necessary list of variables and values. + + Finally, note that the method uses a heuristic to select values + that should ideally lead to the least amount of fixed-points in + the modified network, but there can be other, more optimal retained sets. + + Parameters + ---------- + graph : AsynchronousGraph + The symbolic update functions stored as an `AsynchronousGraph` object + from the `biodivine_aeon` library. + nfvs : list[str] + The list of variables that represent an NFVS of the network encoded + by `graph`. + avoid_dnf : list[BooleanSpace] + A list of :class:`BooleanSpace` objects + describing the child spaces of the specified network. Only attractors that + are not in these child spaces are considered. If no child spaces are + provided, then all attractors are considered. + + Returns + ------- + BooleanSpace + A :class:`BooleanSpace` object describing the + retained set. + """ + retained_set: BooleanSpace = {} + + # First, if there are any child spaces present, we extend the retained set + # with the values from the one that has the least amount of fixed variables + # shared with the NFVS. 
+ if len(avoid_dnf) > 0: + # Find the child space that has the fewest nodes in common with the NFVS: + least_common_child_space = avoid_dnf[0] + least_common_nodes = len(set(least_common_child_space) & set(nfvs)) + for child_space in avoid_dnf: + common_nodes = len(set(child_space) & set(nfvs)) + if common_nodes < least_common_nodes: + least_common_nodes = common_nodes + least_common_child_space = child_space + + for x in least_common_child_space: + if (x not in retained_set) and (x in nfvs): + retained_set[x] = least_common_child_space[x] + + # Then, set the remaining NFVS variables based on the majority output value + # in the update function of the relevant variable. + for x in nfvs: + if x in retained_set: + continue + + fn_bdd = graph.mk_update_function(x) + + # If most of the function is positive, we set the value + # to `1`, otherwise set it to `0`. + if fn_bdd.cardinality() > fn_bdd.l_not().cardinality(): + retained_set[x] = 1 + else: + retained_set[x] = 0 + + return retained_set diff --git a/biobalm/_sd_attractors/attractor_symbolic.py b/biobalm/_sd_attractors/attractor_symbolic.py new file mode 100644 index 00000000..12cd0bb3 --- /dev/null +++ b/biobalm/_sd_attractors/attractor_symbolic.py @@ -0,0 +1,242 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, cast, Literal + +if TYPE_CHECKING: + from biobalm.succession_diagram import SuccessionDiagram + from biodivine_aeon import VariableId + +from biodivine_aeon import AsynchronousGraph, ColoredVertexSet, VertexSet +from biobalm.symbolic_utils import state_list_to_bdd +from biobalm.types import BooleanSpace + + +def compute_attractors_symbolic( + sd: SuccessionDiagram, + node_id: int, + candidate_states: list[BooleanSpace], + seeds_only: bool = False, +) -> tuple[list[BooleanSpace], list[VertexSet] | None]: + """ + Uses exhaustive symbolic reachability to eliminate spurious candidate states + and to compute the exact attractor sets. 
+ + Parameters + ---------- + sd : SuccessionDiagram + The succession diagram in question. + node_id : int + An ID of an SD node for which we are computing the candidates. + candidate_states: bool + The list of candidate states covering the attractors in the + given node. + seeds_only: bool + If `True`, the method can terminate early once it is guaranteed that + all seeds have been identified. In such case, the list of returned sets + is `None`. + + Returns + ------- + tuple[list[BooleanSpace], list[VertexSet] | None] + The list of attractor seed states and corresponding attractor sets (if computed). + """ + + node_data = sd.node_data(node_id) + node_space = node_data["space"] + bn_reduced = sd.node_percolated_network(node_id, compute=True) + graph_reduced = AsynchronousGraph(bn_reduced) + symbolic_ctx = graph_reduced.symbolic_context() + + # Ignore variables that are already fixed in the nodes space. + # These are not used in the reduced network. + candidate_states_reduced: list[BooleanSpace] = [] + for candidate in candidate_states: + candidate_reduced: BooleanSpace = { + k: v for (k, v) in candidate.items() if k not in node_space + } + candidate_states_reduced.append(candidate_reduced) + + child_motifs_reduced = [] + if node_data["expanded"]: + children = sd.node_successors(node_id, compute=False) + child_motifs_reduced = [ + sd.edge_stable_motif(node_id, s, reduced=True) for s in children + ] + + child_motifs_bdd = state_list_to_bdd(symbolic_ctx, child_motifs_reduced) + candidate_bdd = state_list_to_bdd(symbolic_ctx, candidate_states_reduced) + + children_set = ColoredVertexSet(symbolic_ctx, child_motifs_bdd) + candidate_set = ColoredVertexSet(symbolic_ctx, candidate_bdd) + + avoid = candidate_set.union(children_set) + + if sd.config["debug"]: + print( + f"[{node_id}] > Start symbolic seed state identification with {len(candidate_states)} candidates and {avoid} avoid states." 
+ ) + + seeds: list[BooleanSpace] = [] + sets: list[ColoredVertexSet] = [] + for i, candidate in enumerate(candidate_states_reduced): + is_last = i == len(candidate_states_reduced) - 1 + is_minimal = len(child_motifs_reduced) == 0 + if seeds_only and is_minimal and is_last and len(seeds) == 0: + # This node is pseudo-minimal, so it must contain at least + # one attractor seed. If we only care about the attractor seeds, + # we can thus return the last candidate without checking it. + if sd.config["debug"]: + print( + f"[{node_id}] > Single seed remaining in a (pseduo) minimal space. Done." + ) + return ([candidate | node_space], None) + candidate_singleton = graph_reduced.mk_subspace(candidate) + + avoid = avoid.minus(candidate_singleton) + + closure = symbolic_attractor_test(sd, node_id, graph_reduced, candidate, avoid) + + if closure is None: + # This candidate can reach someone else in the candidate set, + # or they can reach one of the child spaces. Hence it is not + # an attractor. + continue + + # Otherwise, we have an attractor set and we can save it. + # It also becomes a part of the avoid set. + avoid = avoid.union(closure) + seeds.append(candidate | node_space) + sets.append(closure) + + if sd.config["debug"]: + print(f"[{node_id}] > Finished identification with {len(seeds)} seed states.") + + space_symbolic = sd.symbolic.mk_subspace(node_space).vertices() + sets_converted: list[VertexSet] = [] + for s in sets: + # Extend the attractor set with fixed nodes from the node space. 
+ vertices = s.vertices() + vertices = sd.symbolic.transfer_from(vertices, graph_reduced) + vertices = vertices.intersect(space_symbolic) + sets_converted.append(vertices) + + return (seeds, sets_converted) + + +def symbolic_attractor_test( + sd: SuccessionDiagram, + node_id: int, + graph: AsynchronousGraph, + pivot: BooleanSpace, + avoid_set: ColoredVertexSet, +) -> ColoredVertexSet | None: + """ + Use symbolic reachability with saturation to compute the set of states + reachable from the given `pivot`, or `None` if the reachability procedure + intersects with any state in the `avoid` set. + + *The reason why we use `ColoredVertexSet` instead of `VertexSet` is mostly + a technicality that should be irelevant in biobalm, since we don't allow any + parameters outside of unknown inputs.* + """ + + if avoid_set.is_empty(): + avoid = None + else: + avoid = avoid_set + + reach_set = graph.mk_subspace(pivot) + + # The variables for which we already maintain that `reach_set` + # contains all reachable states. + saturated_vars: list[VariableId] = [] + + # Variables where `reach_set` differs from the states in `avoid`. + # We should prioritize updating these variables, because they *need* + # to be updated if we are ever to reach `avoid`. + conflict_vars: list[VariableId] = [] + + # Populate conflict vars, assuming we have any. + if avoid is not None: + avoid_bdd = avoid.to_bdd() + for name, val in pivot.items(): + var = graph.find_network_variable(name) + bdd_var = graph.symbolic_context().find_network_bdd_variable(name) + assert var is not None and bdd_var is not None + incompatible = avoid_bdd.r_select({bdd_var: cast(Literal[0, 1], 1 - val)}) + if not incompatible.is_false(): + conflict_vars.append(var) + conflict_vars = sort_variable_list(conflict_vars) + + # Remaining network variables that are still relevant, but may not + # be necessary to reach `avoid`. 
+ other_vars: list[VariableId] = [ + x for x in graph.network_variables() if x not in conflict_vars + ] + other_vars = sort_variable_list(other_vars) + + if sd.config["debug"]: + print( + f"[{node_id}] > Start symbolic reachability with {len(conflict_vars)} conflict variables and {len(other_vars)} other variables." + ) + + all_done = False + while not all_done: + all_done = True + + # Saturate reach set with currently selected variables. + saturation_done = False + while not saturation_done: + if avoid is not None and not avoid.intersect(reach_set).is_empty(): + if sd.config["debug"]: + print(f"[{node_id}] > Discovered avoid state. Done.") + return None + + saturation_done = True + for var in saturated_vars: + successors = graph.var_post_out(var, reach_set) + if not successors.is_empty(): + reach_set = reach_set.union(successors) + saturation_done = False + if reach_set.symbolic_size() > 100_000 and sd.config["debug"]: + print( + f"[{node_id}] > Saturation({len(saturated_vars)}) Expanded reach_set: {reach_set}" + ) + break + + # Once saturated, try to expand the saturated + # collection with either a conflict variable or + # other variable. + + # First try conflict vars, then other vars. + for var in conflict_vars + other_vars: + successors = graph.var_post_out(var, reach_set) + if not successors.is_empty(): + reach_set = reach_set.union(successors) + all_done = False + + # This is a bit wasteful but at this point it + # should be irrelevant for performance. + if var in conflict_vars: + conflict_vars.remove(var) + + if var in other_vars: + other_vars.remove(var) + + saturated_vars.append(var) + saturated_vars = sort_variable_list(saturated_vars) + + if sd.config["debug"]: + print( + f"[{node_id}] > Saturation({len(saturated_vars)}) Added saturation variable. {len(conflict_vars)} conflict and {len(other_vars)} other variables remaining." 
+ ) + break + + if sd.config["debug"]: + print(f"[{node_id}] > Reachability completed with {reach_set}.") + + return reach_set + + +def sort_variable_list(variables: list[VariableId]): + return list(sorted(variables, reverse=True)) diff --git a/biobalm/motif_avoidant.py b/biobalm/motif_avoidant.py deleted file mode 100644 index 7786d616..00000000 --- a/biobalm/motif_avoidant.py +++ /dev/null @@ -1,430 +0,0 @@ -""" -A module which provides utility methods for detecting motif-avoidant attractors -within a terminal restriction space. -""" - -from __future__ import annotations - -import random -from functools import reduce -from typing import TYPE_CHECKING - -from networkx import DiGraph # type: ignore -from pypint import Goal, InMemoryModel # type:ignore - -from biobalm.petri_net_translation import place_to_variable -from biobalm.space_utils import dnf_function_is_true, remove_state_from_dnf -from biobalm.symbolic_utils import ( - function_eval, - function_is_true, - state_list_to_bdd, - state_to_bdd, -) -from biobalm.types import BooleanSpace - -if TYPE_CHECKING: - from biodivine_aeon import AsynchronousGraph, Bdd - - -def make_retained_set( - graph: AsynchronousGraph, - nfvs: list[str], - space: BooleanSpace, - child_spaces: list[BooleanSpace] | None = None, -) -> BooleanSpace: - """ - Calculate the retained set for a given `space`. - - The retained set is technically a space-like object that describes the - variables which have to be fixed in order for the network to lose all - complex attractors. However, note that this really means changing the update - functions. This is not a trap space that only contains fixed-points, - but a description of how the network must be modified to eliminate - all complex attractors in the given `space`. - - The construction guarantees that any complex attractor of the old - network will manifest as at least one fixed-point in the new network. But this - network modification is not implemented here. 
This method only generates - the necessary list of variables and values. - - Finally, note that the method uses a heuristic to select values - that should lead to the least amount of fixed-points in - the modified network. - - Parameters - ---------- - graph : AsynchronousGraph - The symbolic update functions stored as an `AsynchronousGraph` object - from the `biodivine_aeon` library. - nfvs : list[str] - The list of variables in the NFVS that is valid for a network - restricted to the given `space`. - space : BooleanSpace - A :class:`BooleanSpace` object describing the - current trap space. - child_spaces : list[BooleanSpace] | None, optional - A list of :class:`BooleanSpace` objects - describing the child spaces of the current node. Only attractors that - are not in the child spaces are considered. If no child spaces are - provided, then all attractors are considered. - - Returns - ------- - BooleanSpace - A :class:`BooleanSpace` object describing the - retained set. - """ - - if child_spaces is None: - child_spaces = [] - - # Initially, the retained set only contains the fixed values from the - # current node space (this elimiantes unnecessary Petri net transitions for - # values which we already proved are constant). - # - # In the following code, we then extend the retained set based on the - # model's NFVS and the current child spaces. - retained_set = space.copy() - - # First, if there are any child spaces present, we extend the retained set - # with the values from the one that has the least amount of fixed variables - # shared with the NFVS. 
- if len(child_spaces) > 0: - # Find the child space that has the fewest nodes in common with the NFVS: - least_common_child_space = child_spaces[0] - least_common_nodes = len(set(least_common_child_space) & set(nfvs)) - for child_space in child_spaces: - common_nodes = len(set(child_space) & set(nfvs)) - if common_nodes < least_common_nodes: - least_common_nodes = common_nodes - least_common_child_space = child_space - - for x in least_common_child_space: - if (x not in retained_set) and (x in nfvs): - retained_set[x] = least_common_child_space[x] - - # Then, set the remaining NFVS variables based on the majority output value - # in the update function of the relevant variable. - for x in nfvs: - if x in retained_set: - continue - - fn_bdd = graph.mk_update_function(x) - - # If most of the function is positive, we set the value - # to `1`, otherwise set it to `0`. - if fn_bdd.cardinality() > fn_bdd.l_not().cardinality(): - retained_set[x] = 1 - else: - retained_set[x] = 0 - - return retained_set - - -def detect_motif_avoidant_attractors( - graph: AsynchronousGraph, - petri_net: DiGraph, - candidates: list[BooleanSpace], - terminal_restriction_space: Bdd, - max_iterations: int, - is_in_an_mts: bool = False, -) -> list[BooleanSpace]: - """ - Determine which seed states from the `candidates` list map to true - motif-avoidant attractors. - - Assumes that all attractors in the `terminal_restriction_space` are - covered by at least one state in `candidates`. Eliminates all states - from `candidates` that are provably not in an attractor, or that correspond - to an attractor that is already covered by some other candidate. - - The method consists of two steps: initial pruning, and exact reachability - analysis. As such, the result is always complete. However, some of the - parameters can be used to adjust the initial pruning step. - - If `is_in_an_mts` is set, the method assumes the `candidates` are members - of a minimal trap space. 
In such case, a different simulation algorithm - is used for initial pruning. Furthermore, `max_iterations` can be used to - to limit the time spent on the initial pruning step. - - Parameters - ---------- - graph : AsynchronousGraph - The symbolic update functions stored as an `AsynchronousGraph` object - from the `biodivine_aeon` library. - petri_net : DiGraph - The Petri net representation of the update functions. - candidates : list[BooleanSpace] - A list of :class:`BooleanSpace` objects - describing the candidate seed states. - terminal_restriction_space : Bdd - A symbolic set of states which contains all motif avoidant attractors - in question (i.e. if a candidate state can leave this set, the candidate - cannot be an attractor). - max_iterations : int - Specifies how much time should be spent on the simpler preprocessing methods. - is_in_an_mts : bool, optional - By default `False`. If `True`, then we assume that the candidates lie - within a minimal trap space, enabling certain optimizations. If - incorrectly set to `True`, the result is undefined. - - Returns - ------- - list[BooleanSpace] - A list of :class:`BooleanSpace` objects - describing the motif-avoidant attractors among the input candidate set. - """ - if len(candidates) == 0: - return [] - - if len(candidates) == 1 and is_in_an_mts: - return candidates - - candidates = _preprocess_candidates( - graph, - candidates, - terminal_restriction_space, - max_iterations, - is_in_an_mts=is_in_an_mts, - ) - - if len(candidates) == 0: - return [] - - if len(candidates) == 1 and is_in_an_mts: - return candidates - - return _filter_candidates(petri_net, candidates, terminal_restriction_space) - - -def _preprocess_candidates( - graph: AsynchronousGraph, - candidates: list[BooleanSpace], - terminal_restriction_space: Bdd, - max_iterations: int, - is_in_an_mts: bool = False, - simulation_seed: int = 0, -) -> list[BooleanSpace]: - """ - A fast but incomplete method for eliminating spurious attractor candidates. 
- - The idea is to build the symbolic encoding of the given `network`, and then - randomly simulate transitions for individual states, trying to reach a state - outside of the `terminal_restriction_space`. - - TODO (2): We could probably make this algorithm slighlty less random by doing - a limited version of symbolic reachability. I.e. instead of simulating just one - state transition in each step, compute the whole successor BDD and then test - against that. Once the BDD becomes too large after several steps, we can just - pick a single state from it and start again. Sam: I'll add a version of this - later, once we can actually benchmark how it performs :) - """ - # A random generator initialized with a fixed seed. Ensures simulation - # is randomized but deterministic. - generator = random.Random(simulation_seed) - - variables = graph.network_variable_names() - update_functions = {var: graph.mk_update_function(var) for var in variables} - - # Use stochastic simulation to prune the set of candidate states. - # We use different simulation approach depending on whether this space - # is a minimal trap or not. In previous work, this was shown to work - # well, but in the future we need to better document the resoning - # behind these two algorithms. - if not is_in_an_mts: - # Copy is sufficient because we won't be modifying the states within the set. - candidates_dnf = candidates.copy() - filtered_candidates: list[BooleanSpace] = [] - for state in candidates: - # Remove the state from the candidates. If we can prove that is - # is not an attractor, we will put it back. - candidates_dnf = remove_state_from_dnf(candidates_dnf, state) - - simulation = state.copy() # A copy of the state that we can overwrite. - is_valid_candidate = True - for _ in range(max_iterations): - # Advance all variables by one step in random order. 
- generator.shuffle(variables) - for var in variables: - step = function_eval(update_functions[var], simulation) - assert step is not None - simulation[var] = step - - if dnf_function_is_true(candidates_dnf, simulation): - # The state can reach some other state in the candidate - # set. This does not mean it cannot be an attractor, but - # it means it is sufficient to keep considering - # the remaining candidates. - is_valid_candidate = False - break - - if not function_is_true(terminal_restriction_space, simulation): - # The state can reach some other state outside of the - # terminal restriction space, which means it cannot be - # a motif avoidant attractor in this subspace. - is_valid_candidate = False - break - - if is_valid_candidate: - # If we cannot rule out the candidate, we have to put it back - # into the candidate set. - candidates_dnf.append(state) - filtered_candidates.append(state) - - return filtered_candidates - else: - filtered_candidates = [] - for _ in range(max_iterations): - generator.shuffle(variables) - candidates_dnf = candidates.copy() - filtered_candidates = [] - - for state in candidates: - candidates_dnf = remove_state_from_dnf(candidates_dnf, state) - - simulation = state.copy() - for var in variables: - step = function_eval(update_functions[var], simulation) - assert step is not None - simulation[var] = step - - if not dnf_function_is_true(candidates_dnf, simulation): - candidates_dnf.append(simulation) - filtered_candidates.append(simulation) - - if len(filtered_candidates) <= 1: - break - - candidates = filtered_candidates - - return filtered_candidates - - -def _filter_candidates( - petri_net: DiGraph, - candidates: list[BooleanSpace], - terminal_restriction_space: Bdd, -) -> list[BooleanSpace]: - """ - Filter candidate states using reachability procedure in Pint. 
- """ - - ctx = terminal_restriction_space.__ctx__() - candidates_bdd = state_list_to_bdd(ctx, candidates) - avoid_states = candidates_bdd.l_and_not(terminal_restriction_space) - filtered_candidates: list[BooleanSpace] = [] - - for state in candidates: - state_bdd = state_to_bdd(ctx, state) - - # Remove state from the symbolic set. If we can prove that it - # is not an attractor, we will put it back. - avoid_states = avoid_states.l_and_not(state_bdd) - - if not _Pint_reachability(petri_net, state, avoid_states): - avoid_states = avoid_states.l_or(state_bdd) - filtered_candidates.append(state) - - return filtered_candidates - - -def _Pint_reachability( - petri_net: DiGraph, - initial_state: BooleanSpace, - target_states: Bdd, -) -> bool: - """ - Use Pint to check if a given `initial_state` can possibly reach some state - in the `target_states` BDD. - - TODO: Here, if the result of static analysis is inconclusive, Pint falls back to `mole` - model checker. However, in the future, we might also explore other methods, such as - petri net reduction or symbolic reachability. - """ - if target_states.is_false(): - return False # Cannot reach a stat in an empty set. - - # Build a Pint model through an automata network and copy - # over the initial condition. - pint_model = InMemoryModel(_petri_net_as_automata_network(petri_net)) - for var, level in initial_state.items(): - pint_model.initial_state[var] = level - - goal = _Pint_build_symbolic_goal(target_states) - - return pint_model.reachability(goal=goal, fallback="mole") # type: ignore - - -def _Pint_build_symbolic_goal(states: Bdd) -> Goal: - """ - A helper method which (very explicitly) converts a set of states - represented through a BDD into a Pint `Goal`. 
- """ - assert not states.is_false() - - goals: list[Goal] = [] - for clause in states.clause_iterator(): - named_clause = { - states.__ctx__().get_variable_name(var): int(value) - for var, value in clause.items() - } - goal_atoms = [f"{var}={level}" for var, level in named_clause.items()] - goals.append(Goal(",".join(goal_atoms))) - - return reduce(lambda a, b: a | b, goals) - - -def _petri_net_as_automata_network(petri_net: DiGraph) -> str: - """ - Takes a Petri net which was created by implicant encoding from a Boolean network, - and builds an automata network file (`.an`) compatible with the Pint tool. - - TODO: This is one of those things that would probably be better served by having - an "explicit" `PetriNetEncoding` class. - """ - automata_network = "" - - # Go through all PN places and save them as model variables. - variable_set: set[str] = set() - for place, kind in petri_net.nodes(data="kind"): # type: ignore - if kind != "place": - continue - variable_set.add(place_to_variable(place)[0]) # type: ignore[reportUnknownArgumentType] # noqa - variables = sorted(variable_set) - - # Declare all variables with 0/1 domains. - for var in variables: - automata_network += f'"{var}" [0, 1]\n' - - for transition, kind in petri_net.nodes(data="kind"): # type: ignore - if kind != "transition": - continue - - predecessors = set(petri_net.predecessors(transition)) # type: ignore - successors = set(petri_net.successors(transition)) # type: ignore - - # The value under modification is the only - # value that is different between successors and predecessors. 
- source_place = next(iter(predecessors - successors)) # type: ignore[reportUnknownVariableType,reportUnknownArgumentType] # noqa - target_place = next(iter(successors - predecessors)) # type: ignore[reportUnknownVariableType,reportUnknownArgumentType] # noqa - - (s_var, s_level) = place_to_variable(source_place) # type: ignore[reportUnknownArgumentType] # noqa - (t_var, t_level) = place_to_variable(target_place) # type: ignore[reportUnknownArgumentType] # noqa - assert s_var == t_var - - # The remaining places represent the necessary conditions. - # Here, we transform them into a text format. - conditions = sorted(predecessors.intersection(successors)) # type: ignore[reportUnknownVariableType,reportUnknownArgumentType] # noqa - conditions = [place_to_variable(p) for p in conditions] # type: ignore[reportUnknownVariableType,reportUnknownArgumentType] # noqa - conditions = [f'"{var}"={int(level)}' for var, level in conditions] - - # A pint rule consists of a variable name, value transition, - # and a list of necessary conditions for the transition (if any). - if len(conditions) == 0: - rule = f'"{s_var}" {int(s_level)} -> {int(t_level)}\n' - else: - rule = f"\"{s_var}\" {int(s_level)} -> {int(t_level)} when {' and '.join(conditions)}\n" - - automata_network += rule - - return automata_network diff --git a/biobalm/petri_net_translation.py b/biobalm/petri_net_translation.py index cae55d26..0c7cda48 100644 --- a/biobalm/petri_net_translation.py +++ b/biobalm/petri_net_translation.py @@ -205,6 +205,9 @@ def restrict_petrinet_to_subspace( percolation. Variables that are fixed in the `sub_space` but do not exist in the Petri net are ignored. + The `sub_space` can contain variables that do not appear + in the `petri_net`. Such variables are simply ignored. 
+ Parameters ---------- petri_net : DiGraph @@ -356,14 +359,14 @@ def network_to_petrinet( return pn -def _optimized_recursive_dnf_generator( +def optimized_recursive_dnf_generator( bdd: Bdd, ) -> Generator[BddPartialValuation, None, None]: """ Yields a generator of `BddPartialValuation` objects, similar to - `bdd.clause_iterator`, but uses a recursive optimization strategy to return - a smaller result than the default method `Bdd` clause sequence. Note that - this is still not the "optimal" DNF, but is often close enough. + `Bdd.clause_iterator` in AEON, but uses a recursive optimization strategy + to return a smaller result than the default method `Bdd` clause sequence. + Note that this is still not the "optimal" DNF, but is often close enough. This is technically slower for BDDs that already have a small clause count, but can be much better in the long-term when the clause count is high. @@ -395,11 +398,11 @@ def _optimized_recursive_dnf_generator( best_size = t_size + f_size best_var = var - for t_val in _optimized_recursive_dnf_generator(bdd.r_restrict({best_var: True})): + for t_val in optimized_recursive_dnf_generator(bdd.r_restrict({best_var: True})): t_val[best_var] = True yield t_val - for f_val in _optimized_recursive_dnf_generator(bdd.r_restrict({best_var: False})): + for f_val in optimized_recursive_dnf_generator(bdd.r_restrict({best_var: False})): f_val[best_var] = False yield f_val @@ -418,7 +421,7 @@ def _create_transitions( """ dir_str = "up" if go_up else "down" total = 0 - for t_id, implicant in enumerate(_optimized_recursive_dnf_generator(implicant_bdd)): + for t_id, implicant in enumerate(optimized_recursive_dnf_generator(implicant_bdd)): total += 1 t_name = f"tr_{var_name}_{dir_str}_{t_id + 1}" pn.add_node( # type: ignore[reportUnknownMemberType] diff --git a/biobalm/succession_diagram.py b/biobalm/succession_diagram.py index 24c4278c..974553cc 100644 --- a/biobalm/succession_diagram.py +++ b/biobalm/succession_diagram.py @@ -6,15 +6,20 @@ 
from typing import Iterator import networkx as nx # type: ignore -from biodivine_aeon import AsynchronousGraph, BooleanNetwork, VariableId +from biodivine_aeon import AsynchronousGraph, BooleanNetwork, VariableId, VertexSet -from biobalm._sd_algorithms.compute_attractor_seeds import compute_attractor_seeds +# Attractor detection algorithms. +from biobalm._sd_attractors.attractor_candidates import compute_attractor_candidates +from biobalm._sd_attractors.attractor_symbolic import compute_attractors_symbolic + +# SD expansion algorithms/heuristics. from biobalm._sd_algorithms.expand_attractor_seeds import expand_attractor_seeds from biobalm._sd_algorithms.expand_bfs import expand_bfs from biobalm._sd_algorithms.expand_dfs import expand_dfs from biobalm._sd_algorithms.expand_minimal_spaces import expand_minimal_spaces from biobalm._sd_algorithms.expand_source_SCCs import expand_source_SCCs from biobalm._sd_algorithms.expand_to_target import expand_to_target + from biobalm.interaction_graph_utils import ( cleanup_network, feedback_vertex_set, @@ -27,12 +32,12 @@ ) from biobalm.space_utils import percolate_network, percolate_space, space_unique_key from biobalm.trappist_core import trappist -from biobalm.types import BooleanSpace, NodeData, SuccessionDiagramState - -DEBUG: bool = False -""" -If `True`, print debug and progress messages. -""" +from biobalm.types import ( + BooleanSpace, + NodeData, + SuccessionDiagramState, + SuccessionDiagramConfiguration, +) class SuccessionDiagram: @@ -74,16 +79,6 @@ class SuccessionDiagram: """ - NFVS_NODE_THRESHOLD: int = 2_000 - """ - If the number of nodes in the (redunced) network is greater than this - threshold, we find a feedback vertex set (without consideration of cycle - sign) for reachability preprocessing. There is a trade-off between the speed - gains from a smaller node set to consider and the cost of determining which - FVS nodes only intersect negative cycles to find an NFVS subset. 
Typically, - for smaller networks, the trade-off is in favor of computing a smaller NFVS. - """ - __slots__ = ( "network", "symbolic", @@ -91,18 +86,27 @@ class SuccessionDiagram: "nfvs", "dag", "node_indices", + "config", ) - def __init__(self, network: BooleanNetwork): + def __init__( + self, + network: BooleanNetwork, + config: SuccessionDiagramConfiguration | None = None, + ): + if config is None: + config = SuccessionDiagram.default_config() + self.config = config + # Original Boolean network. self.network: BooleanNetwork = cleanup_network(network) """ - The Boolean network as an AEON network. + The Boolean network represented as a `biodivine_aeon.BooleanNetwork` object. """ self.symbolic: AsynchronousGraph = AsynchronousGraph(self.network) """ - The symbolic representation of the network. + The symbolic representation of the network using `biodivine_aeon.AsynchronousGraph`. """ self.petri_net: nx.DiGraph = network_to_petrinet(network) @@ -110,15 +114,14 @@ def __init__(self, network: BooleanNetwork): The Petri net representation of the network (see :mod:`petri_net_translation`). """ - if DEBUG: + if self.config["debug"]: print( f"Generated global Petri net with {len(self.petri_net.nodes)} nodes and {len(self.petri_net.edges)} edges." ) - # Initially, we don't need the NFVS for anything. self.nfvs: list[str] | None = None """ - The negative feedback vertex set used for attractor detection. + The negative feedback vertex set of `SuccessionDiagram.network`, or `None` if not computed yet. """ self.dag: nx.DiGraph = nx.DiGraph() @@ -126,8 +129,7 @@ def __init__(self, network: BooleanNetwork): The directed acyclic graph (DAG) representation of the succession diagram structure. """ - # A dictionary used for uniqueness checks on the nodes of the succession - # diagram. See `SuccessionDiagram.ensure_node` for details. 
+ self.node_indices: dict[int, int] = {} """ A dictionary mapping subspace keys to their positions in the succession @@ -144,6 +146,7 @@ def __getstate__(self) -> SuccessionDiagramState: "nfvs": self.nfvs, "dag": self.dag, "node_indices": self.node_indices, + "config": self.config, } def __setstate__(self, state: SuccessionDiagramState): @@ -154,6 +157,7 @@ def __setstate__(self, state: SuccessionDiagramState): self.nfvs = state["nfvs"] self.dag = state["dag"] self.node_indices = state["node_indices"] + self.config = state["config"] def __len__(self) -> int: """ @@ -161,10 +165,22 @@ def __len__(self) -> int: """ return self.dag.number_of_nodes() + @staticmethod + def default_config() -> SuccessionDiagramConfiguration: + return { + "debug": False, + "max_motifs_per_node": 100_000, + "nfvs_size_threshold": 2_000, + "pint_goal_size_limit": 8_192, + "attractor_candidates_limit": 100_000, + "retained_set_optimization_threshold": 100, + } + @staticmethod def from_rules( rules: str, format: Literal["bnet", "aeon", "sbml"] = "bnet", + config: SuccessionDiagramConfiguration | None = None, ) -> SuccessionDiagram: """ Generate a succession diagram from the given string. @@ -176,6 +192,9 @@ def from_rules( format : Literal['bnet', 'aeon', 'sbml'] The format of the string. One of `"bnet"`, `"aeon"`, or `"sbml"`. Defaults to `"bnet"`. + config : SuccessionDiagramConfiguration | None + An optional configuration object with internal settings + and default values. 
Returns ------- @@ -184,31 +203,99 @@ def from_rules( """ if format == "bnet": - return SuccessionDiagram(BooleanNetwork.from_bnet(rules)) + return SuccessionDiagram(BooleanNetwork.from_bnet(rules), config) elif format == "aeon": - return SuccessionDiagram(BooleanNetwork.from_aeon(rules)) + return SuccessionDiagram(BooleanNetwork.from_aeon(rules), config) elif format == "sbml": - return SuccessionDiagram(BooleanNetwork.from_sbml(rules)) + return SuccessionDiagram(BooleanNetwork.from_sbml(rules), config) else: raise ValueError(f"Unknown format: {format}") @staticmethod - def from_file(path: str) -> SuccessionDiagram: + def from_file( + path: str, config: SuccessionDiagramConfiguration | None = None + ) -> SuccessionDiagram: """ Read a `BooleanNetwork` from the given file path. The format is automatically inferred from the file extension. + + Optionally, you can also supply a configuration object to customize the + resulting succession diagram. + """ + return SuccessionDiagram(BooleanNetwork.from_file(path), config) + + def expanded_attractor_candidates(self) -> dict[int, list[BooleanSpace]]: + """ + Attractor candidates for each expanded node. The candidate list is + computed for nodes where it is not known yet. + + Collectively, for every attractor in every expanded node, this returns + at least one candidate state for said attractor. However, for complex + attractors, there can be more than one candidate state, and there can + be candidates that are not contained in any attractor. 
+ + *If called before the `SuccessionDiagram` is fully built, this will not + be a complete accounting of attractors, since any node that isn't expanded + is not included in the result.* + + See also: + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + + Returns + ------- + dict[int,list[BooleanSpace]] + Each key is the id of an expanded succession diagram node, whose + corresponding value is a list of attractor candidates for that node. Note + that one succession diagram node can have multiple attractors. Ordering + of the lists in the returned dictionary is not guaranteed. + + Example + ------- + >>> import biobalm + >>> sd = biobalm.SuccessionDiagram.from_rules(\""" + ... A, B + ... B, A & C + ... C, !A | B + ... \""") + >>> sd.build() + >>> eas = sd.expanded_attractor_candidates() + >>> for id, atts in sorted(eas.items()): + ... for x in atts: + ... print(f"{id}: {dict(sorted(x.items()))}") + 1: {'A': 0, 'B': 0, 'C': 1} + 2: {'A': 1, 'B': 1, 'C': 1} """ - return SuccessionDiagram(BooleanNetwork.from_file(path)) + res: dict[int, list[BooleanSpace]] = {} + for id in self.expanded_ids(): + atts = self.node_attractor_candidates(id, compute=True) + if not atts: # no attractors for this node + continue + res[id] = atts + + return res def expanded_attractor_seeds(self) -> dict[int, list[BooleanSpace]]: """ - Attractor seeds for each expanded node. + Attractor seeds for each expanded node. The seed list is + computed for nodes where it is not known yet. - An attractor seed is a state belonging to an attractor (and thus a state - from which the entire attractor is reachable, by definition). + Collectively, for every attractor in every expanded node, this returns + exactly one seed state for said attractor. - If called before the `SuccessionDiagram` is fully built, this will not - be a complete accounting of attractor seed states. 
+ *If called before the `SuccessionDiagram` is fully built, this will not + be a complete accounting of attractors, since any node that isn't expanded + is not included in the result.* + + See also: + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` Returns ------- @@ -220,6 +307,10 @@ def expanded_attractor_seeds(self) -> dict[int, list[BooleanSpace]]: Example ------- + + *Note that for this simple network, attractor candidates and attractor + seeds are the same states.* + >>> import biobalm >>> sd = biobalm.SuccessionDiagram.from_rules(\""" ... A, B @@ -236,7 +327,62 @@ def expanded_attractor_seeds(self) -> dict[int, list[BooleanSpace]]: """ res: dict[int, list[BooleanSpace]] = {} for id in self.expanded_ids(): - atts = self.node_attractor_seeds(id) + atts = self.node_attractor_seeds(id, compute=True) + if not atts: # no attractors for this node + continue + res[id] = atts + + return res + + def expanded_attractor_sets(self) -> dict[int, list[VertexSet]]: + """ + Attractor sets for each expanded node. The sets are + computed for nodes where they are not known yet. + + These sets represent the complete collection of attractor states in each + expanded node. For fixed-point attractors, this is effectively equivalent + to the attractor seeds. For complex attractors, this set contains the + full attractor. Hence, it is harder to compute but can facilitate richer + post-processing and analysis. 
+ + *If called before the `SuccessionDiagram` is fully built, this will not + be a complete accounting of attractors, since any node that isn't expanded + is not included in the result.* + + See also: + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + - :meth:`expanded_attractor_seeds` + + Returns + ------- + dict[int,list[biodivine_aeon.VertexSet]] + Each key is the id of an expanded succession diagram node, whose + corresponding value is a list of attractor sets for that node. Note + that one succession diagram node can have multiple attractors. Ordering + of the lists in the returned dictionary is not guaranteed. + + Example + ------- + >>> import biobalm + >>> sd = biobalm.SuccessionDiagram.from_rules(\""" + ... A, B + ... B, A & C + ... C, !A | B + ... \""") + >>> sd.build() + >>> eas = sd.expanded_attractor_sets() + >>> for id, atts in sorted(eas.items()): + ... for x in atts: + ... print(f"{id}: {x}") + 1: VertexSet(cardinality=1, symbolic_size=5) + 2: VertexSet(cardinality=1, symbolic_size=5) + """ + res: dict[int, list[VertexSet]] = {} + for id in self.expanded_ids(): + atts = self.node_attractor_sets(id, compute=True) if not atts: # no attractors for this node continue res[id] = atts @@ -434,10 +580,19 @@ def node_data(self, node_id: int) -> NodeData: Returns a `NodeData` object with the following attributes: - `depth`: The depth of the node. - - `attractors`: The attractors of the node (if computed). - - `petri_net`: The Petri net representation of the node (if computed). - `space`: The sub-space of the node. - `expanded`: Whether the node is expanded or not. + - `percolated_network`: [`None` if not computed] The percolation of + `SuccessionDiagram.network` with respect to the node's `space`. + - `percolated_petri_net`: [`None` if not computed] The percolation of + `SuccessionDiagram.petri_net` with respect to the node's `space`. 
+ - `percolated_nfvs`: [`None` if not computed] The NFVS of `percolated_network`. + - `attractor_candidates`: [`None` if not computed] A collection of states + that collectively cover every attractor of this node. + - `attractor_seeds`: [`None` if not computed] A collection of states + one-to-one corresponding to the attractors of this node. + - `attractor_sets`: [`None` if not computed] A complete collection of + attractors of this node. See :class:`biobalm.types.NodeData` for more information. @@ -454,6 +609,29 @@ def node_data(self, node_id: int) -> NodeData: """ return cast(NodeData, self.dag.nodes[node_id]) + def reclaim_node_data(self): + """ + Removes non-essential data from the `NodeData` dictionary of each node. + + This method can be used to reduce the memory footprint of the succession + diagram, especially before serialization. However, note that this can + also slow down subsequent computations if the erased data needs + to be re-computed. + + The method removes the `percolated_network`, `percolated_petri_net`, + and the `percolated_nfvs`. Furthermore, if `attractor_seeds` are known, + it erases the `attractor_candidates`, since seeds can be used for the + same tasks. + """ + + for node_id in self.node_ids(): + data = self.node_data(node_id) + data["percolated_network"] = None + data["percolated_petri_net"] = None + data["percolated_nfvs"] = None + if data["attractor_seeds"] is not None: + data["attractor_candidates"] = None + def node_is_minimal(self, node_id: int) -> bool: """ True if the node represents a minimal trap space. @@ -505,7 +683,7 @@ def node_successors(self, node_id: int, compute: bool = False) -> list[int]: list[int] The list of successor node ids. 
""" - node = cast(dict[str, Any], self.dag.nodes[node_id]) + node = self.node_data(node_id) if not node["expanded"] and not compute: raise KeyError(f"Node {node_id} is not expanded.") if not node["expanded"]: @@ -513,8 +691,92 @@ def node_successors(self, node_id: int, compute: bool = False) -> list[int]: return list(self.dag.successors(node_id)) # type: ignore + def node_attractor_candidates( + self, + node_id: int, + compute: bool = False, + greedy_asp_minification: bool = True, + simulation_minification: bool = True, + pint_minification: bool = False, + ) -> list[BooleanSpace]: + """ + Return the list of attractor candidate states for the given `node_id`. + + If attractor candidates are not computed but seeds are, returns + attractor seeds, as these are also valid as candidate states, but + even more precise. + + Similar to :meth:`node_successors`, the method either computes the + data if unknown, or throws an exception, depending on the `compute` + flag. If `compute` is set to `True`, additional flags can be used + to adjust the candidate identification process (see *Parameters*). + + Note that you can compute attractor candidates for nodes that are not expanded, + but (a) multiple unexpanded nodes can contain the same attractor, and hence also + the same/similar candidates (i.e. you can "discover" the same attractor in multiple + unexpanded nodes, if the nodes intersect), and (b) this data is erased if the + node is later expanded. + + Parameters + ---------- + node_id: int + The ID of the node. + compute: bool + Whether to compute the attractor candidates if they are not already known. + greedy_asp_minification: bool + Indicate that the initial candidate set should be first greedily minified + through repeated ASP queries. [Default: True] + simulation_minification: bool + Indicate that the candidate set should be refined through stochastic + simulation. 
[Default: True] + pint_minification: bool + Indicate that the candidate set should be refined through reachability + analysis using `pint`. Only enable this option if you actually have `pint` + installed (it is an optional dependency). [Default: False] + + Returns + ------- + list[BooleanSpace] + The list of attractor candidate states. + """ + node = self.node_data(node_id) + + candidates = node["attractor_candidates"] + + # If candidates are already cleared, but seeds are known, we can + # just use seeds. + if candidates is None and node["attractor_seeds"] is not None: + return node["attractor_seeds"] + + if candidates is None and not compute: + raise KeyError(f"Attractor candidates not computed for node {node_id}.") + + if candidates is None: + candidates = compute_attractor_candidates( + self, + node_id, + greedy_asp_minification, + simulation_minification, + pint_minification, + ) + node["attractor_candidates"] = candidates + + # If the computed candidates are actually valid as seeds, just + # propagate the value so that it doesn't need to be computed later. + node_is_pseudo_minimal = (not node["expanded"]) or self.node_is_minimal( + node_id + ) + if len(candidates) == 0 or ( + node_is_pseudo_minimal and len(candidates) == 1 + ): + node["attractor_seeds"] = candidates + + return candidates + def node_attractor_seeds( - self, node_id: int, compute: bool = False + self, + node_id: int, + compute: bool = False, ) -> list[BooleanSpace]: """ Return the list of attractor seed states for the given `node_id`. @@ -523,10 +785,8 @@ def node_attractor_seeds( data if unknown, or throws an exception, depending on the `compute` flag. - Note that you can compute attractor seeds for stub nodes, but (a) these - attractors are not guaranteed to be unique (i.e. you can "discover" the - same attractor in multiple stub nodes, if the stub nodes intersect), and - (b) this data is erased if the stub node is expanded later on. 
+ Note that the same considerations regarding attractors in unexpanded + nodes apply as for :meth:`node_attractor_candidates`. Parameters ---------- @@ -540,29 +800,111 @@ def node_attractor_seeds( list[BooleanSpace] The list of attractor seed states. """ - node = cast(NodeData, self.dag.nodes[node_id]) + node = self.node_data(node_id) + + seeds = node["attractor_seeds"] + + if seeds is None and not compute: + raise KeyError(f"Attractor seeds not computed for node {node_id}.") + + if seeds is None: + candidates = self.node_attractor_candidates(node_id, compute=True) + # Typically, this should be done when computing the candidates, but just in case + # something illegal happended... if we can show that the current candidate set + # is optimal, we just keep it and don't compute the attractors symbolically. + node_is_pseudo_minimal = (not node["expanded"]) or self.node_is_minimal( + node_id + ) + if len(candidates) == 0 or ( + node_is_pseudo_minimal and len(candidates) == 1 + ): + node["attractor_seeds"] = candidates + seeds = candidates + else: + result = compute_attractors_symbolic( + self, node_id, candidate_states=candidates, seeds_only=True + ) + node["attractor_seeds"] = result[0] + # At this point, attractor_sets could be `None`, but that + # is valid, as long as we actually compute them later when + # they are needed. + node["attractor_sets"] = result[1] + seeds = result[0] + + # Release memory once attractor seeds are known. We might need these + # for attractor set computation later, but only if the seeds are not empty. + if len(seeds) == 0: + node["percolated_network"] = None + node["percolated_nfvs"] = None + node["percolated_petri_net"] = None + + return seeds + + def node_attractor_sets( + self, + node_id: int, + compute: bool = False, + ) -> list[VertexSet]: + """ + Return the list of attractor sets for the given `node_id`. 
+ + Similar to :meth:`node_successors`, the method either computes the + data if unknown, or throws an exception, depending on the `compute` + flag. + + Note that the same considerations regarding attractors in unexpanded + nodes apply as for :meth:`node_attractor_candidates`. + + Parameters + ---------- + node_id: int + The ID of the node. + compute: bool + Whether to compute the attractor sets if they are not already known. + + Returns + ------- + list[biodivine_aeon.VertexSet] + The list of attractor sets. + """ + node = self.node_data(node_id) - attractors = node["attractors"] + sets = node["attractor_sets"] - if attractors is None and not compute: - raise KeyError(f"Attractor data not computed for node {node_id}.") + if sets is None and not compute: + raise KeyError(f"Attractor sets not computed for node {node_id}.") - if attractors is None: - attractors = compute_attractor_seeds(self, node_id) - node["attractors"] = attractors + if sets is None: + seeds = self.node_attractor_seeds(node_id, compute=True) + result: tuple[list[BooleanSpace], list[VertexSet] | None] = ([], []) + if len(seeds) > 0: + result = compute_attractors_symbolic( + self, node_id, candidate_states=seeds + ) + assert result[1] is not None + node["attractor_sets"] = result[1] + sets = result[1] - return attractors + return sets - def node_nfvs(self, node_id: int) -> list[str]: + def node_percolated_nfvs(self, node_id: int, compute: bool = False) -> list[str]: """ - Approximate minimum negative feedback vertex set on the subspace for the given node. + Approximate minimum negative feedback vertex set on the Boolean network + percolated to the node's sub-space. - See :func:`biobalm.interaction_graph_utils.feedback_vertex_set` for further details. + Similar to :meth:`node_successors`, the method either computes the + data if unknown, or throws an exception, depending on the `compute` + flag. + + See :func:`biobalm.interaction_graph_utils.feedback_vertex_set` for + further details. 
Parameters ---------- node_id: int The ID of the node. + compute: bool + Whether to compute the node NFVS if it is not already known. Returns ------- @@ -570,20 +912,143 @@ def node_nfvs(self, node_id: int) -> list[str]: The negative feedback vertex set, as a list of node names. """ - # TODO: Right now, we are just returning the gloval FVS. But in the future, - # we probably want to compute a separate FVS for each node, because it could - # be much smaller if a lot of the nodes are fixed. assert node_id in self.dag.nodes - if self.nfvs is None: - if self.network.variable_count() < SuccessionDiagram.NFVS_NODE_THRESHOLD: + node = self.node_data(node_id) + + if node["percolated_nfvs"] is None and not compute: + raise KeyError(f"NFVS not computed for node {node_id}.") + + if node["percolated_nfvs"] is None: + percolated_network = self.node_percolated_network(node_id, compute) + percolated_size = percolated_network.variable_count() + if percolated_size < self.config["nfvs_size_threshold"]: # Computing the *negative* variant of the FVS is surprisingly costly. # Hence it mostly makes sense for the smaller networks only. - self.nfvs = feedback_vertex_set(self.network, parity="negative") + nfvs = feedback_vertex_set(percolated_network, parity="negative") else: - self.nfvs = feedback_vertex_set(self.network) + nfvs = feedback_vertex_set(percolated_network) + node["percolated_nfvs"] = nfvs + else: + nfvs = node["percolated_nfvs"] + + return nfvs + + def node_percolated_network( + self, node_id: int, compute: bool = False + ) -> BooleanNetwork: + """ + The Boolean network percolated to the node's sub-space, with + constant variables removed. + + Similar to :meth:`node_successors`, the method either computes the + data if unknown, or throws an exception, depending on the `compute` + flag. + + Parameters + ---------- + node_id: int + The ID of the node. + compute: bool + Whether to compute the node NFVS if it is not already known. 
+ + Returns + ------- + biodivine_aeon.BooleanNetwork + The percolated Boolean network. + """ + + assert node_id in self.dag.nodes + + node = self.node_data(node_id) + network = node["percolated_network"] - return self.nfvs + node_space = node["space"] + + if len(node_space) == self.network.variable_count(): + # If fixed point, no need to compute, the network is always empty. + return BooleanNetwork() + + if network is None and not compute: + raise KeyError(f"Percolated network not computed for node {node_id}.") + + if network is None: + network = percolate_network( + self.network, node_space, self.symbolic, remove_constants=True + ) + if self.config["debug"]: + print( + f"[{node_id}] Computed percolated network with {network.variable_count()} variables (vs {self.network.variable_count()})." + ) + node["percolated_network"] = network + + return network + + def node_percolated_petri_net( + self, + node_id: int, + compute: bool = False, + parent_id: int | None = None, + ) -> nx.DiGraph: + """ + The Petri net representation of the Boolean network percolated to the + node's sub-space (with constant variables removed). + + Similar to :meth:`node_successors`, the method either computes the + data if unknown, or throws an exception, depending on the `compute` + flag. + + Parameters + ---------- + node_id: int + The ID of the node. + compute: bool + Whether to compute the node NFVS if it is not already known. + parent_id: int | None + If given, the percolation process starts with the Petri net of the given + parent node (if computed). If parent is not given, the percolation starts + with `SuccessionDiagram.petri_net`, which can be slower but yields the + same result. + + Returns + ------- + networkx.DiGraph + The percolated Boolean network. 
+ """ + + assert node_id in self.dag.nodes + + node = self.node_data(node_id) + percolated_pn = node["percolated_petri_net"] + + node_space = node["space"] + + if len(node_space) == self.network.variable_count(): + # If fixed point, the result is always empty. + return nx.DiGraph() + + if percolated_pn is None and not compute: + raise KeyError(f"Percolated network not computed for node {node_id}.") + + if percolated_pn is None: + base_pn = self.petri_net + percolate_space = node_space + if parent_id is not None: + parent_pn = self.node_data(parent_id)["percolated_petri_net"] + if parent_pn is not None: + base_pn = parent_pn + percolate_space = node_space + + percolated_pn = restrict_petrinet_to_subspace(base_pn, percolate_space) + + if self.config["debug"]: + print( + f"[{node_id}] Generated Petri net restriction with {len(percolated_pn.nodes)} nodes and {len(percolated_pn.edges)} edges." + ) + + node["percolated_petri_net"] = percolated_pn + + return percolated_pn def edge_stable_motif( self, parent_id: int, child_id: int, reduced: bool = False @@ -826,52 +1291,6 @@ def _update_node_depth(self, node_id: int, parent_id: int): current_depth = cast(int, self.dag.nodes[node_id]["depth"]) self.dag.nodes[node_id]["depth"] = max(current_depth, parent_depth + 1) - def _update_node_petri_net(self, node_id: int, parent_id: int | None): - """ - Try to compute a restricted Petri net for this node using the Petri net - stored in the node's parent. - - If the parent Petri net does not exist or the parent is not specified, the - restriction is performed on the "global" Petri net object, which is usually - slower but should yield the same result. - - No Petri net is computed if the specified node is a fixed-point, since - such Petri net is always empty. - """ - - node_space = self.node_data(node_id)["space"] - - if len(node_space) == self.network.variable_count(): - # If fixed point, no need to compute. 
- return - - if parent_id is not None: - parent_pn = self.node_data(parent_id)["petri_net"] - if parent_pn is None: - pn = self.petri_net - else: - pn = parent_pn - else: - pn = self.petri_net - - restricted_pn = restrict_petrinet_to_subspace(pn, node_space) - - if DEBUG: - print( - f"[{node_id}] Generated Petri net restriction with {len(restricted_pn.nodes)} nodes and {len(restricted_pn.edges)} edges." - ) - - self.dag.nodes[node_id]["petri_net"] = restricted_pn - - def _clear_node_petri_net(self, node_id: int): - """ - Remove the computed Petri net for this node. - - This is typically done once the node is expanded, since we know the Petri net won't - be needed anymore. - """ - del self.dag.nodes[node_id]["petri_net"] - def _expand_one_node(self, node_id: int): """ An internal method that expands a single node of the succession diagram. @@ -888,11 +1307,15 @@ def _expand_one_node(self, node_id: int): if node["expanded"]: return - node["expanded"] = True - node["attractors"] = None + # If the node had any attractor data computed as unexpanded, these are + # no longer valid and need to be erased. + node["attractor_seeds"] = None + node["attractor_candidates"] = None + node["attractor_sets"] = None + current_space = node["space"] - if DEBUG: + if self.config["debug"]: print( f"[{node_id}] Expanding: {len(self.node_data(node_id)['space'])} fixed vars." ) @@ -900,8 +1323,9 @@ def _expand_one_node(self, node_id: int): if len(current_space) == self.network.variable_count(): # This node is a fixed-point. Trappist would just # return this fixed-point again. No need to continue. - if DEBUG: + if self.config["debug"]: print(f"[{node_id}] Found fixed-point: {current_space}.") + node["expanded"] = True return # We use the non-propagated Petri net for backwards-compatibility reasons here. 
@@ -912,12 +1336,17 @@ def _expand_one_node(self, node_id: int): source_nodes = extract_source_variables(self.petri_net) sub_spaces: list[BooleanSpace] - pn = self.node_data(node_id)["petri_net"] + + # Only use the percolated PN if it is already known. + pn = node["percolated_petri_net"] if pn is not None: # We have a pre-propagated PN for this sub-space, hence we can use # that to compute the trap spaces. partial_sub_spaces = trappist( - pn, problem="max", optimize_source_variables=source_nodes + pn, + problem="max", + optimize_source_variables=source_nodes, + solution_limit=self.config["max_motifs_per_node"], ) sub_spaces = [(s | current_space) for s in partial_sub_spaces] else: @@ -928,6 +1357,18 @@ def _expand_one_node(self, node_id: int): problem="max", ensure_subspace=current_space, optimize_source_variables=source_nodes, + solution_limit=self.config["max_motifs_per_node"], + ) + + # Release the Petri net once the sub_spaces are computed. + # It might be needed later for attractor computation, but it + # uses a lot of memory in large diagrams to keep all the nets + # in memory. + node["percolated_petri_net"] = None + + if len(sub_spaces) == self.config["max_motifs_per_node"]: + raise RuntimeError( + f"Exceeded the maximum amount of stable motifs per node ({self.config['max_motifs_per_node']}; see `SuccessionDiagramConfiguration.max_motifs_per_node`)." 
) # Sort the spaces based on a unique key in case trappist is not always @@ -937,21 +1378,22 @@ def _expand_one_node(self, node_id: int): ) if len(sub_spaces) == 0: - if DEBUG: + if self.config["debug"]: print(f"[{node_id}] Found minimum trap space: {current_space}.") + node["expanded"] = True return - if DEBUG: + if self.config["debug"]: print(f"[{node_id}] Found sub-spaces: {len(sub_spaces)}") for sub_space in sub_spaces: child_id = self._ensure_node(node_id, sub_space) - if DEBUG: + if self.config["debug"]: print(f"[{node_id}] Created edge into node {child_id}.") - # The node is now fully expanded, hence we won't need the restricted PN anymore. - self._clear_node_petri_net(node_id) + # If everything else worked out, we can mark the node as expanded. + node["expanded"] = True def _ensure_node(self, parent_id: int | None, stable_motif: BooleanSpace) -> int: """ @@ -980,7 +1422,12 @@ def _ensure_node(self, parent_id: int | None, stable_motif: BooleanSpace) -> int space=fixed_vars, depth=0, expanded=False, - attractors=None, + percolated_network=None, + percolated_petri_net=None, + percolated_nfvs=None, + attractor_candidates=None, + attractor_seeds=None, + attractor_sets=None, ) self.node_indices[key] = child_id else: @@ -995,6 +1442,11 @@ def _ensure_node(self, parent_id: int | None, stable_motif: BooleanSpace) -> int self.dag.add_edge(parent_id, child_id, motif=stable_motif) # type: ignore self._update_node_depth(child_id, parent_id) - self._update_node_petri_net(child_id, parent_id) + # Compute the percolated petri net here, because we know the parent node ID + # and using its already percolated petri net helps a lot. 
+ # + # (For percolated network and nfvs, knowing the parent ID is not as useful, + # so the parent is not used and we can just compute them lazily) + self.node_percolated_petri_net(child_id, compute=True, parent_id=parent_id) return child_id diff --git a/biobalm/symbolic_utils.py b/biobalm/symbolic_utils.py index 57174694..0ae18523 100644 --- a/biobalm/symbolic_utils.py +++ b/biobalm/symbolic_utils.py @@ -4,14 +4,12 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal, cast if TYPE_CHECKING: - from typing import Literal - from biodivine_aeon import Bdd, SymbolicContext -from biodivine_aeon import BddVariableSet +from biodivine_aeon import BddVariableSet, BddValuation from biobalm.types import BooleanSpace @@ -128,3 +126,29 @@ def function_is_true(f: Bdd, state: BooleanSpace) -> bool: `True` if the function evaluates to `1` in the given state. """ return function_eval(f, state) == 1 + + +def valuation_to_state(ctx: SymbolicContext, valuation: BddValuation) -> BooleanSpace: + """ + Extract network state from a `BddValuation` into a `BooleanSpace`. + + Parameters + ---------- + ctx : SymbolicContext + A context which maps between network variables and their symbolic counterparts. + valuation : BddValuation + A valuation of the network's symbolic encoding. + + Returns + ------- + BooleanSpace + A `BooleanSpace` encoding the state data from the given valuation.
+ """ + + result: BooleanSpace = {} + for var, val in valuation.items(): + n_var = ctx.find_network_variable(var) + if n_var is not None: + result[ctx.get_network_variable_name(n_var)] = cast(Literal[0, 1], int(val)) + + return result diff --git a/biobalm/types.py b/biobalm/types.py index d4eb95e7..01eb09cd 100644 --- a/biobalm/types.py +++ b/biobalm/types.py @@ -3,6 +3,7 @@ from typing import Literal, TypeAlias, TypedDict import networkx as nx # type: ignore +import biodivine_aeon as ba BooleanSpace: TypeAlias = dict[str, Literal[0, 1]] """Type alias for `dict[str, Literal[0, 1]]`. Represents a Boolean subspace, which is defined by a set of fixed node values.""" @@ -44,6 +45,11 @@ class SuccessionDiagramState(TypedDict): diagram (see :func:`biobalm.space_utils.space_unique_key`). """ + config: SuccessionDiagramConfiguration + """ + "Global" configuration of a succession diagram. + """ + class NodeData(TypedDict): """ @@ -61,17 +67,6 @@ class NodeData(TypedDict): the longest is used. """ - attractors: list[BooleanSpace] | None - """ - Attractor seed states for the node. If `None`, these have not been - computed, and the node may or may not have associated attractors. - """ - - petri_net: nx.DiGraph | None - """ - The Petri net representation of the node. If `None`, this has not been - computed yet. - """ space: BooleanSpace """ The sub-space that the node represents (this subspace will always be a @@ -87,3 +82,149 @@ class NodeData(TypedDict): If `expanded=True`, the node must have all its successors computed and included in the succession diagram. """ + + percolated_network: ba.BooleanNetwork | None + """ + The AEON `BooleanNetwork` that has variables fixed and percolated + according to the node `space`. Constant variables are then fully + eliminated. + + If `None`, this has not been computed yet (`SuccessionDiagram.network` can be + often used instead). 
+ """ + + percolated_petri_net: nx.DiGraph | None + """ + The Petri net representation of the network rules percolated to the + node's sub-space (i.e. a Petri net encoding of the `percolated_network`). + Constant variables are fully eliminated. + + If `None`, this has not been computed yet (`SuccessionDiagram.petri_net` can be + often used instead). + """ + + percolated_nfvs: list[str] | None + """ + An NFVS of the `percolated_network`. + + Note that this contains no variables that are fixed by the node `space`. + Also, there is no guarantee that this is a subset of `SuccessionDiagram.nfvs`. + + If `None`, this has not been computed yet (`SuccessionDiagram.nfvs` can be + often used instead). + """ + + attractor_candidates: list[BooleanSpace] | None + """ + List of states that cover all network attractors in the node's sub-space + (excluding child nodes, if known). + + Note that each complex/cyclic attractor can be covered by more than + one state, but each attractor has to be covered by at least one state. Furthermore, + outside of minimal trap spaces, some candidate states can cover zero attractors. + That is, even if the candidate set is not empty, it does not guarantee that + the non-minimal sub-space contains attractors. + + If `None`, these have not been computed, and the node may or may not + have associated attractors. + """ + + attractor_seeds: list[BooleanSpace] | None + """ + List of states that one-to-one correspond to network attractors in the node's + sub-space (excluding child nodes, if known). + + This is very similar to `attractor_candidates`, but here, it is guaranteed + that each attractor is represented by exactly one state. + + If `None`, these have not been computed, and the node may or may not + have associated attractors. + """ + + attractor_sets: list[ba.VertexSet] | None + """ + List of attractors that are present in the node's sub-space (excluding + child nodes, if known). 
+ + Each attractor is represented symbolically using `biodivine_aeon.VertexSet`. + + Note that for fixed-point attractors, the attractor set is effectively + equivalent to the attractor seed. However, for complex attractors, this + set contains *all* the attractor states as opposed to just one. Hence it + is harder to compute, but can be used to analyze things like the average + value of each variable in the attractor states, or the presence of + particular oscillation patterns. + + If `None`, these have not been computed, and the node may or may not + have associated attractors. + """ + + +class SuccessionDiagramConfiguration(TypedDict): + """ + Describes the configuration options of a `SuccessionDiagram`. + + Use :meth:`SuccessionDiagram.default_config` to create a + configuration dictionary pre-populated with default values. + """ + + debug: bool + """ + If `True`, the `SuccessionDiagram` will print messages + describing the progress of the running operations. + + [Default: False] + """ + + max_motifs_per_node: int + """ + Limit on the number of stable motifs explored for one node of a succession + diagram. If this limit is exceeded during node expansion, a `RuntimeError` + is raised and the node remains unexpanded. + + This limit is in place mainly to avoid surprising out of memory errors, + because currently there is no logging mechanism that would report the + number of stable motifs gradually. + + [Default: 100_000] + """ + + nfvs_size_threshold: int + """ + For networks larger than this threshold, we only run FVS detection + instead of NFVS detection. This is still correct, but can produce + a larger node set. + + + There is a trade-off between the speed gains from a smaller node set + to consider and the cost of determining which FVS nodes only + intersect negative cycles to find an NFVS subset. Typically, + for smaller networks, the trade-off is in favor of + computing a smaller NFVS.
+ """ + + pint_goal_size_limit: int + """ + Pint is called using command line and can only accept a limited number of arguments. + This limit is applied when constructing goals to avoid reaching this argument limit. + + The limit currently applies to the total number of literals that can be used to + represent a goal. + + The default value was empirically tested as safe on Debian linux, but other operating + systems may need a different limit to stay safe. Nevertheless, this should not be + an issue on smaller/simpler networks. + """ + + attractor_candidates_limit: int + """ + If more than `attractor_candidates_limit` states are produced during the + attractor detection process, then the process fails with a `RuntimeError`. + This is mainly to avoid out-of-memory errors or crashing `clingo`. + """ + + retained_set_optimization_threshold: int + """ + If there are more than this amount of attractor candidates, the attractor + detection process will try to optimize the retained set using ASP (if enabled). + """ diff --git a/pyproject.toml b/pyproject.toml index a2938825..71aed3cb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,17 +6,20 @@ readme = "README.md" license = { file = "LISENSE" } requires-python = ">=3.11" dependencies = [ - 'biodivine_aeon ==1.0.0a4', + 'biodivine_aeon ==1.0.0a6', 'clingo >=5.6.2', - 'networkx>=2.8.8', - 'pypint>=1.6.2', + 'networkx >=2.8.8', ] -version = "0.1.1" +version = "0.2.0" +# Pint can be used for static analysis, but is not mandatory. 
+[project.optional-dependencies] +pint = [ + "pypint==1.6.2" +] [tool.setuptools] -packages = ['biobalm', 'biobalm._sd_algorithms'] - +packages = ['biobalm', 'biobalm._sd_algorithms', 'biobalm._sd_attractors'] [tool.pyright] include = ["biobalm", "tests"] diff --git a/requirements.txt b/requirements.txt index 1bf8f37b..2730581d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,4 @@ -biodivine_aeon==1.0.0a4 +biodivine_aeon==1.0.0a6 clingo==5.6.2 networkx==2.8.8 -pypint==1.6.2 -# This will be eventually required by pandas (which is required by pint), -# but for now it prevents pandas from printing warnings all over the place. -pyarrow==15.0.0 \ No newline at end of file +pypint[pint]==1.6.2 \ No newline at end of file diff --git a/tests/motif_avoidant_test.py b/tests/motif_avoidant_test.py.old similarity index 100% rename from tests/motif_avoidant_test.py rename to tests/motif_avoidant_test.py.old