# Source code for boolforge.boolean_network

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
r"""
This module defines the :class:`~boolforge.BooleanNetwork` class, which provides
a high-level framework for modeling, simulating, and analyzing Boolean networks.

A :class:`BooleanNetwork` represents a discrete dynamical system
:math:`F = (f_1, \ldots, f_N)` composed of multiple
:class:`~boolforge.BooleanFunction` objects as update rules. The class includes
methods for constructing state transition graphs, identifying attractors,
computing robustness and sensitivity measures, and exporting truth tables.

Several computational routines—particularly those involving state space
exploration, attractor detection, and robustness estimation—offer optional
Numba-based just-in-time (JIT) acceleration. Installing Numba is recommended
for optimal performance but not required; all features remain functional
without it.

This module serves as the central interface for dynamic Boolean network
analysis within the BoolForge package.

Example
-------
>>> from boolforge import BooleanNetwork
>>> bn = BooleanNetwork(F=[[0, 1], [0, 0, 0, 1], [0, 1]], I=[[1], [0, 2], [1]])
>>> bn.get_attractors_synchronous_exact()
"""

import math
import warnings

from collections import defaultdict
from collections.abc import Sequence
from collections import deque
from copy import deepcopy
import numpy as np
import networkx as nx
import pandas as pd

try:
    import boolforge.utils as utils
    from boolforge.boolean_function import BooleanFunction
    from boolforge.wiring_diagram import WiringDiagram
except ModuleNotFoundError:
    import utils as utils
    from boolean_function import BooleanFunction
    from wiring_diagram import WiringDiagram
    
try:
    import cana.boolean_network
    __LOADED_CANA__ = True
except ModuleNotFoundError:
    __LOADED_CANA__ = False

try:
    from numba import njit, int64
    from numba.typed import List
    __LOADED_NUMBA__ = True
except ModuleNotFoundError:
    __LOADED_NUMBA__ = False

# Public API of this module.
__all__ = [
    "dict_weights",
    "get_entropy_of_basin_size_distribution",
    "BooleanNetwork"
]

# Numeric encoding of the regulation-type labels returned by
# BooleanFunction.get_type_of_inputs(); np.nan marks a non-essential input
# (used by simplify_functions to drop such regulators).
dict_weights = {'non-essential' : np.nan, 'conditional' : 0, 'positive' : 1, 'negative' : -1}

[docs] def get_entropy_of_basin_size_distribution( basin_sizes: Sequence[int] | np.ndarray ) -> float: """ Compute the Shannon entropy of a basin size distribution. The basin sizes are first normalized to form a probability distribution. The Shannon entropy is then computed as ``H = -sum(p_i * log(p_i))``, where ``p_i`` is the proportion of states in basin ``i``. Parameters ---------- basin_sizes : Sequence[int] or np.ndarray Sizes of the basins of attraction, where each entry corresponds to the number of initial conditions that converge to a given attractor. Returns ------- float Shannon entropy of the basin size distribution. """ total = sum(basin_sizes) probabilities = [size * 1.0 / total for size in basin_sizes] return sum([-np.log(p) * p for p in probabilities])
if __LOADED_NUMBA__:

    @njit(fastmath=True)  # state updates are integer-only, so fastmath cannot change results
    def _update_network_synchronously_numba(
        x,
        F_array_list,
        I_array_list,
    ):
        """
        Perform one synchronous update of a Boolean network.

        Parameters
        ----------
        x : np.ndarray
            Binary state vector of shape ``(N,)`` with dtype ``uint8``.
        F_array_list : list[np.ndarray]
            Truth tables; the ``j``-th entry has length ``2**k_j`` and encodes
            the update rule of node ``j`` with ``k_j`` regulators.
        I_array_list : list[np.ndarray]
            Regulator index arrays; the ``j``-th entry lists the regulators of
            node ``j``.

        Returns
        -------
        np.ndarray
            Updated binary state vector of shape ``(N,)`` with dtype ``uint8``.
        """
        N = x.shape[0]
        fx = np.empty(N, dtype=np.uint8)
        for j in range(N):
            regulators = I_array_list[j]
            if regulators.shape[0] == 0:
                fx[j] = F_array_list[j][0]
            else:
                # Build the truth-table row index from regulator values (MSB first).
                idx = 0
                for k in range(regulators.shape[0]):
                    idx = (idx << 1) | x[regulators[k]]
                fx[j] = F_array_list[j][idx]
        return fx

    @njit
    def _compute_synchronous_stg_numba(
        F_array_list,
        I_array_list,
        N_variables
    ):
        """
        Compute the synchronous state transition graph (STG).

        For every possible binary state of the network, compute the index of
        its successor state under synchronous updating.

        Parameters
        ----------
        F_array_list : list[np.ndarray]
            Truth tables; the ``j``-th entry has length ``2**k_j``.
        I_array_list : list[np.ndarray]
            Regulator index arrays for each node.
        N_variables : int
            Number of variables (nodes) in the network.

        Returns
        -------
        np.ndarray
            One-dimensional array of length ``2**N_variables`` containing, for
            each state index, the index of its synchronous successor.
        """
        nstates = 2 ** N_variables
        states = np.zeros((nstates, N_variables), dtype=np.uint8)
        for i in range(nstates):
            # binary representation of i (most-significant bit first)
            for j in range(N_variables):
                states[i, N_variables - 1 - j] = (i >> j) & 1
        next_states = np.zeros_like(states)
        powers_of_two = 2 ** np.arange(N_variables - 1, -1, -1)
        # Compute next state for each node
        for j in range(N_variables):
            regulators = I_array_list[j]
            if len(regulators) == 0:
                # constant node
                next_states[:, j] = F_array_list[j][0]
                continue
            n_reg = len(regulators)
            reg_powers = 2 ** np.arange(n_reg - 1, -1, -1)
            for s in range(nstates):
                idx = 0
                for k in range(n_reg):
                    idx += states[s, regulators[k]] * reg_powers[k]
                next_states[s, j] = F_array_list[j][idx]
        # Convert each next state to integer index
        next_indices = np.zeros(nstates, dtype=np.int64)  # NOTE: must be signed for safe indexing inside Numba kernels
        for s in range(nstates):
            val = 0
            for j in range(N_variables):
                val += next_states[s, j] * powers_of_two[j]
            next_indices[s] = val
        return next_indices

    @njit
    def _compute_synchronous_stg_numba_low_memory(
        F_array_list,
        I_array_list,
        N_variables
    ):
        """
        Compute the synchronous STG using minimal memory.

        For each integer state index ``i`` in ``[0, 2**N_variables)``, decode
        ``i`` into its binary state vector, apply the synchronous update, and
        re-encode the result as an integer index.

        Parameters
        ----------
        F_array_list : list[np.ndarray]
            Truth tables; the ``j``-th entry has length ``2**k_j``.
        I_array_list : list[np.ndarray]
            Regulator index arrays for each node.
        N_variables : int
            Number of variables (nodes) in the network.

        Returns
        -------
        np.ndarray
            One-dimensional array of length ``2**N_variables`` with the
            synchronous successor index of each state.

        Notes
        -----
        Avoids storing the full state matrix, reducing memory usage from
        ``O(N * 2**N)`` to ``O(N + 2**N)``; time remains exponential in
        ``N_variables``.
        """
        nstates = 2 ** N_variables
        next_indices = np.zeros(nstates, dtype=np.int64)  # NOTE: must be signed for safe indexing inside Numba kernels
        powers_of_two = 2 ** np.arange(N_variables - 1, -1, -1)
        state = np.zeros(N_variables, dtype=np.uint8)
        next_state = np.zeros(N_variables, dtype=np.uint8)
        for i in range(nstates):
            # --- Decode i into binary vector (most-significant bit first)
            tmp = i
            for j in range(N_variables):
                state[N_variables - 1 - j] = tmp & 1
                tmp >>= 1
            # --- Compute next-state values
            for j in range(N_variables):
                regulators = I_array_list[j]
                if regulators.shape[0] == 0:
                    next_state[j] = F_array_list[j][0]
                else:
                    n_reg = regulators.shape[0]
                    idx = 0
                    for k in range(n_reg):
                        idx = (idx << 1) | state[regulators[k]]
                    next_state[j] = F_array_list[j][idx]
            # --- Encode next_state back to integer
            val = 0
            for j in range(N_variables):
                val += next_state[j] * powers_of_two[j]
            next_indices[i] = val
        return next_indices

    @njit(fastmath=True)  # safe: operations are integer-only
    def _hamming_distance(a, b):
        """
        Compute the Hamming distance between two binary vectors.

        Parameters
        ----------
        a, b : np.ndarray
            One-dimensional ``uint8`` arrays of identical shape.

        Returns
        -------
        int
            Number of positions at which ``a`` and ``b`` differ.
        """
        dist = 0
        for i in range(a.size):
            dist += a[i] != b[i]
        return dist

    @njit(fastmath=True)  # integer state updates; only the final accumulation is float
    def _derrida_simulation(
        F_array_list,
        I_array_list,
        N,
        nsim,
        seed
    ):
        """
        Monte Carlo estimate of the Derrida value.

        Repeatedly sample a random state, flip one random bit, update both
        states synchronously, and average the Hamming distance between the
        two successor states.

        Parameters
        ----------
        F_array_list : list[np.ndarray]
            Truth tables for each node.
        I_array_list : list[np.ndarray]
            Regulator index arrays for each node.
        N : int
            Number of variables (nodes) in the network.
        nsim : int
            Number of Monte Carlo samples.
        seed : int
            Seed for the Numba-compatible random number generator.

        Returns
        -------
        float
            Expected Hamming distance after one synchronous update following
            a random single-bit perturbation.
        """
        # Numba RNG: seed once
        np.random.seed(seed)
        total_dist = 0.0
        X = np.empty(N, dtype=np.uint8)
        Y = np.empty(N, dtype=np.uint8)
        for _ in range(nsim):
            # Random initial state
            for i in range(N):
                X[i] = np.random.randint(0, 2)
            Y[:] = X
            # Flip one random bit
            idx = np.random.randint(0, N)
            Y[idx] = 1 - Y[idx]
            # Synchronous updates
            FX = _update_network_synchronously_numba(X, F_array_list, I_array_list)
            FY = _update_network_synchronously_numba(Y, F_array_list, I_array_list)
            total_dist += _hamming_distance(FX, FY)
        return total_dist / nsim

    @njit(cache=True)
    def _attractors_functional_graph(next_state):
        """
        Identify attractors and basins in a functional graph.

        Parameters
        ----------
        next_state : np.ndarray
            One-dimensional integer array of length ``n`` such that
            ``next_state[x]`` is the successor of state ``x`` in ``[0, n-1]``.

        Returns
        -------
        attr_id : np.ndarray
            Length-``n`` array mapping each state to its attractor index.
        basin_sizes : np.ndarray
            Length-``n_attr`` array of basin sizes.
        cycle_rep : np.ndarray
            Length-``n_attr`` array with one representative state per cycle.
        cycle_len : np.ndarray
            Length-``n_attr`` array of cycle lengths.
        n_attr : np.int32
            Number of attractors.
        """
        n = next_state.shape[0]
        attr_id = np.full(n, -1, dtype=np.int32)
        # For detecting cycles within the current walk:
        #   seen[u] == run_id  means u was visited in this run
        #   pos[u]   = index of u in the current path (when first visited this run)
        seen = np.zeros(n, dtype=np.int32)
        pos = np.zeros(n, dtype=np.int32)
        # Upper bounds: in the worst case every node could be its own 1-cycle
        basin_sizes_full = np.zeros(n, dtype=np.int32)
        cycle_rep_full = np.empty(n, dtype=np.int64)
        cycle_len_full = np.zeros(n, dtype=np.int32)
        n_attr = 0
        # Numba typed list for the current path
        path = List.empty_list(int64)
        for start in range(n):
            if attr_id[start] != -1:
                continue
            path.clear()
            u = start
            run_id = start + 1  # unique per start; safe while n << 2**31 (always true in practice)
            # Walk until we hit a known attractor or revisit a node in this run
            while attr_id[u] == -1 and seen[u] != run_id:
                seen[u] = run_id
                pos[u] = len(path)
                path.append(u)
                u = next_state[u]
            if attr_id[u] != -1:
                # This path flows into an already-known attractor
                aid = attr_id[u]
                for i in range(len(path)):
                    v = path[i]
                    attr_id[v] = aid
                    basin_sizes_full[aid] += 1
            else:
                # We found a cycle within the current run.
                # u is the first repeated node; cycle starts at pos[u] in path
                cyc_start = pos[u]
                aid = n_attr
                n_attr += 1
                # Representative and length of the cycle
                cycle_rep_full[aid] = u
                cycle_len_full[aid] = len(path) - cyc_start
                # Assign all nodes on the path to this new attractor
                for i in range(len(path)):
                    v = path[i]
                    attr_id[v] = aid
                    basin_sizes_full[aid] += 1
        return attr_id, basin_sizes_full[:n_attr], cycle_rep_full[:n_attr], cycle_len_full[:n_attr], np.int32(n_attr)

    @njit(cache=True)
    def _transient_lengths_functional_numba(
        succ,
        is_attr_mask
    ):
        """
        Compute the exact transient length (distance to attractor) per state.

        Parameters
        ----------
        succ : np.ndarray
            int64 array of shape ``(n_states,)``; ``succ[x]`` is the successor
            of state ``x``.
        is_attr_mask : np.ndarray
            uint8/bool array of shape ``(n_states,)``; 1 if the state lies on
            an attractor cycle, else 0.

        Returns
        -------
        np.ndarray
            int64 array ``dist`` with ``dist[x]`` = number of steps from ``x``
            to its attractor (satisfies ``dist[x] == dist[succ[x]] + 1`` off
            the attractor).
        """
        n = succ.shape[0]
        dist = np.full(n, -1, dtype=np.int64)

        # Attractor states have distance 0
        for i in range(n):
            if is_attr_mask[i]:
                dist[i] = 0

        for i in range(n):
            if dist[i] >= 0:
                continue
            # Walk forward, marking the unexplored segment, until a state
            # with a known distance is reached.
            v = i
            while dist[v] == -1:
                dist[v] = -2  # temporary marker: "in current path"
                v = succ[v]
            # dist[v] is now known (>= 0); cycles were pre-marked, so the
            # walk cannot terminate on a -2 state.
            base = dist[v]

            # Measure the length of the marked segment.
            seg_len = 0
            v = i
            while dist[v] == -2:
                seg_len += 1
                v = succ[v]

            # BUG FIX: distances must DECREASE along the walk toward the
            # attractor. The previous version assigned increasing values
            # starting at the segment head (dist[i] = base + 1), inverting
            # all distances within each segment.
            v = i
            remaining = seg_len
            while dist[v] == -2:
                dist[v] = base + remaining
                remaining -= 1
                v = succ[v]
        return dist

    @njit(cache=True)
    def _robustness_edge_traversal_numba(
        N,
        attractor_idx,
        is_attr_mask,
        dist_attr
    ):
        """
        Traverse hypercube edges to compute basin/attractor robustness measures.

        Iterates over every undirected edge of the Boolean hypercube exactly
        once and accumulates coherence and fragility contributions for basins
        of attraction and for attractor states.

        Parameters
        ----------
        N : int
            Number of variables (dimension of the hypercube).
        attractor_idx : np.ndarray
            Shape ``(2**N,)``; maps each state to its attractor index.
        is_attr_mask : np.ndarray
            Shape ``(2**N,)``; whether a state lies on an attractor.
        dist_attr : np.ndarray
            Shape ``(n_attr, n_attr)``; pairwise distances between attractors.

        Returns
        -------
        tuple of np.ndarray
            ``(basin_coh, basin_frag, attr_coh, attr_frag)``, each of length
            ``n_attr``.
        """
        n_states = attractor_idx.shape[0]
        n_attr = dist_attr.shape[0]
        basin_coh = np.zeros(n_attr, dtype=np.float64)
        basin_frag = np.zeros(n_attr, dtype=np.float64)
        attr_coh = np.zeros(n_attr, dtype=np.float64)
        attr_frag = np.zeros(n_attr, dtype=np.float64)
        # Iterate each undirected hypercube edge exactly once:
        # For x, flip only bits that are 0 -> y = x | (1<<bit), which guarantees y > x.
        for xdec in range(n_states):
            idx_x = attractor_idx[xdec]
            # (Should never be -1 if attractor_idx is filled for all states)
            for bit in range(N):
                if (xdec >> bit) & 1:
                    continue
                ydec = xdec | (1 << bit)
                idx_y = attractor_idx[ydec]
                if idx_x == idx_y:
                    # Same basin: the undirected edge is coherent in both
                    # directions, hence the +2 contribution.
                    basin_coh[idx_x] += 2.0
                    if is_attr_mask[xdec]:
                        attr_coh[idx_x] += 1.0
                    if is_attr_mask[ydec]:
                        attr_coh[idx_y] += 1.0
                else:
                    dxy = dist_attr[idx_x, idx_y]
                    basin_frag[idx_x] += dxy
                    basin_frag[idx_y] += dxy
                    if is_attr_mask[xdec]:
                        attr_frag[idx_x] += dxy
                    if is_attr_mask[ydec]:
                        attr_frag[idx_y] += dxy
        return basin_coh, basin_frag, attr_coh, attr_frag

    @njit(cache=True)
    def _robustness_edge_traversal_numba_stratified(
        N,
        attractor_idx,
        is_attr_mask,
        dist_attr,
        dist_state,
        max_dist
    ):
        """
        Stratified variant of ``_robustness_edge_traversal_numba``.

        In addition to the aggregate accumulators, edge contributions are
        stratified by each endpoint's transient distance to its attractor
        (``dist_state``), for distances up to ``max_dist``.

        Returns
        -------
        tuple
            ``(basin_coh, basin_frag, attr_coh, attr_frag, strat_coh,
            strat_cnt)`` where ``strat_coh[a, d]`` accumulates coherent-edge
            counts and ``strat_cnt[a, d]`` counts all edge endpoints for
            attractor ``a`` at distance ``d``.
        """
        n_states = attractor_idx.shape[0]
        n_attr = dist_attr.shape[0]
        basin_coh = np.zeros(n_attr, dtype=np.float64)
        basin_frag = np.zeros(n_attr, dtype=np.float64)
        attr_coh = np.zeros(n_attr, dtype=np.float64)
        attr_frag = np.zeros(n_attr, dtype=np.float64)
        strat_coh = np.zeros((n_attr, max_dist + 1), dtype=np.float64)
        strat_cnt = np.zeros((n_attr, max_dist + 1), dtype=np.int64)
        for xdec in range(n_states):
            dx = dist_state[xdec]
            idx_x = attractor_idx[xdec]
            for bit in range(N):
                if (xdec >> bit) & 1:
                    continue
                ydec = xdec | (1 << bit)
                dy = dist_state[ydec]
                idx_y = attractor_idx[ydec]
                strat_cnt[idx_x, dx] += 1
                strat_cnt[idx_y, dy] += 1
                if idx_x == idx_y:
                    basin_coh[idx_x] += 2.0
                    if is_attr_mask[xdec]:
                        attr_coh[idx_x] += 1.0
                    if is_attr_mask[ydec]:
                        attr_coh[idx_y] += 1.0
                    strat_coh[idx_x, dx] += 1.0
                    strat_coh[idx_y, dy] += 1.0
                else:
                    dxy = dist_attr[idx_x, idx_y]
                    basin_frag[idx_x] += dxy
                    basin_frag[idx_y] += dxy
                    if is_attr_mask[xdec]:
                        attr_frag[idx_x] += dxy
                    if is_attr_mask[ydec]:
                        attr_frag[idx_y] += dxy
        return (
            basin_coh,
            basin_frag,
            attr_coh,
            attr_frag,
            strat_coh,
            strat_cnt,
        )
class BooleanNetwork(WiringDiagram):
    """
    Representation of a Boolean network.

    A Boolean network consists of a wiring diagram specifying regulatory
    interactions between nodes and a collection of Boolean update functions
    defining the dynamics at each node. In a BooleanNetwork, constant nodes
    are removed during initialization, so all nodes represent dynamic
    variables.

    Parameters
    ----------
    F : sequence
        Sequence of Boolean update functions or truth tables. Each entry may
        be a ``BooleanFunction`` instance or a truth table. The length of
        ``F`` must match the number of nodes in the wiring diagram.
    I : sequence of sequences of int or WiringDiagram
        Wiring diagram specifying the regulators of each node, or an
        existing ``WiringDiagram`` instance.
    variables : sequence of str, optional
        Names of the variables corresponding to each node. Ignored if ``I``
        is provided as a ``WiringDiagram``.
    SIMPLIFY_FUNCTIONS : bool, optional
        If True, simplify Boolean update functions after initialization.
        Default is False.

    Attributes
    ----------
    F : list[BooleanFunction]
        Boolean update functions for each node.
    I : list[np.ndarray[int]]
        Wiring diagram specifying the regulators of each node.
    variables : np.ndarray[str]
        Names of the variables corresponding to each node.
    N : int
        Number of dynamic (non-constant) nodes in the network.
    indegrees : np.ndarray[int]
        Indegree of each node.
    outdegrees : np.ndarray[int]
        Outdegree of each node.
    constants : dict[str, dict[str, int | list[str]]]
        Mapping of eliminated constant node names to their values and the
        nodes they regulated.
    weights : list[np.ndarray[float]] or None
        Interaction weights associated with the wiring diagram.
    STG : dict or None
        State transition graph, initialized to None and computed on demand.
    """

    def __init__(
        self,
        F: Sequence[BooleanFunction | list[int] | np.ndarray],
        I: Sequence[Sequence[int]] | WiringDiagram,
        variables: Sequence[str] | None = None,
        SIMPLIFY_FUNCTIONS: bool = False,
    ):
        """
        Initialize a Boolean network.

        Constant nodes (indegree zero) are automatically eliminated during
        initialization and recorded in the ``constants`` attribute; after
        initialization, ``N`` refers to the number of remaining dynamic
        nodes. The state transition graph (``STG``) is initialized to
        ``None`` and computed on demand.

        Parameters
        ----------
        F : sequence of BooleanFunction or array-like of int
            Boolean update functions for each node; each entry's arity must
            match the indegree of the corresponding node.
        I : sequence of sequences of int or WiringDiagram
            Wiring diagram (zero-based regulator indices) or an existing
            ``WiringDiagram`` instance.
        variables : sequence of str, optional
            Variable names; ignored if ``I`` is a ``WiringDiagram``.
        SIMPLIFY_FUNCTIONS : bool, optional
            If True, simplify the update functions after initialization.

        Raises
        ------
        TypeError
            If ``F`` or ``I`` is not of an accepted type.
        ValueError
            If ``len(F)`` does not match the number of nodes, or a function's
            arity is inconsistent with the wiring diagram.
        """
        # ---- Validate inputs -------------------------------------------------
        # Strings/bytes are Sequences too, so they are rejected explicitly.
        if isinstance(F, (str, bytes)) or not isinstance(F, Sequence):
            raise TypeError(
                "F must be a sequence of BooleanFunction objects or truth tables"
            )
        if isinstance(I, (str, bytes)) or not isinstance(I, (Sequence, WiringDiagram)):
            raise TypeError(
                "I must be a sequence of sequences of int or a WiringDiagram instance"
            )

        # ---- Initialize wiring diagram --------------------------------------
        if isinstance(I, WiringDiagram):
            if variables is not None:
                warnings.warn(
                    "Provided variables ignored; using variables from WiringDiagram.",
                    UserWarning,
                )
            super().__init__(I.I, I.variables)
        else:
            super().__init__(I, variables)

        if len(F) != self.N:
            raise ValueError("len(F) must match the number of nodes in the wiring diagram")

        # ---- Initialize Boolean functions -----------------------------------
        self.F = []
        for i, f in enumerate(F):
            if isinstance(f, (list, np.ndarray)):
                bf = BooleanFunction(f, name=self.variables[i])
            elif isinstance(f, BooleanFunction):
                bf = f
                bf.name = self.variables[i]
            else:
                raise TypeError(
                    f"Invalid entry in F at index {i}: expected BooleanFunction, "
                    f"truth table, got {type(f)}"
                )
            # Invariant: each function's arity equals the node's indegree.
            if bf.n != self.indegrees[i]:
                raise ValueError(
                    f"Index {i}: function has {bf.n} inputs but wiring diagram "
                    f"has indegree {self.indegrees[i]}"
                )
            self.F.append(bf)

        # ---- Constant bookkeeping -------------------------------------------
        # Always initialize (may already exist if called from
        # get_network_with_fixed_source_nodes, etc.)
        self.constants = {}

        # IMPORTANT: remove constants based on topology, not on dict contents
        if np.any(self.indegrees == 0):
            self.remove_constants()

        # ---- State transition graph -----------------------------------------
        self.STG = None

        # ---- Optional simplification ----------------------------------------
        if SIMPLIFY_FUNCTIONS:
            self.simplify_functions()
    def remove_constants(self) -> None:
        """
        Remove structurally constant nodes from the Boolean network.

        A node is considered constant if it has no regulators (indegree
        zero). Such nodes are eliminated from the dynamic network by
        propagating their fixed Boolean values to downstream nodes.
        Eliminated constants and their effects are recorded in the
        ``constants`` attribute.

        Notes
        -----
        - The Boolean value of a constant node is taken from its Boolean
          function.
        - After removal, ``self.N`` refers to the number of remaining
          dynamic nodes.
        - Nodes that lose all regulators as a result of constant removal are
          assigned a non-essential self-loop to preserve network structure.
        """
        # Identify constant nodes from topology
        indices_constants = self.get_constants(AS_DICT=False)
        if len(indices_constants) == 0:
            return
        dict_constants = self.get_constants(AS_DICT=True)
        # A constant node's value is its (single-entry) truth table output.
        values_constants = [int(self.F[c][0]) for c in indices_constants]

        # Propagate constant values downstream
        for id_constant, value in zip(indices_constants, values_constants):
            regulated_nodes = []
            for i in range(self.N):
                if dict_constants[i]:
                    continue
                try:
                    # Position of the constant among node i's regulators.
                    index = list(self.I[i]).index(id_constant)
                except ValueError:
                    continue  # node i is not regulated by this constant
                # Keep only truth-table rows consistent with the fixed value.
                truth_table = utils.get_left_side_of_truth_table(self.indegrees[i])
                indices_to_keep = np.where(truth_table[:, index] == value)[0]
                self.F[i].f = self.F[i].f[indices_to_keep]
                if self.weights is not None:
                    self.weights[i] = self.weights[i][self.I[i] != id_constant]
                self.I[i] = self.I[i][self.I[i] != id_constant]
                self.indegrees[i] -= 1
                self.F[i].n -= 1
                regulated_nodes.append(str(self.variables[i]))
            self.constants[str(self.variables[id_constant])] = {
                "value": value,
                "regulatedNodes": regulated_nodes,
            }

        # Ensure no remaining node loses all regulators: give it a
        # non-essential self-loop (function value independent of the input).
        for i in range(self.N):
            if dict_constants[i]:
                continue
            if self.indegrees[i] == 0:
                self.indegrees[i] = 1
                self.F[i].n = 1
                self.F[i].f = np.array([self.F[i][0], self.F[i][0]], dtype=int)
                self.I[i] = np.array([i], dtype=int)
                if self.weights is not None:
                    self.weights[i] = np.array([np.nan], dtype=float)

        # Remove constant nodes structurally (using original mask);
        # regulator indices are shifted down by the number of removed
        # constants that precede them.
        self.F = [self.F[i] for i in range(self.N) if not dict_constants[i]]
        adjustment_for_I = np.cumsum([dict_constants[i] for i in range(self.N)])
        self.I = [
            self.I[i] - adjustment_for_I[self.I[i]]
            for i in range(self.N)
            if not dict_constants[i]
        ]
        if self.weights is not None:
            self.weights = [self.weights[i] for i in range(self.N) if not dict_constants[i]]
        self.variables = np.array(
            [self.variables[i] for i in range(self.N) if not dict_constants[i]],
            dtype=str,
        )
        self.indegrees = np.array(
            [self.indegrees[i] for i in range(self.N) if not dict_constants[i]],
            dtype=int,
        )

        # Update network size and recompute outdegrees
        self.N -= len(indices_constants)
        self.outdegrees = self.get_outdegrees()
[docs] @classmethod def from_cana( cls, cana_BooleanNetwork: "cana.boolean_network.BooleanNetwork", ) -> "BooleanNetwork": """ Construct a BooleanNetwork from a ``cana.BooleanNetwork`` instance. This compatibility method converts a Boolean network defined using the ``cana`` package into a BoolForge ``BooleanNetwork``. Parameters ---------- cana_BooleanNetwork : cana.boolean_network.BooleanNetwork A Boolean network instance from the ``cana`` package. Returns ------- BooleanNetwork The corresponding BoolForge BooleanNetwork. Raises ------ TypeError If the input object does not appear to be a valid CANA BooleanNetwork. KeyError If required fields are missing from the CANA logic specification. """ try: logic = cana_BooleanNetwork.logic except AttributeError as e: raise TypeError( "Input must be a cana.boolean_network.BooleanNetwork instance." ) from e F = [] I = [] variables = [] # Ensure deterministic ordering by node index for idx in sorted(logic.keys()): entry = logic[idx] if "name" not in entry or "in" not in entry or "out" not in entry: raise KeyError( f"Logic entry for node {idx} must contain keys " "'name', 'in', and 'out'." ) variables.append(str(entry["name"])) I.append(list(entry["in"])) F.append(np.array(entry["out"], dtype=int)) return cls(F=F, I=I, variables=variables)
[docs] @classmethod def from_string( cls, network_string: str, separator: str | Sequence[str] = ",", max_degree: int = 24, original_not: str | Sequence[str] = "NOT", original_and: str | Sequence[str] = "AND", original_or: str | Sequence[str] = "OR", ALLOW_TRUNCATION: bool = False ) -> "BooleanNetwork": """ Construct a BooleanNetwork from a textual Boolean rule specification. This compatibility method parses a string representation of Boolean update rules and constructs a corresponding BooleanNetwork. The input format is intended for legacy or trusted sources and supports logical expressions using AND/OR/NOT operators. .. warning:: This method uses ``eval`` internally and MUST NOT be used on untrusted input. It is provided solely for backward compatibility. Parameters ---------- network_string : str String encoding Boolean update rules, one per line. separator : str or sequence of str, optional Separator(s) between variable names and Boolean expressions. max_degree : int, optional Maximum allowed indegree for explicit truth-table construction. original_not, original_and, original_or : str or sequence of str, optional Operator strings to be replaced by logical NOT, AND, OR. ALLOW_TRUNCATION : bool, optional If False (default), nodes with indegree greater than ``max_degree`` raise a ValueError. If True, such nodes are replaced by identity self-loops, allowing fast construction of large networks while ignoring high-degree functions. Returns ------- BooleanNetwork The constructed Boolean network. Raises ------ ValueError If parsing fails or if ``ALLOW_TRUNCATION`` is False and a node exceeds ``max_degree``. 
""" sepstr, andop, orop, notop = "@", "∧", "∨", "¬" get_dummy_var = lambda i: f"x{int(i)}y" # reformat network string def _replace_all(string, original, replacement): if isinstance(original, (list, np.ndarray)): for s in original: string = string.replace(s, f" {replacement} ") else: string = string.replace(original, f" {replacement} ") return string text = ( network_string.replace('\t', ' ',) .replace('(', ' ( ') .replace(')', ' ) ') ) text = _replace_all(text, separator, sepstr) text = _replace_all(text, original_not, notop) text = _replace_all(text, original_and, andop) text = _replace_all(text, original_or, orop) # Remove comments and empty lines lines = [ l for l in text.splitlines() if l.strip() and not l.strip().startswith("#") ] n = len(lines) # find variables and constants var = ["" for i in range(n)] for i in range(n): var[i] = lines[i].split(sepstr)[0].replace(' ', '') consts_and_vars = [] for line in lines: words = line.split(' ') for word in words: if word not in ['(', ')', sepstr, andop, orop, notop, ''] and not utils.is_float(word): consts_and_vars.append(word) consts = list(set(consts_and_vars)-set(var)) dict_var_const = dict(list(zip(var, [get_dummy_var(i) for i in range(len(var))]))) dict_var_const.update(dict(list(zip(consts, [get_dummy_var(i+len(var)) for i in range(len(consts))])))) # replace all variables and constants with dummy names for i, line in enumerate(lines): words = line.split(' ') for j, word in enumerate(words): if word not in ['(', ')', sepstr, andop, orop, notop, ''] and not utils.is_float(word): words[j] = dict_var_const[word] lines[i] = ' '.join(words) # update line to only be function expressions = [line.split(sepstr)[1] for line in lines] # generate wiring diagram I I: list[np.ndarray] = [] for i in range(n): try: idcs_open = utils.find_all_indices(expressions[i], 'x') idcs_end = utils.find_all_indices(expressions[i], 'y') regs = np.sort(np.array(list(map(int,list(set([expressions[i][(begin+1):end] for begin,end in 
zip(idcs_open,idcs_end)])))))) I.append(regs) except ValueError: I.append(np.array([], int)) # Build Boolean functions F: list[np.ndarray] = [] for i, expr in enumerate(expressions): deg = len(I[i]) if deg == 0: f = np.array([int(expr)], int) elif deg <= max_degree: tt = utils.get_left_side_of_truth_table(deg) env = { get_dummy_var(I[i][j]) : tt[:, j].astype(bool) for j in range(deg) } f = eval( expr.replace(andop, '&') .replace(orop, '|') .replace(notop, '~') .replace(' ', ''), {"__builtins__" : None}, env ) else: if not ALLOW_TRUNCATION: raise ValueError( f"Node '{var[i]}' has indegree {deg} > max_degree={max_degree}." ) # Truncate: identity self-loop F.append(np.array([0, 1], dtype=int)) I[i] = np.array([i], dtype=int) F.append(f.astype(int)) for j in range(len(consts)): F.append(np.array([0, 1], dtype=int)) I.append(np.array([len(var) + j])) return cls(F, I, var+consts)
[docs] @classmethod def from_DiGraph(cls, nx_DiGraph: "nx.DiGraph") -> "WiringDiagram": raise NotImplementedError( "from_DiGraph is not supported for BooleanNetwork. " "Use WiringDiagram.from_DiGraph and then construct " "a BooleanNetwork by providing Boolean update functions." )
[docs] def to_cana(self) -> "cana.boolean_network.BooleanNetwork": """ Export the Boolean network as a ``cana.BooleanNetwork`` instance. This compatibility method converts the current BooleanNetwork into an equivalent representation from the ``cana`` package. The exported network reflects the current state of the model, including any removed constants, simplifications, or identity self-loops. Returns ------- cana.boolean_network.BooleanNetwork A ``cana`` BooleanNetwork instance representing this network. Raises ------ ImportError If the ``cana`` package is not installed. """ try: import cana except ImportError as e: raise ImportError( "The 'cana' package is required for to_cana()." ) from e logic_dicts = [] for bf, regulators, var in zip(self.F, self.I, self.variables): logic_dicts.append( { "name": var, "in": list(regulators), "out": bf.f.tolist(), } ) return cana.boolean_network.BooleanNetwork( Nnodes=self.N, logic={i: d for i, d in enumerate(logic_dicts)}, )
[docs] def to_bnet( self, separator: str = ",\t", AS_POLYNOMIAL: bool = True, ) -> str: """ Export the Boolean network in BNET format. This compatibility method returns a string representation of the Boolean network in the BNET format used by tools such as BoolNet and PyBoolNet, with one line per variable of the form ``variable <separator> function``. Parameters ---------- separator : str, optional String used to separate the target variable from its update function. Default is `",\\t"`. AS_POLYNOMIAL : bool, optional If True (default), return Boolean functions in polynomial form. If False, return functions as logical expressions. Returns ------- str A string containing the BNET representation of the network. """ lines = [] constants_indices = self.get_constants(AS_DICT=True) for i in range(self.N): if constants_indices.get(i, False): function = str(self.F[i].f[0]) elif AS_POLYNOMIAL: function = utils.bool_to_poly( self.F[i], self.variables[self.I[i]].tolist(), ) else: function = self.F[i].to_expression(" & ", " | ") lines.append(f"{self.variables[i]}{separator}{function}") return "\n".join(lines)
[docs] def to_truth_table( self, filename: str | None = None, ) -> pd.DataFrame: """ Construct the full synchronous truth table of the Boolean network. Each row corresponds to a network state at time ``t`` and its deterministic successor at time ``t+1`` under synchronous updating. Parameters ---------- filename : str, optional If provided, the truth table is written to a file. The file extension determines the format and must be one of ``'csv'``, ``'xls'``, or ``'xlsx'``. If None (default), no file is created. Returns ------- pandas.DataFrame The full truth table with shape ``(2**N, 2*N)``. Notes ----- - States are enumerated in lexicographic order, consistent with ``utils.get_left_side_of_truth_table``. - This method computes and stores the synchronous state transition graph (``self.STG``) if it has not been computed previously. - Exporting to Excel requires the ``openpyxl`` package. """ columns = [name + "(t)" for name in self.variables] columns += [name + "(t+1)" for name in self.variables] if self.STG is None: self.compute_synchronous_state_transition_graph() data = np.zeros((2**self.N, 2*self.N), dtype=int) data[:, :self.N] = utils.get_left_side_of_truth_table(self.N) for i in range(2**self.N): data[i, self.N:] = utils.dec2bin(self.STG[i], self.N) truth_table = pd.DataFrame(data, columns=columns) if filename is not None: if not isinstance(filename, str): raise TypeError("filename must be a string") ending = filename.split(".")[-1] if ending not in {"csv", "xls", "xlsx"}: raise ValueError("filename must end in 'csv', 'xls', or 'xlsx'") if ending == "csv": truth_table.to_csv(filename, index=False) else: truth_table.to_excel(filename, index=False) return truth_table
def __len__(self):
    """Return the number of nodes ``N`` in the network."""
    return self.N
def __str__(self):
    """Return a short human-readable summary of the network."""
    indegree_list = self.indegrees.tolist()
    return f"BooleanNetwork(N={self.N}, indegrees={indegree_list})"
def __getitem__(self, index):
    """Return the Boolean update function of node ``index``."""
    return self.F[index]
def __repr__(self):
    """Return a concise developer-facing representation."""
    return f"{type(self).__name__}(N={self.N})"
def __call__(self, state):
    """
    Advance the network by one synchronous time step.

    Equivalent to ``update_network_synchronously(state)``: every node's
    update function is evaluated on the current values of its regulators.

    Parameters
    ----------
    state : sequence of int
        Binary vector of length ``N`` giving the current network state,
        ordered according to ``self.variables``.

    Returns
    -------
    np.ndarray
        Successor state under synchronous updating.
    """
    return self.update_network_synchronously(state)
def get_types_of_regulation(self) -> list[np.ndarray]:
    """
    Compute and return regulation types (weights) for all nodes.

    For every Boolean function, the type of each input regulation is
    obtained via ``BooleanFunction.get_type_of_inputs`` and translated
    into a numerical weight through ``dict_weights``. The result is
    stored in ``self.weights`` and returned.

    Returns
    -------
    list of np.ndarray
        Regulation weights per node, aligned with the wiring diagram.

    Notes
    -----
    - ``self.weights`` is rebuilt from scratch; any previously stored
      values are overwritten.
    """
    weights: list[np.ndarray] = []
    for bf in self.F:
        input_types = bf.get_type_of_inputs()
        weights.append(
            np.array([dict_weights[t] for t in input_types], dtype=float)
        )
    self.weights = weights
    return self.weights
## Transform Boolean networks
def simplify_functions(self) -> None:
    """
    Remove all non-essential regulators from the Boolean network.

    For each node, non-essential regulators (identified via ``np.nan``
    entries in ``self.weights``) are removed from the wiring diagram and
    the associated Boolean function is restricted to its essential
    inputs. Nodes that would otherwise lose all regulators are assigned
    an identity self-loop to preserve network structure.

    Notes
    -----
    - This method modifies the network in place.
    - Regulation types (``self.weights``) are recomputed if necessary.
    - Identity self-loops introduced here are structural artifacts and
      do not represent genuine regulatory interactions.
    """
    # Ensure regulation types / weights are available
    self.get_types_of_regulation()

    for i in range(self.N):
        regulator_is_non_essential = np.isnan(self.weights[i])

        # All regulators are essential
        if not np.any(regulator_is_non_essential):
            continue

        non_essential_variables = np.where(regulator_is_non_essential)[0]
        essential_variables = np.where(~regulator_is_non_essential)[0]

        # Update outdegrees (each regulator appears at most once in I[i]).
        # Bug fix: non_essential_variables are *positions* within I[i],
        # not node indices. outdegrees is indexed by node, so the
        # outdegree of the regulator nodes self.I[i][...] must be
        # decremented (previously positions 0..n-1 were decremented).
        self.outdegrees[self.I[i][non_essential_variables]] -= 1

        # No essential regulators: introduce identity self-loop
        if len(essential_variables) == 0:
            self.indegrees[i] = 1
            self.F[i].f = np.array([self.F[i][0], self.F[i][0]], dtype=int)
            self.F[i].n = 1
            self.F[i].variables = np.array([self.variables[i]], dtype=str)
            self.I[i] = np.array([i], dtype=int)
            self.weights[i] = np.array([np.nan], dtype=float)
            self.outdegrees[i] += 1  # keep sum(outdegrees) == sum(indegrees)
            continue

        # Restrict truth table to essential inputs: keep exactly the rows
        # where every non-essential input is 0; the remaining rows
        # enumerate the essential inputs in lexicographic order.
        left_side = utils.get_left_side_of_truth_table(self.indegrees[i])
        mask = np.sum(left_side[:, non_essential_variables], axis=1) == 0
        self.F[i].f = self.F[i][mask]
        self.F[i].n = len(essential_variables)
        self.F[i].variables = self.F[i].variables[~regulator_is_non_essential]
        self.I[i] = self.I[i][essential_variables]
        self.weights[i] = self.weights[i][essential_variables]
        self.indegrees[i] = len(essential_variables)
[docs] def get_identity_nodes( self, AS_DICT: bool = False ) -> dict[int, bool] | np.ndarray: """ Identify identity (memory) nodes in the Boolean network. An identity node is a node with a single self-regulatory edge whose Boolean update function is the identity function ``f(x) = x``. Such nodes retain their state over time unless externally modified. Parameters ---------- AS_DICT : bool, optional If True, return a dictionary mapping node indices to booleans. If False (default), return an array of indices of identity nodes. Returns ------- dict[int, bool] or np.ndarray If ``AS_DICT`` is True, a dictionary indicating which nodes are identity nodes. If ``AS_DICT`` is False, an array of indices of identity nodes. """ is_identity = np.array( [ self.indegrees[i] == 1 and self.I[i][0] == i and self.F[i][0] == 0 and self.F[i][1] == 1 for i in range(self.N) ], dtype=bool, ) if AS_DICT: return dict(enumerate(is_identity.tolist())) return np.where(is_identity)[0]
def get_network_with_fixed_identity_nodes(
    self,
    values_identity_nodes: Sequence[int],
) -> "BooleanNetwork":
    """
    Construct a Boolean network with identity nodes fixed to given values.

    Identity nodes (single self-loop with rule ``f(x) = x``) are clamped
    to the supplied Boolean values, and a new network is returned in
    which those nodes appear as constants with no regulators.

    Parameters
    ----------
    values_identity_nodes : sequence of int
        One value (0 or 1) per identity node, ordered as returned by
        ``get_identity_nodes(AS_DICT=False)``.

    Returns
    -------
    BooleanNetwork
        New network with the identity nodes fixed. Constants previously
        removed from the original network are carried over.

    Raises
    ------
    ValueError
        If the number of values does not match the number of identity
        nodes, or a value is not 0 or 1.
    """
    indices_identity_nodes = self.get_identity_nodes(AS_DICT=False)
    if len(values_identity_nodes) != len(indices_identity_nodes):
        raise ValueError(
            f"The number of values provided ({len(values_identity_nodes)}) must "
            f"match the number of identity nodes ({len(indices_identity_nodes)})."
        )
    if any(v not in (0, 1) for v in values_identity_nodes):
        raise ValueError("Identity node values must be 0 or 1.")

    new_F = deepcopy(self.F)
    new_I = deepcopy(self.I)
    for node, value in zip(indices_identity_nodes, values_identity_nodes):
        # Clamp: constant truth table, no regulators.
        new_F[node].f = np.array([value], dtype=int)
        new_I[node] = np.array([], dtype=int)

    network = self.__class__(new_F, new_I, self.variables)
    # Preserve previously removed constants
    network.constants.update(self.constants)
    return network
def get_network_with_node_controls(
    self,
    indices_controlled_nodes: Sequence[int],
    values_controlled_nodes: Sequence[int],
    KEEP_CONTROLLED_NODES: bool = False,
) -> "BooleanNetwork":
    """
    Construct a Boolean network with specified nodes fixed to given values.

    This method applies node-level interventions by fixing selected nodes
    to constant Boolean values. Controlled nodes may either be removed
    from the dynamic network as constants or retained as identity-clamped
    nodes.

    Parameters
    ----------
    indices_controlled_nodes : sequence of int
        Indices of nodes to be fixed.
    values_controlled_nodes : sequence of int
        Values to fix for each specified node, in the same order as
        ``indices_controlled_nodes``. Each value must be either 0 or 1.
    KEEP_CONTROLLED_NODES : bool, optional
        If True, controlled nodes are retained in the network as identity
        nodes with self-loops. If False (default), controlled nodes are
        eliminated as constants.

    Returns
    -------
    BooleanNetwork
        A new BooleanNetwork with the specified node controls applied.

    Raises
    ------
    ValueError
        If lengths mismatch, a node index is invalid, or a value is not
        0 or 1.
    """
    if len(indices_controlled_nodes) != len(values_controlled_nodes):
        raise ValueError(
            f"The number of controlled nodes ({len(indices_controlled_nodes)}) "
            f"must match the number of values provided ({len(values_controlled_nodes)})."
        )
    for node in indices_controlled_nodes:
        # Bug fix: accept NumPy integers as well (indices frequently come
        # from np.where / array operations); a plain isinstance(node, int)
        # wrongly rejected them, unlike get_network_with_edge_controls.
        if not isinstance(node, (int, np.integer)) or node < 0 or node >= self.N:
            raise ValueError(f"Invalid node index: {node}")
    for v in values_controlled_nodes:
        if v not in (0, 1):
            raise ValueError("Controlled node values must be 0 or 1.")

    F = deepcopy(self.F)
    I = deepcopy(self.I)
    for node, value in zip(indices_controlled_nodes, values_controlled_nodes):
        if KEEP_CONTROLLED_NODES:
            # Identity-clamped node: self-loop whose truth table always
            # yields the fixed value.
            F[node].f = np.array([value, value], dtype=int)
            I[node] = np.array([node], dtype=int)
        else:
            # Structural constant (to be removed)
            F[node].f = np.array([value], dtype=int)
            I[node] = np.array([], dtype=int)

    bn = self.__class__(F, I, self.variables)
    # Preserve previously removed constants if controlled nodes are eliminated
    if not KEEP_CONTROLLED_NODES:
        bn.constants.update(self.constants)
    return bn
def get_network_with_edge_controls(
    self,
    control_targets: Sequence[int],
    control_sources: Sequence[int],
    values_edge_controls: Sequence[int] | None = None,
) -> "BooleanNetwork":
    """
    Construct a Boolean network with specified regulatory edges controlled.

    This method fixes the influence of selected source nodes on selected
    target nodes by restricting the target's Boolean update function to
    entries where the source assumes a specified value, and then removing
    the corresponding regulatory edge.

    Parameters
    ----------
    control_targets : sequence of int
        Indices of target nodes.
    control_sources : sequence of int
        Indices of source nodes whose influence on the corresponding
        targets is to be controlled.
    values_edge_controls : sequence of int, optional
        Fixed values (0 or 1) imposed on each controlled edge. If None,
        all controlled edges are fixed to 0.

    Returns
    -------
    BooleanNetwork
        A new BooleanNetwork with the specified edge controls applied.

    Raises
    ------
    ValueError
        If input lengths do not match, indices are invalid, or edge
        values are not in {0, 1}.
    """
    if len(control_targets) != len(control_sources):
        raise ValueError("control_targets and control_sources must have equal length.")
    if values_edge_controls is None:
        values_edge_controls = [0] * len(control_targets)
    if len(values_edge_controls) != len(control_targets):
        raise ValueError(
            "values_edge_controls must have the same length as control_targets."
        )

    F_new = deepcopy(self.F)
    I_new = deepcopy(self.I)

    for target, source, fixed_value in zip(
        control_targets, control_sources, values_edge_controls
    ):
        if fixed_value not in (0, 1):
            raise ValueError("Edge control values must be 0 or 1.")
        if not (0 <= target < self.N):
            raise ValueError(f"Invalid target index: {target}")
        if not (0 <= source < self.N):
            raise ValueError(f"Invalid source index: {source}")
        if source not in I_new[target]:
            raise ValueError(
                f"Source node {source} is not a regulator of target node {target}."
            )

        idx_reg = list(I_new[target]).index(source)
        # Bug fix: use the *current* arity of the (possibly already
        # reduced) function, not self.indegrees[target]. With the old
        # code, a second control on the same target built a mask sized
        # for the original truth table and misindexed the reduced one.
        n_inputs = len(I_new[target])

        # Keep exactly the truth-table rows where the controlled input
        # equals fixed_value (inputs are MSB-first, as everywhere in
        # this class).
        truth_indices = np.arange(2**n_inputs, dtype=np.uint32)
        mask = ((truth_indices >> (n_inputs - 1 - idx_reg)) & 1) == fixed_value

        # Restrict truth table
        F_new[target].f = F_new[target].f[mask]
        F_new[target].n -= 1
        # NOTE(review): F_new[target].variables is not adjusted here;
        # presumably the constructor rebuilds per-function metadata —
        # confirm against BooleanNetwork.__init__.

        # Remove regulator
        I_new[target] = np.delete(I_new[target], idx_reg)

    return self.__class__(F_new, I_new, self.variables)
def update_single_node(
    self,
    index: int,
    states_regulators: Sequence[int],
) -> int:
    """
    Update the state of a single node.

    The regulator states are interpreted as a binary number indexing the
    node's truth table.

    Parameters
    ----------
    index : int
        Index of the node to update.
    states_regulators : sequence of int
        Binary states of the node's regulators.

    Returns
    -------
    int
        Updated state of the node (0 or 1).
    """
    row = utils.bin2dec(states_regulators)
    return self.F[index].f[row].item()
def update_network_synchronously(
    self,
    state: Sequence[int],
) -> np.ndarray:
    """
    Perform a synchronous update of the Boolean network.

    Parameters
    ----------
    state : sequence of int
        Binary state vector of length ``N``.

    Returns
    -------
    np.ndarray
        Updated state vector.

    Raises
    ------
    ValueError
        If the state has the wrong length or contains non-binary values.
    """
    state = np.asarray(state, dtype=int)
    n_given = state.shape[0]
    if n_given != self.N:
        raise ValueError(
            f"State vector must have length {self.N}, got {n_given}."
        )
    # Reject anything other than 0/1 entries.
    if np.any((state != 0) & (state != 1)):
        raise ValueError("State vector must be binary (0 or 1).")
    return self._update_network_synchronously_unchecked(state)
def _update_network_synchronously_unchecked(
    self,
    state: np.ndarray,
) -> np.ndarray:
    """Internal fast path: one synchronous step, no input validation."""
    values = [
        self.F[i].f[utils.bin2dec(state[self.I[i]])]
        for i in range(self.N)
    ]
    return np.array(values, dtype=int)
def update_network_SDDS(
    self,
    state: Sequence[int],
    P: np.ndarray,
    *,
    rng=None,
) -> np.ndarray:
    """
    Perform a stochastic discrete dynamical system (SDDS) update.

    For each node the deterministic Boolean update is computed first.
    A proposed increase of the node's state is accepted with the node's
    activation probability ``P[i, 0]``; a proposed decrease is accepted
    with the degradation probability ``P[i, 1]``. Otherwise the node
    keeps its state.

    Parameters
    ----------
    state : sequence of int
        Current network state (binary vector of length ``N``).
    P : np.ndarray
        Array of shape ``(N, 2)``: column 0 holds activation
        probabilities, column 1 degradation probabilities.
    rng : optional
        Random number generator or seed, passed to ``utils._coerce_rng``.

    Returns
    -------
    np.ndarray
        Updated network state after one stochastic SDDS update.

    Notes
    -----
    Follows the SDDS framework of Murrugarra et al. (2012), *Modeling
    stochasticity and variability in gene regulatory networks*, EURASIP
    Journal on Bioinformatics and Systems Biology, 2012(1), 5.

    For performance, ``state`` is assumed to be a valid binary vector
    and ``P`` to have the correct shape; no validation is performed.
    """
    rng = utils._coerce_rng(rng)
    state = np.asarray(state, dtype=int)
    next_state = state.copy()
    for i in range(self.N):
        proposed = self.update_single_node(
            index=i,
            states_regulators=state[self.I[i]],
        )
        if proposed == state[i]:
            continue
        # Activation (column 0) vs. degradation (column 1) probability;
        # a random draw is made only when a change is proposed.
        p_accept = P[i, 0] if proposed > state[i] else P[i, 1]
        if rng.random() < p_accept:
            next_state[i] = proposed
    return next_state
def get_steady_states_asynchronous_exact(
    self,
    stochastic_weights: Sequence[float] | None = None,
    max_iterations: int = 1000,
    tol: float = 1e-9,
) -> dict:
    """
    Compute the steady states and basin probabilities under general
    asynchronous update.

    This method exhaustively constructs the asynchronous state transition
    graph (STG) of the Boolean network under a general asynchronous update
    scheme, where nodes are selected for update according to given
    propensities. The resulting Markov chain is solved exactly using an
    iterative Gauss–Seidel scheme to obtain absorption probabilities into
    steady states.

    Parameters
    ----------
    stochastic_weights : sequence of float or None, optional
        Relative update propensities for each node. If None (default),
        all nodes are updated with equal probability. The weights are
        normalized internally.
    max_iterations : int, optional
        Maximum number of Gauss–Seidel iterations used to compute
        absorption probabilities before declaring non-convergence.
    tol : float, optional
        Convergence tolerance for the infinity norm of probability
        updates.

    Returns
    -------
    dict
        Dictionary with the following entries:

        - ``"SteadyStates"`` : list of int
            Decimal representations of steady states.
        - ``"NumberOfSteadyStates"`` : int
            Total number of steady states.
        - ``"BasinSizes"`` : np.ndarray
            Fraction of the state space converging to each steady state.
        - ``"STGAsynchronous"`` : dict
            Asynchronous state transition graph represented as a Markov
            kernel.
        - ``"FinalTransitionProbabilities"`` : np.ndarray
            Absorption probabilities from each state into each steady
            state.

    Raises
    ------
    RuntimeError
        If the iterative solver does not converge within
        ``max_iterations``.
    """
    left_side_of_truth_table = utils.get_left_side_of_truth_table(self.N)

    # Normalize (or default) the per-node update propensities.
    if stochastic_weights is not None:
        stochastic_weights = np.asarray(stochastic_weights, dtype=float)
        if stochastic_weights.shape[0] != self.N:
            raise ValueError("stochastic_weights must have length N.")
        if np.any(stochastic_weights <= 0):
            raise ValueError("stochastic_weights must be strictly positive.")
        stochastic_weights = stochastic_weights / stochastic_weights.sum()
    else:
        stochastic_weights = np.full(self.N, 1.0 / self.N)

    steady_states = []
    steady_state_dict = {}
    # STG[x][y] = total propensity of moving from state x to state y.
    STG = dict(zip(range(2**self.N), [{} for i in range(2**self.N)]))
    # sped_up_STG[x] = [successor states != x, their renormalized
    # propensities] — the self-loop mass is folded out so the
    # Gauss–Seidel sweep below only mixes over genuine moves.
    sped_up_STG = dict(zip(range(2**self.N), [[np.zeros(0, dtype=int), np.zeros(0, dtype=float)] for i in range(2**self.N)]))
    for xdec in range(2**self.N):
        x = left_side_of_truth_table[xdec].copy()  # important: must create a copy here!
        to_be_distributed = 0  # accumulated self-loop propensity
        for i in range(self.N):
            fx_i = self.update_single_node(i, x[self.I[i]])
            # Updating node i flips bit i of xdec (MSB-first encoding).
            if fx_i > x[i]:
                fxdec = xdec + 2**(self.N - 1 - i)
            elif fx_i < x[i]:
                fxdec = xdec - 2**(self.N - 1 - i)
            else:
                fxdec = xdec
            if fxdec in STG[xdec]:
                STG[xdec][fxdec] += stochastic_weights[i]
            else:
                STG[xdec][fxdec] = stochastic_weights[i]
            if fxdec != xdec:
                sped_up_STG[xdec][0] = np.append(sped_up_STG[xdec][0], fxdec)
                sped_up_STG[xdec][1] = np.append(sped_up_STG[xdec][1], stochastic_weights[i])
            else:
                to_be_distributed += stochastic_weights[i]
        if to_be_distributed < 1:
            # Renormalize over the non-self transitions only.
            sped_up_STG[xdec][1] /= (1 - to_be_distributed)
        # NOTE(review): a state is classified as steady when its STG row
        # has a single entry; for N >= 2 this is equivalent to "all
        # updates are self-loops" (distinct node updates flip distinct
        # bits), but for N == 1 a flipping node would also yield a
        # single-entry row — confirm N == 1 is not a supported case.
        if len(STG[xdec]) == 1:
            steady_state_dict[xdec] = len(steady_states)
            steady_states.append(xdec)
            sped_up_STG[xdec][0] = np.append(sped_up_STG[xdec][0], xdec)
            sped_up_STG[xdec][1] = np.append(sped_up_STG[xdec][1], 1)

    # Probability vectors for all states
    final_probabilities = np.zeros((2**self.N, len(steady_states)), dtype=float)

    # Boundary conditions: absorbing states have probability 1 of themselves
    for xdec in steady_states:
        final_probabilities[xdec, steady_state_dict[xdec]] = 1.0

    transient_states = [xdec for xdec in range(2**self.N) if xdec not in steady_state_dict]

    for it in range(1, max_iterations + 1):
        max_delta = 0.0
        # In-place Gauss–Seidel update:
        for xdec in transient_states:
            nxt, pr = sped_up_STG[xdec]
            old = final_probabilities[xdec].copy()
            # weighted average of successor probability vectors
            final_probabilities[xdec] = np.dot(pr, final_probabilities[nxt, :])
            # track convergence (infinity norm per row)
            delta = np.max(np.abs(final_probabilities[xdec] - old))
            if delta > max_delta:
                max_delta = delta
        if max_delta < tol:
            basin_sizes = final_probabilities.sum(0)/2**self.N
            return {
                "SteadyStates": steady_states,
                "NumberOfSteadyStates": len(steady_states),
                "BasinSizes": basin_sizes,
                "STGAsynchronous": STG,
                "FinalTransitionProbabilities": final_probabilities,
            }
    raise RuntimeError(f"Did not converge in {max_iterations} iterations; last max_delta={max_delta:g}")
def get_steady_states_asynchronous(
    self,
    nsim: int = 500,
    initial_states: Sequence[int] | None = None,
    search_depth: int = 50,
    DEBUG: bool = False,
    *,
    rng=None,
) -> dict:
    """
    Approximate steady states of a Boolean network under asynchronous updates.

    This method performs a Monte Carlo–style exploration of the
    asynchronous state space by simulating asynchronous updates from a
    collection of initial states. Each simulation proceeds until a steady
    state is reached or until a maximum search depth is exceeded.

    Unlike ``get_steady_states_asynchronous_exact``, this method does
    *not* exhaustively explore the full state space and does not
    guarantee that all steady states will be found. It is intended for
    large networks where exact enumeration is infeasible.

    Parameters
    ----------
    nsim : int, optional
        Number of asynchronous simulations to perform (default is 500).
    initial_states : sequence of int or None, optional
        Initial states to use for the simulations, given as decimal
        representations of network states. If None (default), ``nsim``
        random initial states are generated.
    search_depth : int, optional
        Maximum number of asynchronous update steps per simulation
        before giving up on convergence (default is 50).
    DEBUG : bool, optional
        If True, print detailed debugging information during simulation.
    rng : optional
        Random number generator or seed, passed to ``utils._coerce_rng``.

    Returns
    -------
    dict
        Dictionary with the following entries:

        - ``"SteadyStates"`` : list of int
            Decimal representations of steady states encountered.
        - ``"NumberOfSteadyStates"`` : int
            Number of unique steady states found.
        - ``"BasinSizes"`` : list of int
            Counts of how many simulations converged to each steady state.
        - ``"STGAsynchronous"`` : dict
            Partial cache of asynchronous transitions encountered during
            simulation. Keys are ``(state, node_index)`` and values are
            successor states (all in decimal form).
        - ``"InitialSamplePoints"`` : list of int
            Decimal initial states used in the simulations (either
            provided explicitly or generated randomly).

    Notes
    -----
    - This method detects only *steady states* (fixed points). If the
      asynchronous dynamics contain limit cycles, simulations may fail
      to converge within ``search_depth``.
    - The returned asynchronous transition graph is generally incomplete
      and should be interpreted as a cache of explored transitions
      rather than the full STG.
    """
    rng = utils._coerce_rng(rng)

    sampled_states: list[int] = []
    STG_asynchronous: dict[tuple[int, int], int] = {}
    steady_states: list[int] = []
    basin_sizes: list[int] = []
    steady_state_dict: dict[int, int] = {}

    for iteration in range(nsim):
        # Initialize state
        if initial_states is None:
            x = rng.integers(2, size=self.N)
            xdec = utils.bin2dec(x)
            sampled_states.append(xdec)
        else:
            xdec = initial_states[iteration]
            x = utils.dec2bin(xdec, self.N)

        if DEBUG:
            print(iteration, -1, -1, False, xdec, x)

        for step in range(search_depth):
            FOUND_NEW_STATE = False

            # Check if state is already known to be steady
            if xdec in steady_state_dict:
                basin_sizes[steady_state_dict[xdec]] += 1
                break

            update_order = rng.permutation(self.N)
            for i in map(int, update_order):
                try:
                    fxdec = STG_asynchronous[(xdec, i)]
                    # Bug fix: a cached transition (xdec, i) toggles
                    # exactly bit i, so the binary vector x must be kept
                    # in sync with xdec. Previously x went stale after a
                    # cache hit, and later cache misses computed (and
                    # mis-cached) transitions from the stale vector.
                    if fxdec != xdec:
                        x[i] = 1 - x[i]
                except KeyError:
                    fx_i = self.update_single_node(i, x[self.I[i]])
                    if fx_i > x[i]:
                        fxdec = xdec + 2 ** (self.N - 1 - i)
                        x[i] = 1
                        FOUND_NEW_STATE = True
                    elif fx_i < x[i]:
                        fxdec = xdec - 2 ** (self.N - 1 - i)
                        x[i] = 0
                        FOUND_NEW_STATE = True
                    else:
                        fxdec = xdec
                    STG_asynchronous[(xdec, i)] = fxdec

                if fxdec != xdec:
                    xdec = fxdec
                    FOUND_NEW_STATE = True
                    break

            if DEBUG:
                print(iteration, step, i, FOUND_NEW_STATE, xdec, x)

            if not FOUND_NEW_STATE:
                # New steady state found
                if xdec in steady_state_dict:
                    basin_sizes[steady_state_dict[xdec]] += 1
                else:
                    steady_state_dict[xdec] = len(steady_states)
                    steady_states.append(xdec)
                    basin_sizes.append(1)
                break
        if DEBUG:
            print()

    if sum(basin_sizes) < nsim:
        print(
            f"Warning: only {sum(basin_sizes)} of the {nsim} simulations "
            "reached a steady state. Consider increasing search_depth. "
            "The network may also contain asynchronous limit cycles."
        )

    return {
        "SteadyStates": steady_states,
        "NumberOfSteadyStates": len(steady_states),
        "BasinSizes": basin_sizes,
        "STGAsynchronous": STG_asynchronous,
        "InitialSamplePoints": (
            initial_states if initial_states is not None else sampled_states
        ),
    }
def get_steady_states_asynchronous_given_one_initial_condition(
    self,
    initial_condition: int | Sequence[int] = 0,
    nsim: int = 500,
    stochastic_weights: Sequence[float] | None = None,
    search_depth: int = 50,
    DEBUG: bool = False,
    *,
    rng=None,
) -> dict:
    """
    Approximate steady states reachable from a single initial condition
    under asynchronous updates.

    This method performs multiple asynchronous simulations starting from
    the same initial condition. In each simulation, nodes are updated one
    at a time according to either a uniform random order or node-specific
    stochastic update propensities. The simulation proceeds until a
    steady state is reached or a maximum number of update steps is
    exceeded.

    The method is sampling-based and does *not* guarantee that all
    reachable steady states are found. It is intended for exploratory
    analysis and for networks where exhaustive asynchronous analysis is
    infeasible.

    Parameters
    ----------
    initial_condition : int or sequence of int, optional
        Initial network state. If an integer is provided, it is
        interpreted as the decimal encoding of a Boolean state. If a
        sequence is provided, it must be a binary vector of length
        ``N``. Default is 0.
    nsim : int, optional
        Number of asynchronous simulation runs (default is 500).
    stochastic_weights : sequence of float or None, optional
        Relative update propensities for each node. If provided, must
        have length ``N`` and be strictly positive. The weights are
        normalized internally. If None (default), nodes are updated
        uniformly at random.
    search_depth : int, optional
        Maximum number of asynchronous update steps per simulation.
    DEBUG : bool, optional
        If True, print detailed debugging information during simulation.
    rng : optional
        Random number generator or seed, passed to ``utils._coerce_rng``.

    Returns
    -------
    dict
        Dictionary with the following entries:

        - ``"SteadyStates"`` : list of int
            Decimal representations of steady states reached.
        - ``"NumberOfSteadyStates"`` : int
            Number of unique steady states found.
        - ``"BasinSizes"`` : list of int
            Number of simulations converging to each steady state.
        - ``"TransientTimes"`` : list of list of int
            For each steady state, a list of transient lengths (number
            of update steps before convergence).
        - ``"STGAsynchronous"`` : dict
            Partial cache of asynchronous transitions encountered during
            simulation. Keys are ``(state, node_index)`` and values are
            successor states (all in decimal form).
        - ``"UpdateQueues"`` : list of list of int
            For each converged simulation, the sequence of visited
            states (in decimal form).

    Notes
    -----
    - Only steady states (fixed points) are detected. If the
      asynchronous dynamics contain limit cycles, simulations may fail
      to converge within ``search_depth``.
    - The returned asynchronous transition graph is incomplete and
      represents only transitions encountered during sampling.
    """
    rng = utils._coerce_rng(rng)

    # --- Initialize initial condition ---
    if isinstance(initial_condition, int):
        x0 = utils.dec2bin(initial_condition, self.N)
        x0dec = initial_condition
    else:
        x0 = np.asarray(initial_condition, dtype=int)
        if x0.shape[0] != self.N:
            raise ValueError(
                f"Initial condition must have length {self.N}, got {x0.shape[0]}."
            )
        x0dec = utils.bin2dec(x0)

    # --- Handle stochastic weights ---
    if stochastic_weights is not None:
        stochastic_weights = np.asarray(stochastic_weights, dtype=float)
        if stochastic_weights.shape[0] != self.N:
            raise ValueError("stochastic_weights must have length N.")
        if np.any(stochastic_weights <= 0):
            raise ValueError("stochastic_weights must be strictly positive.")
        stochastic_weights = stochastic_weights / stochastic_weights.sum()

    # --- Bookkeeping ---
    STG_async: dict[tuple[int, int], int] = {}
    steady_states: list[int] = []
    basin_sizes: list[int] = []
    transient_times: list[list[int]] = []
    steady_state_dict: dict[int, int] = {}
    queues: list[list[int]] = []

    # --- Simulations ---
    for iteration in range(nsim):
        x = x0.copy()
        xdec = x0dec
        queue = [xdec]

        for step in range(search_depth):
            FOUND_NEW_STATE = False

            # If already known steady state, stop
            if xdec in steady_state_dict:
                idx = steady_state_dict[xdec]
                basin_sizes[idx] += 1
                transient_times[idx].append(step)
                queues.append(queue)
                break

            # Choose update order
            if stochastic_weights is None:
                update_order = rng.permutation(self.N)
            else:
                update_order = rng.choice(
                    self.N, size=self.N, replace=False, p=stochastic_weights
                )

            for i in map(int, update_order):
                try:
                    fxdec = STG_async[(xdec, i)]
                    # Bug fix: a cached transition (xdec, i) toggles
                    # exactly bit i, so x must be kept in sync with
                    # xdec. Previously x went stale after a cache hit,
                    # and later cache misses computed (and mis-cached)
                    # transitions from the stale vector.
                    if fxdec != xdec:
                        x[i] = 1 - x[i]
                except KeyError:
                    fx_i = self.update_single_node(i, x[self.I[i]])
                    if fx_i > x[i]:
                        fxdec = xdec + 2 ** (self.N - 1 - i)
                        x[i] = 1
                    elif fx_i < x[i]:
                        fxdec = xdec - 2 ** (self.N - 1 - i)
                        x[i] = 0
                    else:
                        fxdec = xdec
                    STG_async[(xdec, i)] = fxdec

                if fxdec != xdec:
                    xdec = fxdec
                    queue.append(xdec)
                    FOUND_NEW_STATE = True
                    break

            if DEBUG:
                print(iteration, step, i, FOUND_NEW_STATE, xdec, x)

            if not FOUND_NEW_STATE:
                # New steady state reached
                if xdec in steady_state_dict:
                    idx = steady_state_dict[xdec]
                    basin_sizes[idx] += 1
                    transient_times[idx].append(step)
                else:
                    steady_state_dict[xdec] = len(steady_states)
                    steady_states.append(xdec)
                    basin_sizes.append(1)
                    transient_times.append([step])
                queues.append(queue)
                break
        if DEBUG:
            print()

    if sum(basin_sizes) < nsim:
        print(
            f"Warning: only {sum(basin_sizes)} of the {nsim} simulations "
            "reached a steady state. Consider increasing search_depth. "
            "The network may contain asynchronous limit cycles."
        )

    return {
        "SteadyStates": steady_states,
        "NumberOfSteadyStates": len(steady_states),
        "BasinSizes": basin_sizes,
        "TransientTimes": transient_times,
        "STGAsynchronous": STG_async,
        "UpdateQueues": queues,
    }
def get_attractors_synchronous(
    self,
    nsim: int = 500,
    initial_sample_points: Sequence[int | Sequence[int]] | None = None,
    n_steps_timeout: int = 1000,
    INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS: bool = False,
    USE_NUMBA: bool = True,
    *,
    rng=None,
) -> dict:
    """
    Approximate synchronous attractors of a Boolean network via sampling.

    This method estimates the synchronous attractors (fixed points and
    cycles) of a Boolean network by simulating synchronous updates from a
    collection of initial states. For each simulation, the network is
    updated until an attractor is reached or a maximum number of update
    steps is exceeded.

    The method is sampling-based and does *not* guarantee that all
    attractors are found. Basin sizes are lower-bound estimates based on
    the sampled initial conditions.

    If Numba is available and ``USE_NUMBA=True``, synchronous updates
    are accelerated using a compiled kernel.

    Parameters
    ----------
    nsim : int, optional
        Number of initial conditions to simulate (default is 500).
        Ignored if ``initial_sample_points`` is provided.
    initial_sample_points : sequence of int or sequence of sequence of int, optional
        Initial states to use. If provided, its length determines the
        number of simulations. Interpretation depends on
        ``INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS``.
    n_steps_timeout : int, optional
        Maximum number of synchronous update steps per simulation before
        declaring a timeout (default is 1000).
    INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS : bool, optional
        If True, ``initial_sample_points`` are interpreted as binary
        vectors; otherwise they are interpreted as decimal-encoded
        states.
    USE_NUMBA : bool, optional
        If True (default) and Numba is available, use a
        Numba-accelerated synchronous update kernel.
    rng : optional
        Random number generator or seed, passed to ``utils._coerce_rng``.

    Returns
    -------
    dict
        Dictionary with the following entries:

        - ``"Attractors"`` : list of list of int
            Attractors found, each represented as a list of decimal
            states (cycles are given in cyclic order).
        - ``"NumberOfAttractors"`` : int
            Number of distinct attractors found.
        - ``"BasinSizes"`` : list of int
            Number of sampled initial conditions converging to each
            attractor.
        - ``"AttractorDict"`` : dict
            Mapping from visited states (decimal) to attractor index.
        - ``"InitialSamplePoints"`` : list of int
            Decimal initial states used for sampling.
        - ``"STG"`` : dict
            Sampled synchronous state transition graph (state →
            successor state).
        - ``"NumberOfTimeouts"`` : int
            Number of simulations that did not converge within
            ``n_steps_timeout``.

    Notes
    -----
    - This method is intended for networks with long transient dynamics,
      where exhaustive synchronous analysis is infeasible.
    - Basin sizes are *sampling-based estimates* and should not be
      interpreted as exact proportions of the state space.
    """
    rng = utils._coerce_rng(rng)

    # --- Bookkeeping ---
    dictF: dict[int, int] = {}        # memorized synchronous transitions
    attractors: list[list[int]] = []  # attractor cycles
    basin_sizes: list[int] = []       # basin counts
    attr_dict: dict[int, int] = {}    # state -> attractor index
    STG: dict[int, int] = {}          # sampled synchronous STG
    n_timeout = 0
    sampled_points: list[int] = []

    INITIAL_SAMPLE_POINTS_EMPTY = initial_sample_points is None
    if not INITIAL_SAMPLE_POINTS_EMPTY:
        nsim = len(initial_sample_points)

    # --- Decide update backend ---
    use_numba = __LOADED_NUMBA__ and USE_NUMBA
    if use_numba:
        F_array_list = List([np.array(bf.f, dtype=np.uint8) for bf in self.F])
        I_array_list = List([np.array(regs, dtype=np.int64) for regs in self.I])

    # --- Main simulation loop ---
    for sim_idx in range(nsim):
        # Initialize state
        if INITIAL_SAMPLE_POINTS_EMPTY:
            x = rng.integers(2, size=self.N, dtype=np.uint8)
            xdec = utils.bin2dec(x)
            sampled_points.append(xdec)
        else:
            if INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS:
                x = np.asarray(initial_sample_points[sim_idx], dtype=np.uint8)
                if x.shape[0] != self.N:
                    raise ValueError(
                        f"Initial state must have length {self.N}, got {x.shape[0]}."
                    )
                xdec = utils.bin2dec(x)
            else:
                xdec = int(initial_sample_points[sim_idx])
                x = np.array(utils.dec2bin(xdec, self.N), dtype=np.uint8)

        visited = {xdec: 0}
        trajectory = [xdec]
        count = 0
        x_in_sync = True  # x currently encodes xdec

        # --- Iterate until attractor or timeout ---
        while count < n_steps_timeout:
            if xdec in dictF:
                fxdec = dictF[xdec]
                # Following a cached edge advances xdec without touching
                # x; mark x stale so a later cache miss recomputes it.
                x_in_sync = False
            else:
                if not x_in_sync:
                    # Bug fix: resynchronize x with xdec before
                    # evaluating the update functions. Previously x
                    # could be stale here (after cache hits along a
                    # previously timed-out trajectory), producing wrong
                    # transitions that were then cached in dictF.
                    x = np.array(utils.dec2bin(xdec, self.N), dtype=np.uint8)
                if use_numba:
                    fx = _update_network_synchronously_numba(
                        x, F_array_list, I_array_list
                    )
                else:
                    fx = self._update_network_synchronously_unchecked(x)
                fxdec = utils.bin2dec(fx)
                dictF[xdec] = fxdec
                x = fx
                x_in_sync = True  # x now encodes fxdec == next xdec

            # record sampled STG edge (first visit only)
            if count == 0:
                STG[xdec] = fxdec

            # already assigned to known attractor
            if fxdec in attr_dict:
                idx_attr = attr_dict[fxdec]
                basin_sizes[idx_attr] += 1
                for s in trajectory:
                    attr_dict[s] = idx_attr
                break

            # new attractor detected
            if fxdec in visited:
                cycle_start = visited[fxdec]
                attractor_states = trajectory[cycle_start:]
                attractors.append(attractor_states)
                basin_sizes.append(1)
                idx_attr = len(attractors) - 1
                for s in attractor_states:
                    attr_dict[s] = idx_attr
                break

            # continue traversal
            visited[fxdec] = len(trajectory)
            trajectory.append(fxdec)
            xdec = fxdec
            count += 1
            if count == n_steps_timeout:
                n_timeout += 1
                break

    return {
        "Attractors": attractors,
        "NumberOfAttractors": len(attractors),
        "BasinSizes": basin_sizes,
        "AttractorDict": attr_dict,
        "InitialSamplePoints": (
            sampled_points
            if INITIAL_SAMPLE_POINTS_EMPTY
            else list(initial_sample_points)
        ),
        "STG": STG,
        "NumberOfTimeouts": n_timeout,
    }
def compute_synchronous_state_transition_graph(
    self,
    USE_NUMBA: bool = True,
) -> None:
    """
    Build and cache the exact synchronous state transition graph (STG).

    After this call, ``self.STG`` holds a one-dimensional NumPy array of
    length ``2**N`` in which entry ``x`` is the decimal encoding of the
    synchronous successor of state ``x``. The computation is exact and
    requires memory proportional to ``2**N``, so it is only practical for
    small-to-moderate networks.

    Parameters
    ----------
    USE_NUMBA : bool, optional
        If True (default) and Numba is available, delegate to a compiled
        kernel instead of the vectorized NumPy fallback.
    """
    # Already computed -- nothing to do.
    if self.STG is not None:
        return

    if __LOADED_NUMBA__ and USE_NUMBA:
        # Convert update rules and regulator lists into Numba-friendly arrays.
        truth_tables = [np.array(bf.f, dtype=np.uint8) for bf in self.F]
        regulator_sets = [np.array(regs, dtype=np.int64) for regs in self.I]
        if self.N <= 22:
            self.STG = _compute_synchronous_stg_numba(
                truth_tables, regulator_sets, self.N
            )
        else:
            # Larger networks go through the memory-frugal kernel.
            self.STG = _compute_synchronous_stg_numba_low_memory(
                truth_tables, regulator_sets, self.N
            )
        return

    # -------- Vectorized NumPy fallback --------
    # All 2**N states as rows of a binary matrix.
    all_states = utils.get_left_side_of_truth_table(self.N)
    successor_bits = np.zeros_like(all_states, dtype=np.uint8)

    # MSB-first binary weights, e.g. N=3 -> [4, 2, 1].
    weights = (1 << np.arange(self.N))[::-1]

    for node, bf in enumerate(self.F):
        regulators = self.I[node]
        if len(regulators) == 0:
            # Node without regulators: constant update value.
            successor_bits[:, node] = bf.f[0]
        else:
            # Map each state's regulator configuration to a truth-table row.
            rows = np.dot(all_states[:, regulators], weights[-len(regulators):])
            successor_bits[:, node] = bf.f[rows]

    # Encode every successor state back to its decimal representation.
    self.STG = np.dot(successor_bits, weights).astype(np.int64)
def get_attractors_synchronous_exact(
    self,
    USE_NUMBA: bool = True,
) -> dict:
    """
    Compute all attractors and their exact basin sizes under synchronous
    updating.

    This method computes the exact synchronous state transition graph (STG)
    and analyzes it as a functional graph on ``2**N`` states. All attractors
    (cycles), their basin sizes, and the attractor reached from each state
    are determined exactly.

    This computation requires memory and time proportional to ``2**N`` and
    is intended for small-to-moderate networks only.

    Parameters
    ----------
    USE_NUMBA : bool, optional
        If True (default) and Numba is available, use a compiled kernel
        for attractor detection.

    Returns
    -------
    dict
        Dictionary with keys:

        - Attractors : list[list[int]]
            Each attractor represented as a list of decimal states forming
            a cycle.
        - NumberOfAttractors : int
            Total number of attractors.
        - BasinSizes : np.ndarray[float]
            Fraction of all states belonging to each attractor basin.
        - AttractorID : np.ndarray[int]
            For each of the ``2**N`` states, the index of the attractor it
            reaches.
        - STG : np.ndarray[int]
            The synchronous state transition graph.
    """
    if self.STG is None:
        self.compute_synchronous_state_transition_graph(USE_NUMBA=USE_NUMBA)

    attractors = []
    if __LOADED_NUMBA__ and USE_NUMBA:
        attractor_id, basin_sizes, cycle_rep, cycle_len, n_attr = (
            _attractors_functional_graph(self.STG)
        )
        # Reconstruct each cycle from its representative by walking the STG.
        for k in range(int(n_attr)):
            rep = int(cycle_rep[k])
            L = int(cycle_len[k])
            cyc = [rep]
            x = rep
            for _ in range(L - 1):
                x = int(self.STG[x])
                cyc.append(x)
            attractors.append(cyc)
    else:
        attractor_id = -np.ones(2**self.N, dtype=np.int32)
        basin_sizes = []
        n_attr = 0
        for xdec in range(2**self.N):
            if attractor_id[xdec] != -1:
                continue
            cur = xdec
            queue = [cur]
            # Position of each state on the current trajectory. A dict gives
            # O(1) cycle detection; the previous ``fxdec in queue`` /
            # ``queue.index(fxdec)`` list scans made long transients
            # quadratic in trajectory length.
            position = {cur: 0}
            while True:
                fxdec = int(self.STG[cur])
                if attractor_id[fxdec] != -1:
                    # Ran into an already-classified state: the whole
                    # trajectory belongs to that attractor's basin.
                    idx_attr = attractor_id[fxdec]
                    basin_sizes[idx_attr] += len(queue)
                    for q in queue:
                        attractor_id[q] = idx_attr
                    break
                if fxdec in position:
                    # Closed a new cycle within the current trajectory.
                    cycle = queue[position[fxdec]:]
                    attractors.append(cycle)
                    basin_sizes.append(len(queue))
                    for q in queue:
                        attractor_id[q] = n_attr
                    n_attr += 1
                    break
                # Continue the traversal.
                position[fxdec] = len(queue)
                queue.append(fxdec)
                cur = fxdec

    # Convert basin counts to fractions of the state space.
    basin_sizes = np.array(basin_sizes, dtype=np.float64) / (2**self.N)
    return {
        "Attractors": attractors,
        "NumberOfAttractors": len(attractors),
        "BasinSizes": basin_sizes,
        "AttractorID": attractor_id,
        "STG": self.STG,
    }
def get_transient_lengths_exact(
    self,
    USE_NUMBA : bool = True
) -> np.ndarray:
    """
    Compute the exact transient length of every state.

    The transient length of a state is its distance (number of synchronous
    updates) to the first attractor state on its trajectory; attractor
    (cycle) states have transient length 0. Uses:

    - the full STG from :meth:`get_attractors_synchronous_exact`, and
    - the explicit attractor cycles from the same call,

    which avoids indegree-pruning because cycle states are given explicitly.

    Parameters
    ----------
    USE_NUMBA : bool, optional
        If True (default) and Numba is available, use a compiled kernel.

    Returns
    -------
    np.ndarray
        Integer array of length ``2**N`` containing the transient length
        of each state.
    """
    attractor_info = self.get_attractors_synchronous_exact(USE_NUMBA=USE_NUMBA)
    stg = self.STG  # full mapping: successor(s)
    attractors = attractor_info["Attractors"]  # list of cycles

    if __LOADED_NUMBA__ and USE_NUMBA:
        # Mark all attractor states.
        # Bug fix: the previous ``np.full(2**self.N, dtype=np.uint8)`` is
        # missing the required fill_value argument and raises TypeError;
        # an all-zero mask is what is needed here.
        is_attr_mask = np.zeros(2**self.N, dtype=np.uint8)
        for states in attractors:
            is_attr_mask[np.asarray(states, dtype=np.int64)] = 1
        return _transient_lengths_functional_numba(
            self.STG.astype(np.int64, copy=False), is_attr_mask
        )

    # Normalize STG to an integer array/list succ where succ[u] = v
    if isinstance(stg, np.ndarray):
        succ = stg.astype(int, copy=False)
        n = int(succ.shape[0])
    else:
        succ = list(stg)
        n = len(succ)

    # Build reverse adjacency list rev[v] = all u such that u -> v
    rev = [[] for _ in range(n)]
    for u in range(n):
        v = int(succ[u])
        if v < 0 or v >= n:
            raise ValueError(f"Invalid successor: {u} -> {v}")
        rev[v].append(u)

    # Initialize distances: all cycle states have transient length 0
    dist = np.full(n, -1, dtype=np.int64)
    bfs = deque()
    for cycle in attractors:
        for s in cycle:
            if dist[s] == -1:
                dist[s] = 0
                bfs.append(s)

    # Multi-source BFS outward from cycle states (backwards along the STG)
    while bfs:
        v = bfs.popleft()
        for u in rev[v]:
            if dist[u] == -1:
                dist[u] = dist[v] + 1
                bfs.append(u)

    # If STG is complete, every state must get a distance
    if any(d < 0 for d in dist):
        raise RuntimeError("Some states did not receive a transient length. Is STG complete?")
    return np.array(dist, dtype=int)
## Robustness measures: synchronous Derrida value, entropy of basin size distribution, coherence, fragility
def get_attractors_and_robustness_measures_synchronous_exact(
    self,
    USE_NUMBA: bool = True,
    GET_STRATIFIED_COHERENCES : bool = False
) -> dict:
    """
    Compute attractors and exact robustness measures of a synchronously
    updated Boolean network.

    Constructs the exact synchronous state transition graph (STG) on
    ``2**N`` states and analyzes it as a functional graph: all attractors
    (cycles), basin sizes, and the attractor reached from each state are
    determined exactly. From this decomposition, exact coherence and
    fragility measures are computed for the full network, for each basin
    of attraction, and for each attractor. Optionally, coherence can be
    stratified by the transient length (distance from the attractor) of
    each state.

    This computation requires memory and time proportional to ``2**N`` and
    is intended for small-to-moderate networks.

    Parameters
    ----------
    USE_NUMBA : bool, optional
        If True (default) and Numba is available, compiled kernels are
        used for robustness and transient-length computations.
    GET_STRATIFIED_COHERENCES : bool, optional
        If True, coherence is additionally computed as a function of the
        transient length of each state. Default is False.

    Returns
    -------
    dict
        Keys: ``Attractors``, ``NumberOfAttractors``, ``BasinSizes``,
        ``AttractorID``, ``Coherence``, ``Fragility``, ``BasinCoherence``,
        ``BasinFragility``, ``AttractorCoherence``, ``AttractorFragility``.
        If ``GET_STRATIFIED_COHERENCES`` is True, additionally
        ``StratifiedCoherences``, ``DistanceFromAttractorCount`` and
        ``DistanceFromAttractor``.
    """
    # ------------------------------------------------------------------
    # 0) Attractors and basins
    # ------------------------------------------------------------------
    result = self.get_attractors_synchronous_exact(USE_NUMBA=USE_NUMBA)
    attractors = result["Attractors"]
    n_attractors = int(result["NumberOfAttractors"])
    basin_sizes = np.asarray(result["BasinSizes"], dtype=np.float64)
    attractor_id = np.asarray(result["AttractorID"], dtype=np.int64)
    n_states = 1 << self.N

    # ------------------------------------------------------------------
    # Single-attractor shortcut: every perturbation stays in the basin.
    # NOTE(review): this early return omits the stratified keys even when
    # GET_STRATIFIED_COHERENCES is True -- callers should be aware.
    # ------------------------------------------------------------------
    if n_attractors == 1:
        return {
            "Attractors": attractors,
            "NumberOfAttractors": 1,
            "BasinSizes": basin_sizes,
            "AttractorID": attractor_id,
            "Coherence": 1.0,
            "Fragility": 0.0,
            "BasinCoherence": np.ones(1, dtype=np.float64),
            "BasinFragility": np.zeros(1, dtype=np.float64),
            "AttractorCoherence": np.ones(1, dtype=np.float64),
            "AttractorFragility": np.zeros(1, dtype=np.float64),
        }

    # ------------------------------------------------------------------
    # 1) Attractor membership and lengths
    # ------------------------------------------------------------------
    is_attr_mask = np.zeros(n_states, dtype=np.uint8)
    len_attractors = np.empty(n_attractors, dtype=np.int64)
    for i, states in enumerate(attractors):
        states_arr = np.asarray(states, dtype=np.int64)
        len_attractors[i] = states_arr.size
        is_attr_mask[states_arr] = 1

    # ------------------------------------------------------------------
    # 2) Mean binary vector per attractor
    # ------------------------------------------------------------------
    mean_states_attractors = np.empty((n_attractors, self.N), dtype=np.float64)
    for i, states in enumerate(attractors):
        if len(states) == 1:
            mean_states_attractors[i] = np.asarray(
                utils.dec2bin(states[0], self.N), dtype=np.float64
            )
        else:
            arr = np.asarray(
                [utils.dec2bin(s, self.N) for s in states], dtype=np.float64
            )
            mean_states_attractors[i] = arr.mean(axis=0)

    # ------------------------------------------------------------------
    # 3) Distance matrix between attractors (normalized Hamming distance
    #    between mean attractor states)
    # ------------------------------------------------------------------
    diff = mean_states_attractors[:, None, :] - mean_states_attractors[None, :, :]
    distance_between_attractors = np.sum(np.abs(diff), axis=2)
    distance_between_attractors = np.asarray(
        distance_between_attractors / float(self.N), dtype=np.float64
    )

    # ------------------------------------------------------------------
    # 4) Hypercube edge traversal: every pair of states differing in one
    #    bit contributes to coherence (same basin) or fragility.
    # ------------------------------------------------------------------
    if __LOADED_NUMBA__ and USE_NUMBA:
        if GET_STRATIFIED_COHERENCES:
            distances_from_attractor = _transient_lengths_functional_numba(
                self.STG.astype(np.int64, copy=False), is_attr_mask
            )
            max_distance_from_attractor = int(distances_from_attractor.max())
            (
                basin_coherences,
                basin_fragilities,
                attractor_coherences,
                attractor_fragilities,
                stratified_coherences,
                n_states_with_specific_distance_from_attractor,
            ) = _robustness_edge_traversal_numba_stratified(
                int(self.N),
                attractor_id,
                is_attr_mask,
                distance_between_attractors,
                distances_from_attractor,
                max_distance_from_attractor,
            )
            stratified_coherences = np.asarray(stratified_coherences, dtype=np.float64)
            n_states_with_specific_distance_from_attractor = np.asarray(
                n_states_with_specific_distance_from_attractor, dtype=int
            )
        else:
            (
                basin_coherences,
                basin_fragilities,
                attractor_coherences,
                attractor_fragilities,
            ) = _robustness_edge_traversal_numba(
                int(self.N),
                attractor_id,
                is_attr_mask,
                distance_between_attractors,
            )
        basin_coherences = np.asarray(basin_coherences, dtype=np.float64)
        basin_fragilities = np.asarray(basin_fragilities, dtype=np.float64)
        attractor_coherences = np.asarray(attractor_coherences, dtype=np.float64)
        attractor_fragilities = np.asarray(attractor_fragilities, dtype=np.float64)
    else:
        basin_coherences = np.zeros(n_attractors, dtype=np.float64)
        basin_fragilities = np.zeros(n_attractors, dtype=np.float64)
        attractor_coherences = np.zeros(n_attractors, dtype=np.float64)
        attractor_fragilities = np.zeros(n_attractors, dtype=np.float64)
        if GET_STRATIFIED_COHERENCES:
            # Bug fix: previously called with the positional argument
            # ``result`` (a dict), which was silently interpreted as the
            # truthy ``USE_NUMBA`` flag -- potentially taking the Numba
            # path even though the caller disabled it. Forward the actual
            # flag instead.
            distances_from_attractor = self.get_transient_lengths_exact(
                USE_NUMBA=USE_NUMBA
            )
            max_distance_from_attractor = max(distances_from_attractor)
            stratified_coherences = np.zeros(
                (n_attractors, max_distance_from_attractor + 1), dtype=np.float64
            )
            n_states_with_specific_distance_from_attractor = np.zeros(
                (n_attractors, max_distance_from_attractor + 1), dtype=int
            )
        # Enumerate each undirected hypercube edge exactly once (flip only
        # bits that are 0 in xdec).
        for xdec in range(n_states):
            for bitpos in range(self.N):
                if (xdec >> bitpos) & 1:
                    continue
                ydec = xdec | (1 << bitpos)
                idx_x = attractor_id[xdec]
                idx_y = attractor_id[ydec]
                if GET_STRATIFIED_COHERENCES:
                    # Count every state-edge incidence; divided by N later.
                    n_states_with_specific_distance_from_attractor[
                        idx_x, distances_from_attractor[xdec]
                    ] += 1
                    n_states_with_specific_distance_from_attractor[
                        idx_y, distances_from_attractor[ydec]
                    ] += 1
                if idx_x == idx_y:
                    basin_coherences[idx_x] += 2.0
                    if is_attr_mask[xdec]:
                        attractor_coherences[idx_x] += 1.0
                    if is_attr_mask[ydec]:
                        attractor_coherences[idx_y] += 1.0
                    if GET_STRATIFIED_COHERENCES:
                        stratified_coherences[
                            idx_x, distances_from_attractor[xdec]
                        ] += 1.0
                        stratified_coherences[
                            idx_y, distances_from_attractor[ydec]
                        ] += 1.0
                else:
                    dxy = float(distance_between_attractors[idx_x, idx_y])
                    basin_fragilities[idx_x] += dxy
                    basin_fragilities[idx_y] += dxy
                    if is_attr_mask[xdec]:
                        attractor_fragilities[idx_x] += dxy
                    if is_attr_mask[ydec]:
                        attractor_fragilities[idx_y] += dxy

    # ------------------------------------------------------------------
    # 5) Normalization
    # ------------------------------------------------------------------
    basin_counts = basin_sizes * float(n_states)
    if GET_STRATIFIED_COHERENCES:
        # Each state is incident to exactly N hypercube edges, so dividing
        # by N converts edge-incidence counts to state counts.
        n_states_with_specific_distance_from_attractor //= self.N
    for i in range(n_attractors):
        if basin_counts[i] > 0.0:
            basin_coherences[i] /= basin_counts[i] * self.N
            basin_fragilities[i] /= basin_counts[i] * self.N
        if len_attractors[i] > 0:
            attractor_coherences[i] /= len_attractors[i] * self.N
            attractor_fragilities[i] /= len_attractors[i] * self.N
        if GET_STRATIFIED_COHERENCES:
            for d in range(max_distance_from_attractor + 1):
                if n_states_with_specific_distance_from_attractor[i, d] > 0.0:
                    stratified_coherences[i, d] /= (
                        n_states_with_specific_distance_from_attractor[i, d] * self.N
                    )
                else:
                    # No state of this basin at this distance.
                    stratified_coherences[i, d] = np.nan

    # Basin-size-weighted averages give the global measures.
    coherence = float(np.dot(basin_sizes, basin_coherences))
    fragility = float(np.dot(basin_sizes, basin_fragilities))

    # ------------------------------------------------------------------
    # Final return
    # ------------------------------------------------------------------
    return_dict = {
        "Attractors": attractors,
        "NumberOfAttractors": int(n_attractors),
        "BasinSizes": basin_sizes,
        "AttractorID": attractor_id,
        "Coherence": coherence,
        "Fragility": fragility,
        "BasinCoherence": basin_coherences,
        "BasinFragility": basin_fragilities,
        "AttractorCoherence": attractor_coherences,
        "AttractorFragility": attractor_fragilities,
    }
    if GET_STRATIFIED_COHERENCES:
        return_dict['StratifiedCoherences'] = stratified_coherences
        return_dict['DistanceFromAttractorCount'] = n_states_with_specific_distance_from_attractor
        return_dict['DistanceFromAttractor'] = distances_from_attractor
    return return_dict
def get_attractors_and_robustness_measures_synchronous(
    self,
    number_different_IC: int = 500,
    RETURN_ATTRACTOR_COHERENCE: bool = True,
    *,
    rng=None,
) -> dict:
    """
    Approximate attractors and robustness measures under synchronous updating.

    Samples the attractor landscape by simulating the network from
    ``number_different_IC`` random initial conditions (ICs) and one random
    single-bit perturbation of each. Returns Monte-Carlo approximations of
    global coherence, fragility, and a final Hamming-based trajectory
    comparison, along with per-basin approximations. Optionally also
    estimates attractor-level coherence and fragility by perturbing every
    bit of every attractor state found during sampling.

    Notes
    -----
    - The attractor set returned is a *lower bound* on the true number of
      attractors, because only the sampled portion of state space is
      explored.
    - For ``N >= 64``, decimal state encodings may exceed ``np.int64``, so
      bitstrings (type ``str``) are used as state identifiers instead.
    - NOTE(review): ``FinalHammingDistanceApproximation`` accumulates
      ``np.mean(p0 == p1)``, i.e. the *agreement* between the aligned
      periodic trajectories (1 = identical). The name suggests a distance
      (``p0 != p1``) -- confirm the intended semantics before relying on
      the sign convention.

    Parameters
    ----------
    number_different_IC : int, optional
        Number of random initial conditions to sample (default 500).
    RETURN_ATTRACTOR_COHERENCE : bool, optional
        If True (default), also compute attractor-level coherence and
        fragility.
    rng : None or numpy.random.Generator, optional
        Random number generator or seed-like object, coerced via
        ``utils._coerce_rng``.

    Returns
    -------
    dict
        Keys: ``Attractors``, ``LowerBoundOfNumberOfAttractors``,
        ``BasinSizesApproximation``, ``CoherenceApproximation``,
        ``FragilityApproximation``, ``FinalHammingDistanceApproximation``,
        ``BasinCoherenceApproximation``, ``BasinFragilityApproximation``,
        and, if ``RETURN_ATTRACTOR_COHERENCE`` is True, additionally
        ``AttractorCoherence`` and ``AttractorFragility``.

    References
    ----------
    Park, K. H., Costa, F. X., Rocha, L. M., Albert, R., & Rozum, J. C.
    (2023). Models of cell processes are far from the edge of chaos.
    PRX Life, 1(2), 023009.

    Bavisetty, V. S. N., Wheeler, M., & Kadelka, C. (2025). Attractors
    are less stable than their basins: Canalization creates a coherence
    gap in gene regulatory networks. bioRxiv 2025-11.
    """
    rng = utils._coerce_rng(rng)

    def lcm(a: int, b: int) -> int:
        # Least common multiple of two attractor lengths.
        return abs(a * b) // math.gcd(a, b)

    # ------------------------------------------------------------------
    # Initialization
    # ------------------------------------------------------------------
    dictF = {}  # memoized synchronous transitions: state -> successor
    attractors = []  # discovered attractor cycles
    ICs_per_attractor_state = []  # per attractor: hit counts per cycle state
    basin_sizes = []  # raw counts of trajectories per attractor
    attractor_dict = {}  # state -> attractor index
    attractor_state_dict = []  # per attractor: state -> index within cycle
    distance_from_attractor_state_dict = []  # per attractor: state -> transient distance
    counter_phase_shifts = []  # per attractor: phase-shift histogram (not returned)
    # Precomputed MSB-first binary weights for every possible in-degree.
    powers_of_2s = [
        np.asarray([2**i for i in range(NN)][::-1], dtype=np.int64)
        for NN in range(max(self.indegrees) + 1)
    ]
    if self.N < 64:
        powers_of_2 = np.asarray([2**i for i in range(self.N)][::-1], dtype=np.int64)
    robustness_approximation = 0
    fragility_sum = 0.0
    basin_robustness = defaultdict(float)
    basin_fragility = defaultdict(float)
    final_hamming_distance_approximation = 0.0
    mean_states_attractors = []  # per attractor: mean binary state vector
    states_attractors = []  # per attractor: (length x N) matrix of cycle states

    # ------------------------------------------------------------------
    # Sampling phase: each iteration runs one IC (j == 0) and one
    # single-bit perturbation of it (j == 1) to their attractors.
    # ------------------------------------------------------------------
    for _ in range(number_different_IC):
        index_attractors = []
        index_within_attr = []
        dist_from_attr = []  # collected but currently unused downstream
        for j in range(2):
            if j == 0:
                x = rng.integers(2, size=self.N, dtype=np.uint8)
                if self.N < 64:
                    xdec = int(np.dot(x, powers_of_2))
                else:
                    xdec = "".join(str(int(b)) for b in x)
                x_old = x.copy()
            else:
                x = x_old.copy()
                bit = int(rng.integers(self.N))
                x[bit] ^= 1
                if self.N < 64:
                    xdec = int(np.dot(x, powers_of_2))
                else:
                    xdec = "".join(str(int(b)) for b in x)
            queue = [xdec]
            try:
                idx_attr = attractor_dict[xdec]
            except KeyError:
                # Walk forward until a known state or a new cycle is found.
                while True:
                    try:
                        fxdec = dictF[xdec]
                    except KeyError:
                        # Compute the synchronous successor node by node.
                        fx = np.empty(self.N, dtype=np.uint8)
                        for jj in range(self.N):
                            if self.indegrees[jj] > 0:
                                fx[jj] = self.F[jj].f[
                                    int(
                                        np.dot(
                                            x[self.I[jj]],
                                            powers_of_2s[self.indegrees[jj]],
                                        )
                                    )
                                ]
                            else:
                                fx[jj] = self.F[jj].f[0]
                        if self.N < 64:
                            fxdec = int(np.dot(fx, powers_of_2))
                        else:
                            fxdec = "".join(str(int(b)) for b in fx)
                        dictF[xdec] = fxdec
                    try:
                        # Reached a state already assigned to an attractor.
                        idx_attr = attractor_dict[fxdec]
                        idx_state = attractor_state_dict[idx_attr][fxdec]
                        dist_state = distance_from_attractor_state_dict[idx_attr][fxdec]
                        attractor_dict.update({q: idx_attr for q in queue})
                        attractor_state_dict[idx_attr].update(
                            {q: idx_state for q in queue}
                        )
                        distance_from_attractor_state_dict[idx_attr].update(
                            {
                                q: d
                                for q, d in zip(
                                    queue,
                                    range(len(queue) + dist_state, dist_state, -1),
                                )
                            }
                        )
                        break
                    except KeyError:
                        if fxdec in queue:
                            # Closed a new cycle: register a new attractor.
                            idx = queue.index(fxdec)
                            idx_attr = len(attractors)
                            attractors.append(queue[idx:])
                            basin_sizes.append(1)
                            ICs_per_attractor_state.append(
                                [0] * len(attractors[-1])
                            )
                            counter_phase_shifts.append(
                                [0] * len(attractors[-1])
                            )
                            attractor_dict.update({q: idx_attr for q in queue})
                            attractor_state_dict.append(
                                {
                                    q: (0 if q in queue[:idx]
                                        else queue[idx:].index(q))
                                    for q in queue
                                }
                            )
                            distance_from_attractor_state_dict.append(
                                {
                                    q: (idx - queue.index(q))
                                    if q in queue[:idx]
                                    else 0
                                    for q in queue
                                }
                            )
                            # Cache the cycle's states and their mean.
                            if len(attractors[-1]) == 1:
                                fp = (
                                    np.asarray(
                                        utils.dec2bin(queue[idx], self.N),
                                        dtype=np.float64,
                                    )
                                    if self.N < 64
                                    else np.asarray(list(queue[idx]), dtype=np.float64)
                                )
                                states_attractors.append(fp.reshape(1, self.N))
                                mean_states_attractors.append(fp)
                            else:
                                lc = (
                                    np.asarray(
                                        [
                                            utils.dec2bin(s, self.N)
                                            for s in queue[idx:]
                                        ],
                                        dtype=np.float64,
                                    )
                                    if self.N < 64
                                    else np.asarray(
                                        [list(s) for s in queue[idx:]],
                                        dtype=np.float64,
                                    )
                                )
                                states_attractors.append(lc)
                                mean_states_attractors.append(lc.mean(axis=0))
                            break
                        else:
                            # Safe here: ``fx`` is always freshly computed on
                            # this path, because a dictF cache hit implies the
                            # successor is already in attractor_dict (every
                            # sampled trajectory registers all its states).
                            x = fx.copy()
                            queue.append(fxdec)
                            xdec = fxdec
            index_attractors.append(idx_attr)
            index_within_attr.append(attractor_state_dict[idx_attr][xdec])
            dist_from_attr.append(
                distance_from_attractor_state_dict[idx_attr][xdec]
            )
            basin_sizes[idx_attr] += 1
            ICs_per_attractor_state[idx_attr][
                attractor_state_dict[idx_attr][xdec]
            ] += 1
        if index_attractors[0] == index_attractors[1]:
            # IC and perturbation reached the same attractor.
            robustness_approximation += 1
            basin_robustness[index_attractors[0]] += 1
            ps = max(index_within_attr) - min(index_within_attr)
            counter_phase_shifts[index_attractors[0]][ps] += 1
        else:
            # Different attractors: accumulate fragility and trajectory
            # comparison over one common period (lcm of cycle lengths).
            d = np.sum(
                np.abs(
                    mean_states_attractors[index_attractors[0]]
                    - mean_states_attractors[index_attractors[1]]
                )
            )
            fragility_sum += d
            basin_fragility[index_attractors[0]] += d
            L = lcm(
                len(attractors[index_attractors[0]]),
                len(attractors[index_attractors[1]]),
            )
            s0 = states_attractors[index_attractors[0]]
            s1 = states_attractors[index_attractors[1]]
            p0 = np.tile(s0, (L // len(s0) + 1, 1))[
                index_within_attr[0] : index_within_attr[0] + L
            ]
            p1 = np.tile(s1, (L // len(s1) + 1, 1))[
                index_within_attr[1] : index_within_attr[1] + L
            ]
            final_hamming_distance_approximation += np.mean(p0 == p1)

    # ------------------------------------------------------------------
    # Aggregation
    # ------------------------------------------------------------------
    lower_bound_number_of_attractors = len(attractors)
    approximate_basin_sizes = (
        np.asarray(basin_sizes, dtype=np.float64)
        / (2.0 * float(number_different_IC))
    )
    approximate_coherence = robustness_approximation / float(number_different_IC)
    approximate_fragility = fragility_sum / float(number_different_IC) / float(self.N)
    approximate_basin_coherence = np.asarray(
        [
            2.0 * basin_robustness[i] / basin_sizes[i]
            for i in range(lower_bound_number_of_attractors)
        ],
        dtype=np.float64,
    )
    approximate_basin_fragility = np.asarray(
        [
            2.0 * basin_fragility[i] / basin_sizes[i] / float(self.N)
            for i in range(lower_bound_number_of_attractors)
        ],
        dtype=np.float64,
    )
    final_hamming_distance_approximation /= float(number_different_IC)
    results = [
        attractors,
        lower_bound_number_of_attractors,
        approximate_basin_sizes,
        approximate_coherence,
        approximate_fragility,
        final_hamming_distance_approximation,
        approximate_basin_coherence,
        approximate_basin_fragility,
    ]
    if not RETURN_ATTRACTOR_COHERENCE:
        return dict(
            zip(
                [
                    "Attractors",
                    "LowerBoundOfNumberOfAttractors",
                    "BasinSizesApproximation",
                    "CoherenceApproximation",
                    "FragilityApproximation",
                    "FinalHammingDistanceApproximation",
                    "BasinCoherenceApproximation",
                    "BasinFragilityApproximation",
                ],
                results,
            )
        )

    # ------------------------------------------------------------------
    # Attractor-level coherence / fragility
    # ------------------------------------------------------------------
    attractor_coherence = np.zeros(lower_bound_number_of_attractors, dtype=np.float64)
    attractor_fragility = np.zeros(lower_bound_number_of_attractors, dtype=np.float64)
    attractors_original = attractors[:]
    for idx0, attractor in enumerate(attractors_original):
        for state in attractor:
            for i in range(self.N):
                x = (
                    np.asarray(utils.dec2bin(state, self.N), dtype=np.uint8)
                    if self.N < 64
                    else np.asarray(list(state), dtype=np.uint8)
                )
                x[i] ^= 1
                if self.N < 64:
                    xdec = int(np.dot(x, powers_of_2))
                else:
                    xdec = "".join(str(int(b)) for b in x)
                try:
                    idx1 = attractor_dict[xdec]
                except KeyError:
                    # --- safe forward-walk without touching basin counts
                    queue = [xdec]
                    x_local = x.copy()
                    while True:
                        try:
                            fxdec = dictF[xdec]
                            fx = None  # cache hit: no fresh binary successor
                        except KeyError:
                            fx = np.empty(self.N, dtype=np.uint8)
                            for jj in range(self.N):
                                if self.indegrees[jj] > 0:
                                    fx[jj] = self.F[jj].f[
                                        int(
                                            np.dot(
                                                x_local[self.I[jj]],
                                                powers_of_2s[self.indegrees[jj]],
                                            )
                                        )
                                    ]
                                else:
                                    fx[jj] = self.F[jj].f[0]
                            if self.N < 64:
                                fxdec = int(np.dot(fx, powers_of_2))
                            else:
                                fxdec = "".join(str(int(b)) for b in fx)
                            dictF[xdec] = fxdec
                        if fxdec in attractor_dict:
                            idx1 = attractor_dict[fxdec]
                            break
                        if fxdec in queue:
                            idx1 = len(attractors)
                            attractors.append(queue[queue.index(fxdec):])
                            attractor_dict.update(
                                {q: idx1 for q in queue}
                            )
                            break
                        queue.append(fxdec)
                        xdec = fxdec
                        # Bug fix: the previous code did
                        # ``x_local = fx.copy()`` unconditionally, which
                        # crashes (NameError) or silently reuses a stale
                        # vector when the successor came from the dictF
                        # cache (reachable, since the break above does not
                        # register queue states in attractor_dict). Decode
                        # the state from fxdec instead.
                        if fx is not None:
                            x_local = fx.copy()
                        elif self.N < 64:
                            x_local = np.asarray(
                                utils.dec2bin(fxdec, self.N), dtype=np.uint8
                            )
                        else:
                            x_local = np.asarray(list(fxdec), dtype=np.uint8)
                if idx0 == idx1:
                    attractor_coherence[idx0] += 1.0
                else:
                    attractor_fragility[idx0] += np.sum(
                        np.abs(
                            mean_states_attractors[idx0]
                            - mean_states_attractors[idx1]
                        )
                    )
    # Normalize: N perturbations per attractor state.
    attractor_coherence /= (
        float(self.N)
        * np.asarray(list(map(len, attractors_original)), dtype=np.float64)
    )
    attractor_fragility /= (
        float(self.N) ** 2
        * np.asarray(list(map(len, attractors_original)), dtype=np.float64)
    )
    results[0] = attractors_original
    return dict(
        zip(
            [
                "Attractors",
                "LowerBoundOfNumberOfAttractors",
                "BasinSizesApproximation",
                "CoherenceApproximation",
                "FragilityApproximation",
                "FinalHammingDistanceApproximation",
                "BasinCoherenceApproximation",
                "BasinFragilityApproximation",
                "AttractorCoherence",
                "AttractorFragility",
            ],
            results + [attractor_coherence, attractor_fragility],
        )
    )
def get_derrida_value(
    self,
    nsim: int = 1000,
    EXACT: bool = False,
    USE_NUMBA: bool = True,
    *,
    rng=None,
) -> float:
    """
    Return the Derrida value of the Boolean network.

    The Derrida value is the expected Hamming distance between the
    synchronous one-step images of two states that differ in exactly one
    bit; it quantifies the short-term sensitivity of the dynamics to small
    perturbations. With ``EXACT=True`` it equals the mean (unnormalized)
    average sensitivity of the update functions; otherwise it is estimated
    by Monte Carlo sampling.

    Parameters
    ----------
    nsim : int, optional
        Number of Monte Carlo samples (default 1000); ignored when
        ``EXACT`` is True.
    EXACT : bool, optional
        If True, compute the exact value instead of simulating
        (default False).
    USE_NUMBA : bool, optional
        If True (default) and Numba is available, run the simulation in a
        compiled kernel.
    rng : None or np.random.Generator, optional
        Source of randomness, coerced via ``utils._coerce_rng``.

    Returns
    -------
    float
        Average Hamming distance after one synchronous update following a
        single-bit perturbation.

    References
    ----------
    Derrida, B., & Pomeau, Y. (1986). Random networks of automata: a
    simple annealed approximation. *Europhysics Letters*, 1(2), 45.
    """
    # ------------------------------------------------------------------
    # Exact value: mean unnormalized average sensitivity over all rules.
    # ------------------------------------------------------------------
    if EXACT:
        sensitivities = [
            bf.get_average_sensitivity(EXACT=True, NORMALIZED=False)
            for bf in self.F
        ]
        return float(np.mean(sensitivities))

    # ------------------------------------------------------------------
    # Monte Carlo approximation
    # ------------------------------------------------------------------
    rng = utils._coerce_rng(rng)

    if __LOADED_NUMBA__ and USE_NUMBA:
        # Compiled kernel, seeded from the caller's generator.
        truth_tables = List(
            [np.asarray(bf.f, dtype=np.uint8) for bf in self.F]
        )
        regulator_sets = List(
            [np.asarray(regs, dtype=np.int64) for regs in self.I]
        )
        seed = int(rng.integers(0, 2**31 - 1))
        return float(
            _derrida_simulation(
                truth_tables,
                regulator_sets,
                int(self.N),
                int(nsim),
                seed,
            )
        )

    # ------------------------------------------------------------------
    # Pure Python fallback
    # ------------------------------------------------------------------
    hamming_sum = 0.0
    for _ in range(int(nsim)):
        state = rng.integers(0, 2, size=self.N, dtype=np.uint8)
        perturbed = state.copy()
        flip = int(rng.integers(0, self.N))
        perturbed[flip] ^= np.uint8(1)
        image_orig = np.asarray(
            self._update_network_synchronously_unchecked(state),
            dtype=np.uint8,
        )
        image_pert = np.asarray(
            self._update_network_synchronously_unchecked(perturbed),
            dtype=np.uint8,
        )
        hamming_sum += float(np.sum(image_orig != image_pert))
    return float(hamming_sum / float(nsim))