#!/usr/bin/env python3
# -*- coding: utf-8 -*-
r"""
This module defines the :class:`~boolforge.BooleanNetwork` class, which provides
a high-level framework for modeling, simulating, and analyzing Boolean networks.
A :class:`BooleanNetwork` represents a discrete dynamical system
:math:`F = (f_1, \ldots, f_N)` composed of multiple
:class:`~boolforge.BooleanFunction` objects as update rules. The class includes
methods for constructing state transition graphs, identifying attractors,
computing robustness and sensitivity measures, and exporting truth tables.
Several computational routines—particularly those involving state space
exploration, attractor detection, and robustness estimation—offer optional
Numba-based just-in-time (JIT) acceleration. Installing Numba is recommended
for optimal performance but not required; all features remain functional
without it.
This module serves as the central interface for dynamic Boolean network
analysis within the BoolForge package.
Example
-------
>>> from boolforge import BooleanNetwork
>>> bn = BooleanNetwork(F=[[0, 1], [0, 0, 0, 1], [0, 1]], I=[[1], [0, 2], [1]])
>>> bn.get_attractors_synchronous_exact()
"""
import math
import warnings
from collections import defaultdict
from collections.abc import Sequence
from collections import deque
from copy import deepcopy
import numpy as np
import networkx as nx
import pandas as pd
try:
import boolforge.utils as utils
from boolforge.boolean_function import BooleanFunction
from boolforge.wiring_diagram import WiringDiagram
except ModuleNotFoundError:
import utils as utils
from boolean_function import BooleanFunction
from wiring_diagram import WiringDiagram
try:
import cana.boolean_network
__LOADED_CANA__ = True
except ModuleNotFoundError:
__LOADED_CANA__ = False
try:
from numba import njit, int64
from numba.typed import List
__LOADED_NUMBA__ = True
except ModuleNotFoundError:
__LOADED_NUMBA__ = False
__all__ = [
"dict_weights",
"get_entropy_of_basin_size_distribution",
"BooleanNetwork"
]
# Map qualitative regulation types to numeric edge weights. Non-essential
# regulators map to NaN so they drop out of numeric aggregations.
dict_weights = {'non-essential' : np.nan, 'conditional' : 0, 'positive' : 1, 'negative' : -1}
[docs]
def get_entropy_of_basin_size_distribution(
basin_sizes: Sequence[int] | np.ndarray
) -> float:
"""
Compute the Shannon entropy of a basin size distribution.
The basin sizes are first normalized to form a probability distribution.
The Shannon entropy is then computed as
``H = -sum(p_i * log(p_i))``,
where ``p_i`` is the proportion of states in basin ``i``.
Parameters
----------
basin_sizes : Sequence[int] or np.ndarray
Sizes of the basins of attraction, where each entry corresponds to
the number of initial conditions that converge to a given attractor.
Returns
-------
float
Shannon entropy of the basin size distribution.
"""
total = sum(basin_sizes)
probabilities = [size * 1.0 / total for size in basin_sizes]
return sum([-np.log(p) * p for p in probabilities])
if __LOADED_NUMBA__:
@njit(fastmath=True)  # safe: operations are integer-only
def _update_network_synchronously_numba(
    x,
    F_array_list,
    I_array_list,
):
    """
    Perform one synchronous update of a Boolean network.

    The successor of the binary state vector ``x`` is obtained by evaluating
    every node's truth table on the current values of its regulators.

    Parameters
    ----------
    x : np.ndarray
        Binary state vector of shape ``(N,)`` with dtype ``uint8``.
    F_array_list : list[np.ndarray]
        Truth tables per node; entry ``j`` has length ``2**k_j`` for a node
        with ``k_j`` regulators.
    I_array_list : list[np.ndarray]
        Regulator index arrays per node.

    Returns
    -------
    np.ndarray
        Updated binary state vector of shape ``(N,)`` with dtype ``uint8``.
    """
    n_nodes = x.shape[0]
    updated = np.empty(n_nodes, dtype=np.uint8)
    for node in range(n_nodes):
        regs = I_array_list[node]
        n_regs = regs.shape[0]
        if n_regs == 0:
            # Constant node: its truth table holds a single value.
            updated[node] = F_array_list[node][0]
            continue
        # Assemble the truth-table row index bit by bit (MSB first).
        row = 0
        for r in range(n_regs):
            row = (row << 1) | x[regs[r]]
        updated[node] = F_array_list[node][row]
    return updated
@njit
def _compute_synchronous_stg_numba(
    F_array_list,
    I_array_list,
    N_variables
):
    """
    Compute the synchronous state transition graph (STG).

    For every possible binary state of the network this Numba-compiled
    function computes the integer index of its successor state under
    synchronous updating.

    Parameters
    ----------
    F_array_list : list[np.ndarray]
        Boolean update tables. Entry ``j`` is an array of length ``2**k_j``
        encoding the update rule of node ``j`` with ``k_j`` regulators.
    I_array_list : list[np.ndarray]
        Regulator index arrays. Entry ``j`` lists the regulators of node ``j``.
    N_variables : int
        Number of variables (nodes) in the network.

    Returns
    -------
    np.ndarray
        One-dimensional array of length ``2**N_variables`` mapping every
        state index to the index of its synchronous successor.
    """
    n_states = 2 ** N_variables
    # Enumerate every state as a binary row (most-significant bit first).
    states = np.zeros((n_states, N_variables), dtype=np.uint8)
    for s in range(n_states):
        for b in range(N_variables):
            states[s, N_variables - 1 - b] = (s >> b) & 1
    successors = np.zeros_like(states)
    # Fill the successor matrix one node (column) at a time.
    for node in range(N_variables):
        regs = I_array_list[node]
        k = len(regs)
        if k == 0:
            # Constant node: same value in every state.
            successors[:, node] = F_array_list[node][0]
            continue
        reg_weights = 2 ** np.arange(k - 1, -1, -1)
        for s in range(n_states):
            row = 0
            for r in range(k):
                row += states[s, regs[r]] * reg_weights[r]
            successors[s, node] = F_array_list[node][row]
    # Encode each successor row back into an integer state index.
    # NOTE: this cannot be an unsigned int for safe indexing inside Numba kernels.
    bit_weights = 2 ** np.arange(N_variables - 1, -1, -1)
    next_indices = np.zeros(n_states, dtype=np.int64)
    for s in range(n_states):
        code = 0
        for b in range(N_variables):
            code += successors[s, b] * bit_weights[b]
        next_indices[s] = code
    return next_indices
@njit
def _compute_synchronous_stg_numba_low_memory(
    F_array_list,
    I_array_list,
    N_variables
):
    """
    Compute the synchronous state transition graph (STG) using minimal memory.

    For each integer state index ``i`` in ``[0, 2**N_variables)``, this function
    decodes ``i`` into its binary state vector, computes the synchronous update
    of the Boolean network, and encodes the resulting state back into an integer
    index.

    Parameters
    ----------
    F_array_list : list[np.ndarray]
        List of Boolean update tables. The ``j``-th entry is an array of length
        ``2**k_j`` representing the update rule for node ``j`` with ``k_j``
        regulators.
    I_array_list : list[np.ndarray]
        List of regulator index arrays. The ``j``-th entry contains the indices
        of the regulators of node ``j``.
    N_variables : int
        Number of variables (nodes) in the network.

    Returns
    -------
    np.ndarray
        One-dimensional array of length ``2**N_variables`` containing, for each
        state index, the index of the successor state under synchronous updating.

    Notes
    -----
    This implementation avoids storing the full state matrix and therefore
    reduces memory usage from ``O(N * 2**N)`` to ``O(N + 2**N)``. The time
    complexity remains exponential in ``N_variables``.
    """
    nstates = 2 ** N_variables
    # NOTE: this cannot be an unsigned int for safe indexing inside Numba kernels.
    next_indices = np.zeros(nstates, dtype=np.int64)
    powers_of_two = 2 ** np.arange(N_variables - 1, -1, -1)
    # The two state buffers are allocated once and reused for every state.
    state = np.zeros(N_variables, dtype=np.uint8)
    next_state = np.zeros(N_variables, dtype=np.uint8)
    for i in range(nstates):
        # --- Decode i into binary vector (most-significant bit first)
        tmp = i
        for j in range(N_variables):
            state[N_variables - 1 - j] = tmp & 1
            tmp >>= 1
        # --- Compute next-state values
        for j in range(N_variables):
            regulators = I_array_list[j]
            if regulators.shape[0] == 0:
                # Constant node: single-entry truth table.
                next_state[j] = F_array_list[j][0]
            else:
                n_reg = regulators.shape[0]
                # Build the truth-table row index bit by bit (MSB first).
                idx = 0
                for k in range(n_reg):
                    idx = (idx << 1) | state[regulators[k]]
                next_state[j] = F_array_list[j][idx]
        # --- Encode next_state back to integer
        val = 0
        for j in range(N_variables):
            val += next_state[j] * powers_of_two[j]
        next_indices[i] = val
    return next_indices
@njit(fastmath=True)  # safe: operations are integer-only
def _hamming_distance(a, b):
    """
    Return the Hamming distance between two binary vectors.

    Parameters
    ----------
    a : np.ndarray
        One-dimensional array of dtype ``uint8``.
    b : np.ndarray
        One-dimensional array of dtype ``uint8`` with the same shape as ``a``.

    Returns
    -------
    int
        Number of positions at which ``a`` and ``b`` differ.
    """
    mismatches = 0
    for position in range(a.size):
        if a[position] != b[position]:
            mismatches += 1
    return mismatches
@njit(fastmath=True)  # safe: operations are integer-only
def _derrida_simulation(
    F_array_list,
    I_array_list,
    N,
    nsim,
    seed
):
    """
    Perform a Monte Carlo simulation to estimate the Derrida value.

    This function estimates the Derrida value by repeatedly sampling a random
    initial state, flipping a single randomly chosen bit, synchronously
    updating both states, and computing the Hamming distance between the
    resulting successor states.

    Parameters
    ----------
    F_array_list : list[np.ndarray]
        List of Boolean update tables for each node.
    I_array_list : list[np.ndarray]
        List of regulator index arrays for each node.
    N : int
        Number of variables (nodes) in the network.
    nsim : int
        Number of Monte Carlo simulations to perform.
    seed : int
        Seed for the Numba-compatible random number generator.

    Returns
    -------
    float
        Estimated Derrida value, i.e., the expected Hamming distance after one
        synchronous update following a random single-bit perturbation.
    """
    # Numba RNG: seed once so repeated calls with the same seed reproduce
    # the exact same sample sequence.
    np.random.seed(seed)
    total_dist = 0.0
    # Reuse the two state buffers across all simulations (no per-iteration
    # allocation).
    X = np.empty(N, dtype=np.uint8)
    Y = np.empty(N, dtype=np.uint8)
    for _ in range(nsim):
        # Random initial state
        for i in range(N):
            X[i] = np.random.randint(0, 2)
        Y[:] = X
        # Flip one random bit
        idx = np.random.randint(0, N)
        Y[idx] = 1 - Y[idx]
        # Synchronous updates
        FX = _update_network_synchronously_numba(X, F_array_list, I_array_list)
        FY = _update_network_synchronously_numba(Y, F_array_list, I_array_list)
        total_dist += _hamming_distance(FX, FY)
    return total_dist / nsim
@njit(cache=True)
def _attractors_functional_graph(next_state):
    """
    Identify attractors and basins in a functional graph.

    Given a functional graph represented by a successor array, this function
    identifies all attractors (cycles), assigns each state to an attractor,
    and computes basin sizes and cycle properties.

    Parameters
    ----------
    next_state : np.ndarray
        One-dimensional integer array of length ``n`` such that
        ``next_state[x]`` gives the successor of state ``x`` and lies in
        ``[0, n-1]``.

    Returns
    -------
    attr_id : np.ndarray
        Integer array of length ``n`` mapping each state to its attractor
        index.
    basin_sizes : np.ndarray
        Integer array of length ``n_attr`` giving the basin size of each
        attractor.
    cycle_rep : np.ndarray
        Integer array of length ``n_attr`` containing one representative
        state from each attractor cycle.
    cycle_len : np.ndarray
        Integer array of length ``n_attr`` giving the length of each cycle.
    n_attr : np.int32
        Number of attractors in the functional graph.
    """
    n = next_state.shape[0]
    # -1 marks "not yet assigned to an attractor".
    attr_id = np.full(n, -1, dtype=np.int32)
    # For detecting cycles within the current walk:
    #   seen[u] == run_id means u was visited in this run
    #   pos[u] = index of u in the current path (when first visited this run)
    seen = np.zeros(n, dtype=np.int32)
    pos = np.zeros(n, dtype=np.int32)
    # Upper bounds: in the worst case every node could be its own 1-cycle
    basin_sizes_full = np.zeros(n, dtype=np.int32)
    cycle_rep_full = np.empty(n, dtype=np.int64)
    cycle_len_full = np.zeros(n, dtype=np.int32)
    n_attr = 0
    # Numba typed list for the current path
    path = List.empty_list(int64)
    for start in range(n):
        if attr_id[start] != -1:
            continue
        path.clear()
        u = start
        run_id = start + 1  # unique per start; safe while n << 2**31 (always true in practice)
        # Walk until we hit a known attractor or revisit a node in this run
        while attr_id[u] == -1 and seen[u] != run_id:
            seen[u] = run_id
            pos[u] = len(path)
            path.append(u)
            u = next_state[u]
        if attr_id[u] != -1:
            # This path flows into an already-known attractor
            aid = attr_id[u]
            for i in range(len(path)):
                v = path[i]
                attr_id[v] = aid
                basin_sizes_full[aid] += 1
        else:
            # We found a cycle within the current run.
            # u is the first repeated node; cycle starts at pos[u] in path
            cyc_start = pos[u]
            aid = n_attr
            n_attr += 1
            # Representative and length of the cycle
            cycle_rep_full[aid] = u
            cycle_len_full[aid] = len(path) - cyc_start
            # Assign all nodes on the path (tail + cycle) to this new attractor
            for i in range(len(path)):
                v = path[i]
                attr_id[v] = aid
                basin_sizes_full[aid] += 1
    # Trim the over-allocated per-attractor buffers to the number found.
    return attr_id, basin_sizes_full[:n_attr], cycle_rep_full[:n_attr], cycle_len_full[:n_attr], np.int32(n_attr)
@njit(cache=True)
def _transient_lengths_functional_numba(
    succ,
    is_attr_mask
):
    """
    Compute exact transient length (distance to attractor) for a functional graph.

    Parameters
    ----------
    succ : int64 array, shape (n_states,)
        succ[x] = successor of state x
    is_attr_mask : uint8/bool array, shape (n_states,)
        1 if state lies on an attractor cycle, else 0

    Returns
    -------
    dist : int64 array, shape (n_states,)
        dist[x] = number of steps from x to its attractor

    Notes
    -----
    Along any trajectory, the distance must decrease by exactly 1 per step
    until an attractor state (distance 0) is reached.
    """
    n = succ.shape[0]
    dist = np.full(n, -1, dtype=np.int64)
    # Attractor states have distance 0
    for i in range(n):
        if is_attr_mask[i]:
            dist[i] = 0
    for i in range(n):
        if dist[i] >= 0:
            continue
        # First pass: mark the unresolved path from i and count its length.
        v = i
        length = 0
        while dist[v] == -1:
            dist[v] = -2  # temporary marker: "in current path"
            length += 1
            v = succ[v]
        # dist[v] is now a known distance (cycle states were pre-marked 0).
        base = dist[v]
        # Second pass: unwind from i, assigning DECREASING distances toward
        # the attractor: dist[i] = base + length, its successor base + length - 1,
        # and so on.
        # BUG FIX: the previous version assigned INCREASING distances along
        # the walk (dist[i] = base + 1, next state base + 2, ...), which
        # inverted the ordering — e.g. for the chain 0 -> 1 -> 2 (2 a fixed
        # point) it returned dist = [1, 2, 0] instead of [2, 1, 0].
        v = i
        remaining = length
        while dist[v] == -2:
            dist[v] = base + remaining
            remaining -= 1
            v = succ[v]
    return dist
@njit(cache=True)
def _robustness_edge_traversal_numba(
    N,
    attractor_idx,
    is_attr_mask,
    dist_attr
):
    """
    Traverse hypercube edges to compute basin and attractor robustness measures.

    This function iterates over all undirected edges of the Boolean hypercube
    exactly once and accumulates coherence and fragility contributions for
    basins of attraction and for attractor states.

    Parameters
    ----------
    N : int
        Number of variables (dimension of the Boolean hypercube).
    attractor_idx : np.ndarray
        Integer array of shape ``(2**N,)`` mapping each state to its attractor
        index in ``[0, n_attr - 1]``.
    is_attr_mask : np.ndarray
        Boolean or uint8 array of shape ``(2**N,)`` indicating whether a state
        lies on an attractor.
    dist_attr : np.ndarray
        Two-dimensional array of shape ``(n_attr, n_attr)`` giving pairwise
        distances between attractors.

    Returns
    -------
    basin_coh : np.ndarray
        Array of length ``n_attr`` containing basin coherence values.
    basin_frag : np.ndarray
        Array of length ``n_attr`` containing basin fragility values.
    attr_coh : np.ndarray
        Array of length ``n_attr`` containing attractor coherence values.
    attr_frag : np.ndarray
        Array of length ``n_attr`` containing attractor fragility values.
    """
    n_states = attractor_idx.shape[0]
    n_attr = dist_attr.shape[0]
    basin_coh = np.zeros(n_attr, dtype=np.float64)
    basin_frag = np.zeros(n_attr, dtype=np.float64)
    attr_coh = np.zeros(n_attr, dtype=np.float64)
    attr_frag = np.zeros(n_attr, dtype=np.float64)
    # Iterate each undirected hypercube edge exactly once:
    # For x, flip only bits that are 0 -> y = x | (1<<bit), which guarantees y > x.
    for xdec in range(n_states):
        idx_x = attractor_idx[xdec]
        # (Should never be -1 if attractor_idx is filled for all states)
        for bit in range(N):
            if (xdec >> bit) & 1:
                continue
            ydec = xdec | (1 << bit)
            idx_y = attractor_idx[ydec]
            if idx_x == idx_y:
                # Same basin: the undirected edge contributes coherence to
                # both endpoints, hence the increment of 2.
                basin_coh[idx_x] += 2.0
                if is_attr_mask[xdec]:
                    attr_coh[idx_x] += 1.0
                if is_attr_mask[ydec]:
                    attr_coh[idx_y] += 1.0
            else:
                # Different basins: both basins accrue fragility weighted by
                # the distance between their attractors.
                dxy = dist_attr[idx_x, idx_y]
                basin_frag[idx_x] += dxy
                basin_frag[idx_y] += dxy
                if is_attr_mask[xdec]:
                    attr_frag[idx_x] += dxy
                if is_attr_mask[ydec]:
                    attr_frag[idx_y] += dxy
    return basin_coh, basin_frag, attr_coh, attr_frag
@njit(cache=True)
def _robustness_edge_traversal_numba_stratified(
    N,
    attractor_idx,
    is_attr_mask,
    dist_attr,
    dist_state,
    max_dist
):
    """
    Traverse hypercube edges, accumulating robustness measures stratified by
    transient length.

    Like ``_robustness_edge_traversal_numba``, this iterates over every
    undirected edge of the Boolean hypercube exactly once and accumulates
    coherence and fragility per basin/attractor, but additionally bins edge
    endpoints by their distance-to-attractor (``dist_state``).

    Parameters
    ----------
    N : int
        Number of variables (dimension of the Boolean hypercube).
    attractor_idx : np.ndarray
        Integer array of shape ``(2**N,)`` mapping each state to its attractor
        index in ``[0, n_attr - 1]``.
    is_attr_mask : np.ndarray
        Boolean or uint8 array of shape ``(2**N,)`` indicating whether a state
        lies on an attractor.
    dist_attr : np.ndarray
        Array of shape ``(n_attr, n_attr)`` with pairwise attractor distances.
    dist_state : np.ndarray
        Integer array of shape ``(2**N,)`` with each state's distance to its
        attractor.
    max_dist : int
        Maximum value in ``dist_state``; sets the number of strata.

    Returns
    -------
    basin_coh, basin_frag, attr_coh, attr_frag : np.ndarray
        Per-attractor coherence/fragility, as in the unstratified variant.
    strat_coh : np.ndarray
        Array of shape ``(n_attr, max_dist + 1)``: coherent edge-endpoint
        counts per attractor and transient-length stratum.
    strat_cnt : np.ndarray
        Array of shape ``(n_attr, max_dist + 1)``: total edge-endpoint counts
        per attractor and stratum (normalization denominators).
    """
    n_states = attractor_idx.shape[0]
    n_attr = dist_attr.shape[0]
    basin_coh = np.zeros(n_attr, dtype=np.float64)
    basin_frag = np.zeros(n_attr, dtype=np.float64)
    attr_coh = np.zeros(n_attr, dtype=np.float64)
    attr_frag = np.zeros(n_attr, dtype=np.float64)
    strat_coh = np.zeros((n_attr, max_dist + 1), dtype=np.float64)
    strat_cnt = np.zeros((n_attr, max_dist + 1), dtype=np.int64)
    # Each undirected edge is visited once: flip only 0-bits so y > x.
    for xdec in range(n_states):
        dx = dist_state[xdec]
        idx_x = attractor_idx[xdec]
        for bit in range(N):
            if (xdec >> bit) & 1:
                continue
            ydec = xdec | (1 << bit)
            dy = dist_state[ydec]
            idx_y = attractor_idx[ydec]
            # Every edge endpoint counts toward its stratum's denominator.
            strat_cnt[idx_x, dx] += 1
            strat_cnt[idx_y, dy] += 1
            if idx_x == idx_y:
                # Same basin: coherence for both endpoints (hence +2).
                basin_coh[idx_x] += 2.0
                if is_attr_mask[xdec]:
                    attr_coh[idx_x] += 1.0
                if is_attr_mask[ydec]:
                    attr_coh[idx_y] += 1.0
                strat_coh[idx_x, dx] += 1.0
                strat_coh[idx_y, dy] += 1.0
            else:
                # Different basins: fragility weighted by attractor distance.
                dxy = dist_attr[idx_x, idx_y]
                basin_frag[idx_x] += dxy
                basin_frag[idx_y] += dxy
                if is_attr_mask[xdec]:
                    attr_frag[idx_x] += dxy
                if is_attr_mask[ydec]:
                    attr_frag[idx_y] += dxy
    return (
        basin_coh,
        basin_frag,
        attr_coh,
        attr_frag,
        strat_coh,
        strat_cnt,
    )
[docs]
class BooleanNetwork(WiringDiagram):
"""
Representation of a Boolean network.
A Boolean network consists of a wiring diagram specifying regulatory
interactions between nodes and a collection of Boolean update functions
defining the dynamics at each node.
In a BooleanNetwork, constant nodes are removed during initialization,
so all nodes represent dynamic variables.
Parameters
----------
F : sequence
Sequence of Boolean update functions or truth tables. Each entry may
be a ``BooleanFunction`` instance, a truth table, or a Boolean
expression. The length of ``F`` must match the number of nodes in the
wiring diagram.
I : sequence of sequences of int or WiringDiagram
Wiring diagram specifying the regulators of each node, or an existing
``WiringDiagram`` instance.
variables : sequence of str, optional
Names of the variables corresponding to each node. Ignored if ``I`` is
provided as a ``WiringDiagram``.
SIMPLIFY_FUNCTIONS : bool, optional
If True, simplify Boolean update functions after initialization.
Default is False.
Attributes
----------
F : list[BooleanFunction]
Boolean update functions for each node.
I : list[np.ndarray[int]]
Wiring diagram specifying the regulators of each node.
variables : np.ndarray[str]
Names of the variables corresponding to each node.
N : int
Number of dynamic (non-constant) nodes in the network.
indegrees : np.ndarray[int]
Indegree of each node.
outdegrees : np.ndarray[int]
Outdegree of each node.
constants : dict[str, dict[str, int | list[str]]]
Mapping of node indices to constant values.
weights : list[np.ndarray[float]] or None
Interaction weights associated with the wiring diagram.
STG : dict or None
State transition graph, initialized to None and computed on demand.
"""
def __init__(
    self,
    F: Sequence[BooleanFunction | list[int] | np.ndarray],
    I: Sequence[Sequence[int]] | WiringDiagram,
    variables: Sequence[str] | None = None,
    SIMPLIFY_FUNCTIONS: bool = False,
):
    """
    Initialize a Boolean network.

    A Boolean network couples a wiring diagram (who regulates whom) with one
    Boolean update function per node. Constant nodes (indegree zero) are
    eliminated during initialization and recorded in ``self.constants``.

    Parameters
    ----------
    F : sequence of BooleanFunction or array-like of int
        Boolean update functions for each node, given either as
        ``BooleanFunction`` instances or as truth tables. ``len(F)`` must
        equal the number of nodes, and each function's arity must match the
        indegree of its node.
    I : sequence of sequences of int or WiringDiagram
        Wiring diagram specifying the regulators of each node (zero-based
        indices), or an existing ``WiringDiagram`` instance.
    variables : sequence of str, optional
        Names of the variables. Ignored (with a warning) if ``I`` is a
        ``WiringDiagram``.
    SIMPLIFY_FUNCTIONS : bool, optional
        If True, simplify all Boolean update functions after initialization.
        Default is False.

    Raises
    ------
    TypeError
        If ``F`` or ``I`` has an invalid type, or an entry of ``F`` is
        neither a ``BooleanFunction`` nor a truth table.
    ValueError
        If ``len(F)`` does not match the number of nodes, or a function's
        arity disagrees with the wiring diagram.

    Notes
    -----
    - After initialization ``self.N`` counts the remaining dynamic nodes.
    - The state transition graph ``self.STG`` is computed lazily.
    """
    # Reject obviously malformed containers before doing any work.
    if isinstance(F, (str, bytes)) or not isinstance(F, Sequence):
        raise TypeError(
            "F must be a sequence of BooleanFunction objects or truth tables"
        )
    if isinstance(I, (str, bytes)) or not isinstance(I, (Sequence, WiringDiagram)):
        raise TypeError(
            "I must be a sequence of sequences of int or a WiringDiagram instance"
        )

    # Set up the underlying wiring diagram.
    if isinstance(I, WiringDiagram):
        if variables is not None:
            warnings.warn(
                "Provided variables ignored; using variables from WiringDiagram.",
                UserWarning,
            )
        super().__init__(I.I, I.variables)
    else:
        super().__init__(I, variables)

    if len(F) != self.N:
        raise ValueError("len(F) must match the number of nodes in the wiring diagram")

    # Wrap each update rule in a BooleanFunction and verify its arity.
    self.F = []
    for idx, entry in enumerate(F):
        if isinstance(entry, (list, np.ndarray)):
            rule = BooleanFunction(entry, name=self.variables[idx])
        elif isinstance(entry, BooleanFunction):
            rule = entry
            rule.name = self.variables[idx]
        else:
            raise TypeError(
                f"Invalid entry in F at index {idx}: expected BooleanFunction, "
                f"truth table, got {type(entry)}"
            )
        if rule.n != self.indegrees[idx]:
            raise ValueError(
                f"Index {idx}: function has {rule.n} inputs but wiring diagram "
                f"has indegree {self.indegrees[idx]}"
            )
        self.F.append(rule)

    # Constant bookkeeping is always present, even if nothing gets removed
    # (may already exist if called from get_network_with_fixed_source_nodes, etc).
    self.constants = {}
    # Constants are detected from topology (indegree zero), not dict contents.
    if np.any(self.indegrees == 0):
        self.remove_constants()

    # The state transition graph is built on demand.
    self.STG = None

    if SIMPLIFY_FUNCTIONS:
        self.simplify_functions()
[docs]
def remove_constants(self) -> None:
    """
    Remove structurally constant nodes from the Boolean network.

    A node is considered constant if it has no regulators (indegree zero).
    Such nodes are eliminated from the dynamic network by propagating their
    fixed Boolean values to downstream nodes. Eliminated constants and their
    effects are recorded in the ``constants`` attribute.

    Notes
    -----
    - The Boolean value of a constant node is taken from its Boolean function.
    - After removal, ``self.N`` refers to the number of remaining dynamic nodes.
    - Nodes that lose all regulators as a result of constant removal are
      assigned a non-essential self-loop to preserve network structure.
    """
    # Identify constant nodes from topology
    indices_constants = self.get_constants(AS_DICT=False)
    if len(indices_constants) == 0:
        return
    dict_constants = self.get_constants(AS_DICT=True)
    # The fixed value of each constant is the single entry of its truth table.
    values_constants = [int(self.F[c][0]) for c in indices_constants]
    # Propagate constant values downstream
    for id_constant, value in zip(indices_constants, values_constants):
        regulated_nodes = []
        for i in range(self.N):
            if dict_constants[i]:
                continue
            try:
                index = list(self.I[i]).index(id_constant)
            except ValueError:
                # Node i is not regulated by this constant.
                continue
            # Restrict node i's truth table to the rows where the constant
            # regulator equals its fixed value, then drop that regulator.
            truth_table = utils.get_left_side_of_truth_table(self.indegrees[i])
            indices_to_keep = np.where(truth_table[:, index] == value)[0]
            self.F[i].f = self.F[i].f[indices_to_keep]
            if self.weights is not None:
                self.weights[i] = self.weights[i][self.I[i] != id_constant]
            self.I[i] = self.I[i][self.I[i] != id_constant]
            self.indegrees[i] -= 1
            self.F[i].n -= 1
            regulated_nodes.append(str(self.variables[i]))
        self.constants[str(self.variables[id_constant])] = {
            "value": value,
            "regulatedNodes": regulated_nodes,
        }
    # Ensure no remaining node loses all regulators
    for i in range(self.N):
        if dict_constants[i]:
            continue
        if self.indegrees[i] == 0:
            # Give the node a non-essential self-loop (weight NaN) so its
            # now-fixed value is kept without creating a new constant node.
            self.indegrees[i] = 1
            self.F[i].n = 1
            self.F[i].f = np.array([self.F[i][0], self.F[i][0]], dtype=int)
            self.I[i] = np.array([i], dtype=int)
            if self.weights is not None:
                self.weights[i] = np.array([np.nan], dtype=float)
    # Remove constant nodes structurally (using original mask)
    self.F = [self.F[i] for i in range(self.N) if not dict_constants[i]]
    # Regulator indices shift down by the number of removed constants that
    # precede them; cumsum over the constant mask gives that shift per index.
    adjustment_for_I = np.cumsum([dict_constants[i] for i in range(self.N)])
    self.I = [
        self.I[i] - adjustment_for_I[self.I[i]]
        for i in range(self.N)
        if not dict_constants[i]
    ]
    if self.weights is not None:
        self.weights = [self.weights[i] for i in range(self.N) if not dict_constants[i]]
    self.variables = np.array(
        [self.variables[i] for i in range(self.N) if not dict_constants[i]],
        dtype=str,
    )
    self.indegrees = np.array(
        [self.indegrees[i] for i in range(self.N) if not dict_constants[i]],
        dtype=int,
    )
    # Update network size and recompute outdegrees
    self.N -= len(indices_constants)
    self.outdegrees = self.get_outdegrees()
[docs]
@classmethod
def from_cana(
    cls,
    cana_BooleanNetwork: "cana.boolean_network.BooleanNetwork",
) -> "BooleanNetwork":
    """
    Construct a BooleanNetwork from a ``cana.BooleanNetwork`` instance.

    This compatibility method converts a Boolean network defined with the
    ``cana`` package into a BoolForge ``BooleanNetwork``.

    Parameters
    ----------
    cana_BooleanNetwork : cana.boolean_network.BooleanNetwork
        A Boolean network instance from the ``cana`` package.

    Returns
    -------
    BooleanNetwork
        The corresponding BoolForge BooleanNetwork.

    Raises
    ------
    TypeError
        If the input object does not appear to be a valid CANA BooleanNetwork.
    KeyError
        If required fields are missing from the CANA logic specification.
    """
    try:
        logic = cana_BooleanNetwork.logic
    except AttributeError as e:
        raise TypeError(
            "Input must be a cana.boolean_network.BooleanNetwork instance."
        ) from e

    truth_tables = []
    regulator_lists = []
    names = []
    # Iterate nodes in sorted order so construction is deterministic.
    for node_idx in sorted(logic.keys()):
        spec = logic[node_idx]
        for required in ("name", "in", "out"):
            if required not in spec:
                raise KeyError(
                    f"Logic entry for node {node_idx} must contain keys "
                    "'name', 'in', and 'out'."
                )
        names.append(str(spec["name"]))
        regulator_lists.append(list(spec["in"]))
        truth_tables.append(np.array(spec["out"], dtype=int))
    return cls(F=truth_tables, I=regulator_lists, variables=names)
[docs]
@classmethod
def from_string(
    cls,
    network_string: str,
    separator: str | Sequence[str] = ",",
    max_degree: int = 24,
    original_not: str | Sequence[str] = "NOT",
    original_and: str | Sequence[str] = "AND",
    original_or: str | Sequence[str] = "OR",
    ALLOW_TRUNCATION: bool = False
) -> "BooleanNetwork":
    """
    Construct a BooleanNetwork from a textual Boolean rule specification.

    This compatibility method parses a string representation of Boolean update
    rules and constructs a corresponding BooleanNetwork. The input format is
    intended for legacy or trusted sources and supports logical expressions
    using AND/OR/NOT operators.

    .. warning::
        This method uses ``eval`` internally and MUST NOT be used on untrusted
        input. It is provided solely for backward compatibility.

    Parameters
    ----------
    network_string : str
        String encoding Boolean update rules, one per line.
    separator : str or sequence of str, optional
        Separator(s) between variable names and Boolean expressions.
    max_degree : int, optional
        Maximum allowed indegree for explicit truth-table construction.
    original_not, original_and, original_or : str or sequence of str, optional
        Operator strings to be replaced by logical NOT, AND, OR.
    ALLOW_TRUNCATION : bool, optional
        If False (default), nodes with indegree greater than ``max_degree``
        raise a ValueError. If True, such nodes are replaced by identity
        self-loops, allowing fast construction of large networks while
        ignoring high-degree functions.

    Returns
    -------
    BooleanNetwork
        The constructed Boolean network.

    Raises
    ------
    ValueError
        If parsing fails or if ``ALLOW_TRUNCATION`` is False and
        a node exceeds ``max_degree``.
    """
    # Internal single-character tokens for the separator and operators.
    sepstr, andop, orop, notop = "@", "∧", "∨", "¬"
    get_dummy_var = lambda i: f"x{int(i)}y"

    # reformat network string
    def _replace_all(string, original, replacement):
        # Accept a single spelling or any sequence of alternative spellings
        # (tuple support added to match the Sequence[str] signature).
        if isinstance(original, (list, tuple, np.ndarray)):
            for s in original:
                string = string.replace(s, f" {replacement} ")
        else:
            string = string.replace(original, f" {replacement} ")
        return string

    text = (
        network_string.replace('\t', ' ',)
        .replace('(', ' ( ')
        .replace(')', ' ) ')
    )
    text = _replace_all(text, separator, sepstr)
    text = _replace_all(text, original_not, notop)
    text = _replace_all(text, original_and, andop)
    text = _replace_all(text, original_or, orop)

    # Remove comments and empty lines
    lines = [
        l for l in text.splitlines()
        if l.strip() and not l.strip().startswith("#")
    ]
    n = len(lines)

    # find variables and constants
    var = ["" for i in range(n)]
    for i in range(n):
        var[i] = lines[i].split(sepstr)[0].replace(' ', '')
    consts_and_vars = []
    for line in lines:
        words = line.split(' ')
        for word in words:
            if word not in ['(', ')', sepstr, andop, orop, notop, ''] and not utils.is_float(word):
                consts_and_vars.append(word)
    consts = list(set(consts_and_vars) - set(var))

    # Map each name to a dummy "x<i>y" so regulator indices can later be
    # recovered by scanning expressions for the x...y pattern.
    dict_var_const = dict(list(zip(var, [get_dummy_var(i) for i in range(len(var))])))
    dict_var_const.update(dict(list(zip(consts, [get_dummy_var(i + len(var)) for i in range(len(consts))]))))

    # replace all variables and constants with dummy names
    for i, line in enumerate(lines):
        words = line.split(' ')
        for j, word in enumerate(words):
            if word not in ['(', ')', sepstr, andop, orop, notop, ''] and not utils.is_float(word):
                words[j] = dict_var_const[word]
        lines[i] = ' '.join(words)

    # update line to only be function
    expressions = [line.split(sepstr)[1] for line in lines]

    # generate wiring diagram I
    I: list[np.ndarray] = []
    for i in range(n):
        try:
            idcs_open = utils.find_all_indices(expressions[i], 'x')
            idcs_end = utils.find_all_indices(expressions[i], 'y')
            regs = np.sort(np.array(list(map(int, list(set([expressions[i][(begin + 1):end] for begin, end in zip(idcs_open, idcs_end)]))))))
            I.append(regs)
        except ValueError:
            # No dummy names found: the expression is a constant.
            I.append(np.array([], int))

    # Build Boolean functions
    F: list[np.ndarray] = []
    for i, expr in enumerate(expressions):
        deg = len(I[i])
        if deg == 0:
            f = np.array([int(expr)], int)
        elif deg <= max_degree:
            # Evaluate the expression on all 2**deg regulator combinations
            # at once; builtins are disabled to limit (not eliminate) risk.
            tt = utils.get_left_side_of_truth_table(deg)
            env = {get_dummy_var(I[i][j]): tt[:, j].astype(bool) for j in range(deg)}
            f = eval(
                expr.replace(andop, '&')
                .replace(orop, '|')
                .replace(notop, '~')
                .replace(' ', ''),
                {"__builtins__": None},
                env
            )
        else:
            if not ALLOW_TRUNCATION:
                raise ValueError(
                    f"Node '{var[i]}' has indegree {deg} > max_degree={max_degree}."
                )
            # Truncate: identity self-loop
            F.append(np.array([0, 1], dtype=int))
            I[i] = np.array([i], dtype=int)
            # BUG FIX: skip the shared append below. Previously control fell
            # through to F.append(f.astype(int)), appending a stale `f` from
            # an earlier iteration (or raising NameError on the first node),
            # which corrupted F for truncated nodes.
            continue
        F.append(f.astype(int))

    # Each constant becomes an identity self-loop appended after the variables.
    for j in range(len(consts)):
        F.append(np.array([0, 1], dtype=int))
        I.append(np.array([len(var) + j]))
    return cls(F, I, var + consts)
[docs]
@classmethod
def from_DiGraph(cls, nx_DiGraph: "nx.DiGraph") -> "WiringDiagram":
    """Unsupported for BooleanNetwork; a graph alone carries no update rules."""
    message = (
        "from_DiGraph is not supported for BooleanNetwork. "
        "Use WiringDiagram.from_DiGraph and then construct "
        "a BooleanNetwork by providing Boolean update functions."
    )
    raise NotImplementedError(message)
[docs]
def to_cana(self) -> "cana.boolean_network.BooleanNetwork":
    """
    Export this network as a ``cana.BooleanNetwork`` instance.

    The exported object mirrors the current state of the model (including
    any removed constants, simplifications, or identity self-loops): for
    each node it records the variable name, the regulator indices, and the
    flattened truth table.

    Returns
    -------
    cana.boolean_network.BooleanNetwork
        Equivalent ``cana`` representation of this network.

    Raises
    ------
    ImportError
        If the ``cana`` package is not installed.
    """
    try:
        import cana
    except ImportError as e:
        raise ImportError(
            "The 'cana' package is required for to_cana()."
        ) from e
    # One logic entry per node: name, regulator list, truth-table output.
    logic = {
        i: {"name": name, "in": list(regulators), "out": bf.f.tolist()}
        for i, (bf, regulators, name) in enumerate(
            zip(self.F, self.I, self.variables)
        )
    }
    return cana.boolean_network.BooleanNetwork(Nnodes=self.N, logic=logic)
[docs]
def to_bnet(
    self,
    separator: str = ",\t",
    AS_POLYNOMIAL: bool = True,
) -> str:
    """
    Export the Boolean network in BNET format.

    Produces one line per variable of the form
    ``variable <separator> function``, as used by tools such as BoolNet and
    PyBoolNet.

    Parameters
    ----------
    separator : str, optional
        String separating the target variable from its update function.
        Default is `",\\t"`.
    AS_POLYNOMIAL : bool, optional
        If True (default), functions are written in polynomial form;
        otherwise as logical expressions.

    Returns
    -------
    str
        BNET representation of the network.
    """
    constant_lookup = self.get_constants(AS_DICT=True)
    rows = []
    for idx in range(self.N):
        if constant_lookup.get(idx, False):
            # Constant node: emit its fixed value.
            rule = str(self.F[idx].f[0])
        elif AS_POLYNOMIAL:
            rule = utils.bool_to_poly(
                self.F[idx],
                self.variables[self.I[idx]].tolist(),
            )
        else:
            rule = self.F[idx].to_expression(" & ", " | ")
        rows.append(f"{self.variables[idx]}{separator}{rule}")
    return "\n".join(rows)
[docs]
def to_truth_table(
    self,
    filename: str | None = None,
) -> pd.DataFrame:
    """
    Construct the full synchronous truth table of the Boolean network.

    Each row corresponds to a network state at time ``t`` and its
    deterministic successor at time ``t+1`` under synchronous updating.

    Parameters
    ----------
    filename : str, optional
        If provided, the truth table is written to a file. The file extension
        determines the format and must be one of ``'csv'``, ``'xls'``, or
        ``'xlsx'``. If None (default), no file is created.

    Returns
    -------
    pandas.DataFrame
        The full truth table with shape ``(2**N, 2*N)``.

    Raises
    ------
    TypeError
        If ``filename`` is provided but is not a string.
    ValueError
        If ``filename`` does not end in a supported extension.

    Notes
    -----
    - States are enumerated in lexicographic order, consistent with
      ``utils.get_left_side_of_truth_table``.
    - This method computes and stores the synchronous state transition graph
      (``self.STG``) if it has not been computed previously.
    - Exporting to Excel requires the ``openpyxl`` package.
    """
    # Validate the filename *before* the exponential-cost computation so a
    # bad argument fails fast instead of after the full STG has been built.
    ending = None
    if filename is not None:
        if not isinstance(filename, str):
            raise TypeError("filename must be a string")
        ending = filename.split(".")[-1]
        if ending not in {"csv", "xls", "xlsx"}:
            raise ValueError("filename must end in 'csv', 'xls', or 'xlsx'")
    columns = [name + "(t)" for name in self.variables]
    columns += [name + "(t+1)" for name in self.variables]
    if self.STG is None:
        self.compute_synchronous_state_transition_graph()
    data = np.zeros((2**self.N, 2*self.N), dtype=int)
    data[:, :self.N] = utils.get_left_side_of_truth_table(self.N)
    for i in range(2**self.N):
        # Successor state of state i, decoded back to a binary vector.
        data[i, self.N:] = utils.dec2bin(self.STG[i], self.N)
    truth_table = pd.DataFrame(data, columns=columns)
    if filename is not None:
        if ending == "csv":
            truth_table.to_csv(filename, index=False)
        else:
            truth_table.to_excel(filename, index=False)
    return truth_table
def __len__(self):
    """Return the number of nodes ``N`` in the network."""
    return self.N
[docs]
def __str__(self):
    """Return a short human-readable summary of the network."""
    summary = "BooleanNetwork(N={}, indegrees={})".format(
        self.N, self.indegrees.tolist()
    )
    return summary
def __getitem__(self, index):
    """Return the update function (``self.F`` entry) of node ``index``."""
    return self.F[index]
[docs]
def __repr__(self):
    """Return a concise developer-facing representation."""
    return "{}(N={})".format(type(self).__name__, self.N)
[docs]
def __call__(self, state):
    """
    Apply one synchronous update step to the Boolean network.

    Evaluates every node's Boolean update function on the current values of
    its regulators and returns the resulting state.

    Parameters
    ----------
    state : sequence of int
        Current network state as a binary vector of length ``N``, ordered
        according to ``self.variables``.

    Returns
    -------
    np.ndarray
        The updated network state after one synchronous update.

    Notes
    -----
    Equivalent to calling ``update_network_synchronously``.
    """
    # Delegate to the validated synchronous-update entry point.
    return self.update_network_synchronously(state)
[docs]
def get_types_of_regulation(self) -> list[np.ndarray]:
    """
    Compute and return regulation types (weights) for all nodes.

    For each Boolean function, the type of every input regulation is
    obtained via ``BooleanFunction.get_type_of_inputs`` and mapped to a
    numeric weight through ``dict_weights``. The result is stored in
    ``self.weights`` and returned.

    Returns
    -------
    list of np.ndarray
        Regulation weights for each node, aligned with the wiring diagram.

    Notes
    -----
    - Recomputes ``self.weights`` from scratch, overwriting any existing
      values.
    """
    weights = []
    for bf in self.F:
        input_types = bf.get_type_of_inputs()
        weights.append(
            np.array([dict_weights[t] for t in input_types], dtype=float)
        )
    self.weights = weights
    return self.weights
## Transform Boolean networks
[docs]
def simplify_functions(self) -> None:
    """
    Remove all non-essential regulators from the Boolean network.

    For each node, non-essential regulators (identified via ``np.nan``
    entries in ``self.weights``) are removed from the wiring diagram and the
    associated Boolean function is restricted to its essential inputs. Nodes
    that would otherwise lose all regulators are assigned a self-loop (with
    the node's constant output value duplicated) to preserve network
    structure.

    Notes
    -----
    - This method modifies the network in place.
    - Regulation types (``self.weights``) are recomputed first.
    - Self-loops introduced here are structural artifacts and do not
      represent genuine regulatory interactions.
    """
    # Ensure regulation types / weights are available
    self.get_types_of_regulation()
    for i in range(self.N):
        regulator_is_non_essential = np.isnan(self.weights[i])
        # All regulators are essential: nothing to simplify for this node.
        if not np.any(regulator_is_non_essential):
            continue
        # These are *positions within I[i]*, not node indices.
        non_essential_variables = np.where(regulator_is_non_essential)[0]
        essential_variables = np.where(~regulator_is_non_essential)[0]
        # Update outdegrees of the removed regulators (each regulator
        # appears at most once in I[i]).
        # Bug fix: outdegrees is indexed by node, so the positions must be
        # mapped through I[i] to the actual regulator node indices.
        self.outdegrees[self.I[i][non_essential_variables]] -= 1
        # No essential regulators: the function is constant; keep a
        # structural self-loop so the node retains one input.
        if len(essential_variables) == 0:
            self.indegrees[i] = 1
            self.F[i].f = np.array([self.F[i][0], self.F[i][0]], dtype=int)
            self.F[i].n = 1
            self.F[i].variables = np.array([self.variables[i]], dtype=str)
            self.I[i] = np.array([i], dtype=int)
            self.weights[i] = np.array([np.nan], dtype=float)
            self.outdegrees[i] += 1  # keep sum(outdegrees) == sum(indegrees)
            continue
        # Restrict truth table to essential inputs: keep the rows where all
        # non-essential inputs are 0 (their value does not matter).
        left_side = utils.get_left_side_of_truth_table(self.indegrees[i])
        mask = np.sum(left_side[:, non_essential_variables], axis=1) == 0
        self.F[i].f = self.F[i][mask]
        self.F[i].n = len(essential_variables)
        self.F[i].variables = self.F[i].variables[~regulator_is_non_essential]
        self.I[i] = self.I[i][essential_variables]
        self.weights[i] = self.weights[i][essential_variables]
        self.indegrees[i] = len(essential_variables)
[docs]
def get_identity_nodes(
self,
AS_DICT: bool = False
) -> dict[int, bool] | np.ndarray:
"""
Identify identity (memory) nodes in the Boolean network.
An identity node is a node with a single self-regulatory edge whose
Boolean update function is the identity function ``f(x) = x``. Such
nodes retain their state over time unless externally modified.
Parameters
----------
AS_DICT : bool, optional
If True, return a dictionary mapping node indices to booleans.
If False (default), return an array of indices of identity nodes.
Returns
-------
dict[int, bool] or np.ndarray
If ``AS_DICT`` is True, a dictionary indicating which nodes are
identity nodes.
If ``AS_DICT`` is False, an array of indices of identity nodes.
"""
is_identity = np.array(
[
self.indegrees[i] == 1
and self.I[i][0] == i
and self.F[i][0] == 0
and self.F[i][1] == 1
for i in range(self.N)
],
dtype=bool,
)
if AS_DICT:
return dict(enumerate(is_identity.tolist()))
return np.where(is_identity)[0]
[docs]
def get_network_with_fixed_identity_nodes(
    self,
    values_identity_nodes: Sequence[int],
) -> "BooleanNetwork":
    """
    Construct a Boolean network with identity nodes fixed to given values.

    Identity nodes (single self-regulatory edge with the identity rule
    ``f(x) = x``) are turned into constants with the requested values, and a
    new network with those constants removed is returned.

    Parameters
    ----------
    values_identity_nodes : sequence of int
        Value (0 or 1) for each identity node, in the order returned by
        ``get_identity_nodes(AS_DICT=False)``.

    Returns
    -------
    BooleanNetwork
        A new BooleanNetwork with the specified identity nodes fixed.
        Constants previously removed from the original network are
        preserved.
    """
    identity_indices = self.get_identity_nodes(AS_DICT=False)
    n_expected = len(identity_indices)
    if len(values_identity_nodes) != n_expected:
        raise ValueError(
            f"The number of values provided ({len(values_identity_nodes)}) must "
            f"match the number of identity nodes ({n_expected})."
        )
    for value in values_identity_nodes:
        if value not in (0, 1):
            raise ValueError("Identity node values must be 0 or 1.")
    # Work on copies so the original network is untouched.
    new_F = deepcopy(self.F)
    new_I = deepcopy(self.I)
    for node, value in zip(identity_indices, values_identity_nodes):
        new_F[node].f = np.array([value], dtype=int)
        new_I[node] = np.array([], dtype=int)
    network = self.__class__(new_F, new_I, self.variables)
    # Preserve previously removed constants
    network.constants.update(self.constants)
    return network
[docs]
def get_network_with_node_controls(
    self,
    indices_controlled_nodes: Sequence[int],
    values_controlled_nodes: Sequence[int],
    KEEP_CONTROLLED_NODES: bool = False,
) -> "BooleanNetwork":
    """
    Construct a Boolean network with specified nodes fixed to given values.

    Applies node-level interventions by fixing selected nodes to constant
    Boolean values. Controlled nodes are either removed from the dynamic
    network as constants or retained as identity-clamped nodes.

    Parameters
    ----------
    indices_controlled_nodes : sequence of int
        Indices of nodes to be fixed. Python and numpy integer types are
        accepted.
    values_controlled_nodes : sequence of int
        Value (0 or 1) for each controlled node, in the same order as
        ``indices_controlled_nodes``.
    KEEP_CONTROLLED_NODES : bool, optional
        If True, controlled nodes are retained as identity-clamped nodes
        with self-loops. If False (default), they are eliminated as
        constants.

    Returns
    -------
    BooleanNetwork
        A new BooleanNetwork with the specified node controls applied.

    Raises
    ------
    ValueError
        If input lengths differ, an index is invalid, or a value is not in
        {0, 1}.
    """
    if len(indices_controlled_nodes) != len(values_controlled_nodes):
        raise ValueError(
            f"The number of controlled nodes ({len(indices_controlled_nodes)}) "
            f"must match the number of values provided ({len(values_controlled_nodes)})."
        )
    for node in indices_controlled_nodes:
        # Accept numpy integer indices (common output of np.where / array
        # indexing); the previous strict `isinstance(node, int)` rejected
        # them, unlike get_network_with_edge_controls.
        if not isinstance(node, (int, np.integer)) or node < 0 or node >= self.N:
            raise ValueError(f"Invalid node index: {node}")
    for v in values_controlled_nodes:
        if v not in (0, 1):
            raise ValueError("Controlled node values must be 0 or 1.")
    F = deepcopy(self.F)
    I = deepcopy(self.I)
    for node, value in zip(indices_controlled_nodes, values_controlled_nodes):
        if KEEP_CONTROLLED_NODES:
            # Identity-clamped node: self-loop whose output is fixed.
            F[node].f = np.array([value, value], dtype=int)
            I[node] = np.array([node], dtype=int)
        else:
            # Structural constant (to be removed)
            F[node].f = np.array([value], dtype=int)
            I[node] = np.array([], dtype=int)
    bn = self.__class__(F, I, self.variables)
    # Preserve previously removed constants if controlled nodes are eliminated
    if not KEEP_CONTROLLED_NODES:
        bn.constants.update(self.constants)
    return bn
[docs]
def get_network_with_edge_controls(
self,
control_targets: Sequence[int],
control_sources: Sequence[int],
values_edge_controls: Sequence[int] | None = None,
) -> "BooleanNetwork":
"""
Construct a Boolean network with specified regulatory edges controlled.
This method fixes the influence of selected source nodes on selected target
nodes by restricting the target's Boolean update function to entries where
the source assumes a specified value, and then removing the corresponding
regulatory edge.
Parameters
----------
control_targets : sequence of int
Indices of target nodes.
control_sources : sequence of int
Indices of source nodes whose influence on the corresponding targets
is to be controlled.
values_edge_controls : sequence of int, optional
Fixed values (0 or 1) imposed on each controlled edge. If None, all
controlled edges are fixed to 0.
Returns
-------
BooleanNetwork
A new BooleanNetwork with the specified edge controls applied.
Raises
------
ValueError
If input lengths do not match, indices are invalid, or edge values are
not in {0, 1}.
"""
if len(control_targets) != len(control_sources):
raise ValueError("control_targets and control_sources must have equal length.")
if values_edge_controls is None:
values_edge_controls = [0] * len(control_targets)
if len(values_edge_controls) != len(control_targets):
raise ValueError(
"values_edge_controls must have the same length as control_targets."
)
F_new = deepcopy(self.F)
I_new = deepcopy(self.I)
for target, source, fixed_value in zip(
control_targets, control_sources, values_edge_controls
):
if fixed_value not in (0, 1):
raise ValueError("Edge control values must be 0 or 1.")
if not (0 <= target < self.N):
raise ValueError(f"Invalid target index: {target}")
if not (0 <= source < self.N):
raise ValueError(f"Invalid source index: {source}")
if source not in I_new[target]:
raise ValueError(
f"Source node {source} is not a regulator of target node {target}."
)
idx_reg = list(I_new[target]).index(source)
n_inputs = self.indegrees[target]
truth_indices = np.arange(2**n_inputs, dtype=np.uint32)
mask = ((truth_indices >> (n_inputs - 1 - idx_reg)) & 1) == fixed_value
# Restrict truth table
F_new[target].f = F_new[target].f[mask]
F_new[target].n -= 1
# Remove regulator
I_new[target] = np.delete(I_new[target], idx_reg)
return self.__class__(F_new, I_new, self.variables)
[docs]
def update_single_node(
    self,
    index: int,
    states_regulators: Sequence[int],
) -> int:
    """
    Update the state of a single node.

    Applies node ``index``'s Boolean update function to the given regulator
    states.

    Parameters
    ----------
    index : int
        Index of the node to update.
    states_regulators : sequence of int
        Binary states of the node's regulators, in wiring-diagram order.

    Returns
    -------
    int
        Updated state of the node (0 or 1).
    """
    # The regulator states, read as a binary number, index the truth table.
    row = utils.bin2dec(states_regulators)
    return self.F[index].f[row].item()
[docs]
def update_network_synchronously(
    self,
    state: Sequence[int],
) -> np.ndarray:
    """
    Perform a synchronous update of the Boolean network.

    Parameters
    ----------
    state : sequence of int
        Binary state vector of length ``N``.

    Returns
    -------
    np.ndarray
        Updated state vector.

    Raises
    ------
    ValueError
        If the state has the wrong length or contains non-binary entries.
    """
    vec = np.asarray(state, dtype=int)
    if vec.shape[0] != self.N:
        raise ValueError(
            f"State vector must have length {self.N}, got {vec.shape[0]}."
        )
    invalid = (vec != 0) & (vec != 1)
    if invalid.any():
        raise ValueError("State vector must be binary (0 or 1).")
    return self._update_network_synchronously_unchecked(vec)
def _update_network_synchronously_unchecked(
    self,
    state: np.ndarray,
) -> np.ndarray:
    """Internal fast path; assumes a validated binary state vector."""
    updated = np.empty(self.N, dtype=int)
    for node in range(self.N):
        regulator_states = state[self.I[node]]
        updated[node] = self.F[node].f[utils.bin2dec(regulator_states)]
    return updated
[docs]
def update_network_SDDS(
    self,
    state: Sequence[int],
    P: np.ndarray,
    *,
    rng=None,
) -> np.ndarray:
    """
    Perform a stochastic discrete dynamical system (SDDS) update.

    For each node the deterministic Boolean update is computed first. An
    increase of the node's state is applied with the node's activation
    probability ``P[i, 0]``; a decrease with its degradation probability
    ``P[i, 1]``; otherwise the node keeps its state.

    Parameters
    ----------
    state : sequence of int
        Current network state (binary vector of length ``N``).
    P : np.ndarray
        Array of shape ``(N, 2)``: activation probabilities in column 0,
        degradation probabilities in column 1.
    rng : optional
        Random number generator or seed, passed to ``utils._coerce_rng``.

    Returns
    -------
    np.ndarray
        Updated network state after one stochastic SDDS update.

    Notes
    -----
    Follows the SDDS framework of Murrugarra, Veliz-Cuba, Aguilar, Arat &
    Laubenbacher (2012), *Modeling stochasticity and variability in gene
    regulatory networks*, EURASIP J. Bioinform. Syst. Biol., 2012(1), 5.

    For performance, ``state`` is assumed to be a valid binary vector and
    ``P`` to have the correct shape; no additional validation is performed.
    """
    rng = utils._coerce_rng(rng)
    state = np.asarray(state, dtype=int)
    next_state = state.copy()
    for i in range(self.N):
        proposed = self.update_single_node(
            index=i,
            states_regulators=state[self.I[i]],
        )
        if proposed != state[i]:
            # Activation probability for an increase, degradation for a
            # decrease; one rng draw per proposed change (as before).
            acceptance = P[i, 0] if proposed > state[i] else P[i, 1]
            if rng.random() < acceptance:
                next_state[i] = proposed
    return next_state
[docs]
def get_steady_states_asynchronous_exact(
    self,
    stochastic_weights: Sequence[float] | None = None,
    max_iterations: int = 1000,
    tol: float = 1e-9,
) -> dict:
    """
    Compute the steady states and basin probabilities under general asynchronous update.

    This method exhaustively constructs the asynchronous state transition graph
    (STG) of the Boolean network under a general asynchronous update scheme,
    where nodes are selected for update according to given propensities.
    The resulting Markov chain is solved exactly using an iterative
    Gauss–Seidel scheme to obtain absorption probabilities into steady states.

    Parameters
    ----------
    stochastic_weights : sequence of float or None, optional
        Relative update propensities for each node. If None (default),
        all nodes are updated with equal probability. The weights are
        normalized internally.
    max_iterations : int, optional
        Maximum number of Gauss–Seidel iterations used to compute absorption
        probabilities before declaring non-convergence.
    tol : float, optional
        Convergence tolerance for the infinity norm of probability updates.

    Returns
    -------
    dict
        Dictionary with the following entries:

        - ``"SteadyStates"`` : list of int
            Decimal representations of steady states.
        - ``"NumberOfSteadyStates"`` : int
            Total number of steady states.
        - ``"BasinSizes"`` : np.ndarray
            Fraction of the state space converging to each steady state.
        - ``"STGAsynchronous"`` : dict
            Asynchronous state transition graph represented as a Markov kernel.
        - ``"FinalTransitionProbabilities"`` : np.ndarray
            Absorption probabilities from each state into each steady state.

    Raises
    ------
    RuntimeError
        If the iterative solver does not converge within ``max_iterations``.
    """
    left_side_of_truth_table = utils.get_left_side_of_truth_table(self.N)
    # Validate and normalize the node-update propensities.
    if stochastic_weights is not None:
        stochastic_weights = np.asarray(stochastic_weights, dtype=float)
        if stochastic_weights.shape[0] != self.N:
            raise ValueError("stochastic_weights must have length N.")
        if np.any(stochastic_weights <= 0):
            raise ValueError("stochastic_weights must be strictly positive.")
        stochastic_weights = stochastic_weights / stochastic_weights.sum()
    else:
        stochastic_weights = np.full(self.N, 1.0 / self.N)
    steady_states = []
    steady_state_dict = {}
    # STG[x] maps each successor state to its transition probability.
    # sped_up_STG[x] holds only the *leaving* transitions as a pair of
    # (successor array, probability array) for fast vectorized access.
    STG = dict(zip(range(2**self.N),[{} for i in range(2**self.N)]))
    sped_up_STG = dict(zip(range(2**self.N),[[np.zeros(0,dtype=int),np.zeros(0,dtype=float)] for i in range(2**self.N)]))
    for xdec in range(2**self.N):
        x = left_side_of_truth_table[xdec].copy() #important: must create a copy here!
        to_be_distributed = 0
        for i in range(self.N):
            # Asynchronous update of node i alone: the successor differs
            # from x in at most bit i.
            fx_i = self.update_single_node(i, x[self.I[i]])
            if fx_i > x[i]:
                fxdec = xdec + 2**(self.N - 1 - i)
            elif fx_i < x[i]:
                fxdec = xdec - 2**(self.N - 1 - i)
            else:
                fxdec = xdec
            if fxdec in STG[xdec]:
                STG[xdec][fxdec] += stochastic_weights[i]
            else:
                STG[xdec][fxdec] = stochastic_weights[i]
            if fxdec!=xdec:
                sped_up_STG[xdec][0] = np.append(sped_up_STG[xdec][0], fxdec)
                sped_up_STG[xdec][1] = np.append(sped_up_STG[xdec][1], stochastic_weights[i])
            else:
                # Self-loop mass; redistributed below onto the leaving
                # transitions (embedded jump chain), which leaves the
                # absorption probabilities unchanged.
                to_be_distributed += stochastic_weights[i]
        if to_be_distributed < 1:
            sped_up_STG[xdec][1] /= (1-to_be_distributed)
        # A single successor implies all node updates map x to itself
        # (distinct bit flips would yield distinct successors), i.e. x is
        # a steady state.
        if len(STG[xdec])==1:
            steady_state_dict[xdec] = len(steady_states)
            steady_states.append(xdec)
            sped_up_STG[xdec][0] = np.append(sped_up_STG[xdec][0], xdec)
            sped_up_STG[xdec][1] = np.append(sped_up_STG[xdec][1], 1)
    # Probability vectors for all states
    final_probabilities = np.zeros((2**self.N, len(steady_states)), dtype=float)
    # Boundary conditions: absorbing states have probability 1 of themselves
    for xdec in steady_states:
        final_probabilities[xdec, steady_state_dict[xdec]] = 1.0
    transient_states = [xdec for xdec in range(2**self.N) if xdec not in steady_state_dict]
    for it in range(1, max_iterations + 1):
        max_delta = 0.0
        # In-place Gauss–Seidel update:
        for xdec in transient_states:
            nxt, pr = sped_up_STG[xdec]
            old = final_probabilities[xdec].copy()
            final_probabilities[xdec] = np.dot(pr, final_probabilities[nxt, :]) # weighted average of successor probability vectors
            # track convergence (infinity norm per row)
            delta = np.max(np.abs(final_probabilities[xdec] - old))
            if delta > max_delta:
                max_delta = delta
        if max_delta < tol:
            # Uniform initial distribution over the 2**N states.
            basin_sizes = final_probabilities.sum(0)/2**self.N
            return {
                "SteadyStates": steady_states,
                "NumberOfSteadyStates": len(steady_states),
                "BasinSizes": basin_sizes,
                "STGAsynchronous": STG,
                "FinalTransitionProbabilities": final_probabilities,
            }
    raise RuntimeError(f"Did not converge in {max_iterations} iterations; last max_delta={max_delta:g}")
[docs]
def get_steady_states_asynchronous(
    self,
    nsim: int = 500,
    initial_states: Sequence[int] | None = None,
    search_depth: int = 50,
    DEBUG: bool = False,
    *,
    rng=None,
) -> dict:
    """
    Approximate steady states of a Boolean network under asynchronous updates.

    Monte Carlo exploration of the asynchronous state space: asynchronous
    updates are simulated from a collection of initial states until a steady
    state is reached or ``search_depth`` update sweeps are exceeded. Unlike
    ``get_steady_states_asynchronous_exact``, this method does *not*
    exhaustively explore the state space and does not guarantee that all
    steady states are found; it is intended for large networks.

    Parameters
    ----------
    nsim : int, optional
        Number of asynchronous simulations to perform (default is 500).
    initial_states : sequence of int or None, optional
        Initial states as decimal encodings. If None (default), ``nsim``
        random initial states are generated.
    search_depth : int, optional
        Maximum number of asynchronous update sweeps per simulation before
        giving up on convergence (default is 50).
    DEBUG : bool, optional
        If True, print detailed debugging information during simulation.
    rng : optional
        Random number generator or seed, passed to ``utils._coerce_rng``.

    Returns
    -------
    dict
        Dictionary with the following entries:

        - ``"SteadyStates"`` : list of int
            Decimal representations of steady states encountered.
        - ``"NumberOfSteadyStates"`` : int
            Number of unique steady states found.
        - ``"BasinSizes"`` : list of int
            Counts of how many simulations converged to each steady state.
        - ``"STGAsynchronous"`` : dict
            Partial cache of asynchronous transitions encountered during
            simulation; keys are ``(state, node_index)``, values are
            successor states (decimal).
        - ``"InitialSamplePoints"`` : list of int
            Decimal initial states used in the simulations.

    Notes
    -----
    - Only steady states (fixed points) are detected; asynchronous limit
      cycles may prevent convergence within ``search_depth``.
    - The returned transition graph is a cache of explored transitions, not
      the full asynchronous STG.
    """
    rng = utils._coerce_rng(rng)
    sampled_states: list[int] = []
    STG_asynchronous: dict[tuple[int, int], int] = {}
    steady_states: list[int] = []
    basin_sizes: list[int] = []
    steady_state_dict: dict[int, int] = {}
    for iteration in range(nsim):
        # Initialize state
        if initial_states is None:
            x = rng.integers(2, size=self.N)
            xdec = utils.bin2dec(x)
            sampled_states.append(xdec)
        else:
            xdec = initial_states[iteration]
            x = utils.dec2bin(xdec, self.N)
        if DEBUG:
            print(iteration, -1, -1, False, xdec, x)
        for step in range(search_depth):
            FOUND_NEW_STATE = False
            # Check if state is already known to be steady
            if xdec in steady_state_dict:
                basin_sizes[steady_state_dict[xdec]] += 1
                break
            update_order = rng.permutation(self.N)
            for i in map(int, update_order):
                try:
                    fxdec = STG_asynchronous[(xdec, i)]
                    # Bug fix: keep the binary vector x in sync with xdec
                    # when a *cached* transition is taken. Previously x kept
                    # the stale bit, so later uncached updates evaluated the
                    # function on a wrong state and cached wrong transitions.
                    if fxdec != xdec:
                        x[i] = (fxdec >> (self.N - 1 - i)) & 1
                except KeyError:
                    fx_i = self.update_single_node(i, x[self.I[i]])
                    if fx_i > x[i]:
                        fxdec = xdec + 2 ** (self.N - 1 - i)
                        x[i] = 1
                        FOUND_NEW_STATE = True
                    elif fx_i < x[i]:
                        fxdec = xdec - 2 ** (self.N - 1 - i)
                        x[i] = 0
                        FOUND_NEW_STATE = True
                    else:
                        fxdec = xdec
                    STG_asynchronous[(xdec, i)] = fxdec
                if fxdec != xdec:
                    xdec = fxdec
                    FOUND_NEW_STATE = True
                    break
            if DEBUG:
                print(iteration, step, i, FOUND_NEW_STATE, xdec, x)
            if not FOUND_NEW_STATE:
                # No node wants to change its value: xdec is steady.
                if xdec in steady_state_dict:
                    basin_sizes[steady_state_dict[xdec]] += 1
                else:
                    steady_state_dict[xdec] = len(steady_states)
                    steady_states.append(xdec)
                    basin_sizes.append(1)
                break
        if DEBUG:
            print()
    if sum(basin_sizes) < nsim:
        print(
            f"Warning: only {sum(basin_sizes)} of the {nsim} simulations "
            "reached a steady state. Consider increasing search_depth. "
            "The network may also contain asynchronous limit cycles."
        )
    return {
        "SteadyStates": steady_states,
        "NumberOfSteadyStates": len(steady_states),
        "BasinSizes": basin_sizes,
        "STGAsynchronous": STG_asynchronous,
        "InitialSamplePoints": (
            initial_states if initial_states is not None else sampled_states
        ),
    }
[docs]
def get_steady_states_asynchronous_given_one_initial_condition(
    self,
    initial_condition: int | Sequence[int] = 0,
    nsim: int = 500,
    stochastic_weights: Sequence[float] | None = None,
    search_depth: int = 50,
    DEBUG: bool = False,
    *,
    rng=None,
) -> dict:
    """
    Approximate steady states reachable from a single initial condition under
    asynchronous updates.

    Multiple asynchronous simulations start from the same initial condition;
    in each, nodes are updated one at a time in a uniformly random order or
    according to node-specific update propensities, until a steady state is
    reached or ``search_depth`` sweeps are exceeded. Sampling-based: not all
    reachable steady states are guaranteed to be found.

    Parameters
    ----------
    initial_condition : int or sequence of int, optional
        Initial network state, as a decimal encoding (int) or a binary
        vector of length ``N``. Default is 0.
    nsim : int, optional
        Number of asynchronous simulation runs (default is 500).
    stochastic_weights : sequence of float or None, optional
        Relative update propensities for each node; must have length ``N``
        and be strictly positive. Normalized internally. If None (default),
        nodes are updated uniformly at random.
    search_depth : int, optional
        Maximum number of asynchronous update sweeps per simulation.
    DEBUG : bool, optional
        If True, print detailed debugging information during simulation.
    rng : optional
        Random number generator or seed, passed to ``utils._coerce_rng``.

    Returns
    -------
    dict
        Dictionary with the following entries:

        - ``"SteadyStates"`` : list of int
            Decimal representations of steady states reached.
        - ``"NumberOfSteadyStates"`` : int
            Number of unique steady states found.
        - ``"BasinSizes"`` : list of int
            Number of simulations converging to each steady state.
        - ``"TransientTimes"`` : list of list of int
            For each steady state, the transient lengths observed.
        - ``"STGAsynchronous"`` : dict
            Partial cache of asynchronous transitions; keys are
            ``(state, node_index)``, values are successor states (decimal).
        - ``"UpdateQueues"`` : list of list of int
            For each *converged* simulation, the sequence of visited states.

    Notes
    -----
    - Only fixed points are detected; asynchronous limit cycles may prevent
      convergence within ``search_depth``.
    - The returned transition graph covers only sampled transitions.
    """
    rng = utils._coerce_rng(rng)
    # --- Initialize initial condition ---
    if isinstance(initial_condition, int):
        x0 = utils.dec2bin(initial_condition, self.N)
        x0dec = initial_condition
    else:
        x0 = np.asarray(initial_condition, dtype=int)
        if x0.shape[0] != self.N:
            raise ValueError(
                f"Initial condition must have length {self.N}, got {x0.shape[0]}."
            )
        x0dec = utils.bin2dec(x0)
    # --- Handle stochastic weights ---
    if stochastic_weights is not None:
        stochastic_weights = np.asarray(stochastic_weights, dtype=float)
        if stochastic_weights.shape[0] != self.N:
            raise ValueError("stochastic_weights must have length N.")
        if np.any(stochastic_weights <= 0):
            raise ValueError("stochastic_weights must be strictly positive.")
        stochastic_weights = stochastic_weights / stochastic_weights.sum()
    # --- Bookkeeping ---
    STG_async: dict[tuple[int, int], int] = {}
    steady_states: list[int] = []
    basin_sizes: list[int] = []
    transient_times: list[list[int]] = []
    steady_state_dict: dict[int, int] = {}
    queues: list[list[int]] = []
    # --- Simulations ---
    for iteration in range(nsim):
        x = x0.copy()
        xdec = x0dec
        queue = [xdec]
        for step in range(search_depth):
            FOUND_NEW_STATE = False
            # If already known steady state, stop
            if xdec in steady_state_dict:
                idx = steady_state_dict[xdec]
                basin_sizes[idx] += 1
                transient_times[idx].append(step)
                queues.append(queue)
                break
            # Choose update order
            if stochastic_weights is None:
                update_order = rng.permutation(self.N)
            else:
                update_order = rng.choice(
                    self.N, size=self.N, replace=False, p=stochastic_weights
                )
            for i in map(int, update_order):
                try:
                    fxdec = STG_async[(xdec, i)]
                    # Bug fix: keep the binary vector x in sync with xdec
                    # when a *cached* transition is taken; otherwise x keeps
                    # the stale bit and later uncached updates evaluate the
                    # function on a wrong state (and cache wrong transitions).
                    if fxdec != xdec:
                        x[i] = (fxdec >> (self.N - 1 - i)) & 1
                except KeyError:
                    fx_i = self.update_single_node(i, x[self.I[i]])
                    if fx_i > x[i]:
                        fxdec = xdec + 2 ** (self.N - 1 - i)
                        x[i] = 1
                    elif fx_i < x[i]:
                        fxdec = xdec - 2 ** (self.N - 1 - i)
                        x[i] = 0
                    else:
                        fxdec = xdec
                    STG_async[(xdec, i)] = fxdec
                if fxdec != xdec:
                    xdec = fxdec
                    queue.append(xdec)
                    FOUND_NEW_STATE = True
                    break
            if DEBUG:
                print(iteration, step, i, FOUND_NEW_STATE, xdec, x)
            if not FOUND_NEW_STATE:
                # New steady state reached
                if xdec in steady_state_dict:
                    idx = steady_state_dict[xdec]
                    basin_sizes[idx] += 1
                    transient_times[idx].append(step)
                else:
                    steady_state_dict[xdec] = len(steady_states)
                    steady_states.append(xdec)
                    basin_sizes.append(1)
                    transient_times.append([step])
                queues.append(queue)
                break
        if DEBUG:
            print()
    if sum(basin_sizes) < nsim:
        print(
            f"Warning: only {sum(basin_sizes)} of the {nsim} simulations "
            "reached a steady state. Consider increasing search_depth. "
            "The network may contain asynchronous limit cycles."
        )
    return {
        "SteadyStates": steady_states,
        "NumberOfSteadyStates": len(steady_states),
        "BasinSizes": basin_sizes,
        "TransientTimes": transient_times,
        "STGAsynchronous": STG_async,
        "UpdateQueues": queues,
    }
[docs]
def get_attractors_synchronous(
self,
nsim: int = 500,
initial_sample_points: Sequence[int | Sequence[int]] | None = None,
n_steps_timeout: int = 1000,
INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS: bool = False,
USE_NUMBA: bool = True,
*,
rng=None,
) -> dict:
"""
Approximate synchronous attractors of a Boolean network via sampling.
This method estimates the synchronous attractors (fixed points and cycles)
of a Boolean network by simulating synchronous updates from a collection
of initial states. For each simulation, the network is updated until an
attractor is reached or a maximum number of update steps is exceeded.
The method is sampling-based and does *not* guarantee that all attractors
are found. Basin sizes are lower-bound estimates based on the sampled
initial conditions.
If Numba is available and ``USE_NUMBA=True``, synchronous updates are
accelerated using a compiled kernel.
Parameters
----------
nsim : int, optional
Number of initial conditions to simulate (default is 500). Ignored if
``initial_sample_points`` is provided.
initial_sample_points : sequence of int or sequence of sequence of int, optional
Initial states to use. If provided, its length determines the number
of simulations. Interpretation depends on
``INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS``.
n_steps_timeout : int, optional
Maximum number of synchronous update steps per simulation before
declaring a timeout (default is 1000).
INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS : bool, optional
If True, ``initial_sample_points`` are interpreted as binary vectors;
otherwise they are interpreted as decimal-encoded states.
USE_NUMBA : bool, optional
If True (default) and Numba is available, use a Numba-accelerated
synchronous update kernel.
rng : optional
Random number generator or seed, passed to ``utils._coerce_rng``.
Returns
-------
dict
Dictionary with the following entries:
- ``"Attractors"`` : list of list of int
Attractors found, each represented as a list of decimal states
(cycles are given in cyclic order).
- ``"NumberOfAttractors"`` : int
Number of distinct attractors found.
- ``"BasinSizes"`` : list of int
Number of sampled initial conditions converging to each attractor.
- ``"AttractorDict"`` : dict
Mapping from visited states (decimal) to attractor index.
- ``"InitialSamplePoints"`` : list of int
Decimal initial states used for sampling.
- ``"STG"`` : dict
Sampled synchronous state transition graph
(state → successor state).
- ``"NumberOfTimeouts"`` : int
Number of simulations that did not converge within
``n_steps_timeout``.
Notes
-----
- This method is intended for networks with long transient dynamics, where
exhaustive synchronous analysis is infeasible.
- Basin sizes are *sampling-based estimates* and should not be interpreted
as exact proportions of the state space.
"""
rng = utils._coerce_rng(rng)
# --- Bookkeeping ---
dictF: dict[int, int] = {} # memorized synchronous transitions
attractors: list[list[int]] = [] # attractor cycles
basin_sizes: list[int] = [] # basin counts
attr_dict: dict[int, int] = {} # state -> attractor index
STG: dict[int, int] = {} # sampled synchronous STG
n_timeout = 0
sampled_points: list[int] = []
INITIAL_SAMPLE_POINTS_EMPTY = initial_sample_points is None
if not INITIAL_SAMPLE_POINTS_EMPTY:
nsim = len(initial_sample_points)
# --- Decide update backend ---
use_numba = __LOADED_NUMBA__ and USE_NUMBA
if use_numba:
F_array_list = List([np.array(bf.f, dtype=np.uint8) for bf in self.F])
I_array_list = List([np.array(regs, dtype=np.int64) for regs in self.I])
# --- Main simulation loop ---
for sim_idx in range(nsim):
# Initialize state
if INITIAL_SAMPLE_POINTS_EMPTY:
x = rng.integers(2, size=self.N, dtype=np.uint8)
xdec = utils.bin2dec(x)
sampled_points.append(xdec)
else:
if INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS:
x = np.asarray(initial_sample_points[sim_idx], dtype=np.uint8)
if x.shape[0] != self.N:
raise ValueError(
f"Initial state must have length {self.N}, got {x.shape[0]}."
)
xdec = utils.bin2dec(x)
else:
xdec = int(initial_sample_points[sim_idx])
x = np.array(utils.dec2bin(xdec, self.N), dtype=np.uint8)
visited = {xdec: 0}
trajectory = [xdec]
count = 0
# --- Iterate until attractor or timeout ---
while count < n_steps_timeout:
if xdec in dictF:
fxdec = dictF[xdec]
else:
if use_numba:
fx = _update_network_synchronously_numba(
x, F_array_list, I_array_list
)
else:
fx = self._update_network_synchronously_unchecked(x)
fxdec = utils.bin2dec(fx)
dictF[xdec] = fxdec
x = fx
# record sampled STG edge (first visit only)
if count == 0:
STG[xdec] = fxdec
# already assigned to known attractor
if fxdec in attr_dict:
idx_attr = attr_dict[fxdec]
basin_sizes[idx_attr] += 1
for s in trajectory:
attr_dict[s] = idx_attr
break
# new attractor detected
if fxdec in visited:
cycle_start = visited[fxdec]
attractor_states = trajectory[cycle_start:]
attractors.append(attractor_states)
basin_sizes.append(1)
idx_attr = len(attractors) - 1
for s in attractor_states:
attr_dict[s] = idx_attr
break
# continue traversal
visited[fxdec] = len(trajectory)
trajectory.append(fxdec)
xdec = fxdec
count += 1
if count == n_steps_timeout:
n_timeout += 1
break
return {
"Attractors": attractors,
"NumberOfAttractors": len(attractors),
"BasinSizes": basin_sizes,
"AttractorDict": attr_dict,
"InitialSamplePoints": (
sampled_points if INITIAL_SAMPLE_POINTS_EMPTY else list(initial_sample_points)
),
"STG": STG,
"NumberOfTimeouts": n_timeout,
}
[docs]
def compute_synchronous_state_transition_graph(
    self,
    USE_NUMBA: bool = True,
) -> None:
    """
    Compute the exact synchronous state transition graph (STG).

    The result is cached in ``self.STG`` as a one-dimensional NumPy array of
    length ``2**N`` whose entry at position ``x`` is the decimal successor of
    state ``x`` under a synchronous update of all nodes. Memory and time
    scale with ``2**N``, so this is intended for small-to-moderate networks
    only.

    Parameters
    ----------
    USE_NUMBA : bool, optional
        If True (default) and Numba is available, use a compiled kernel to
        accelerate computation.
    """
    if self.STG is not None:
        # Already computed; keep the cached graph.
        return
    if __LOADED_NUMBA__ and USE_NUMBA:
        # Convert update rules and regulator lists into Numba-friendly arrays.
        funcs = [np.array(bf.f, dtype=np.uint8) for bf in self.F]
        regulator_arrays = [np.array(regs, dtype=np.int64) for regs in self.I]
        if self.N <= 22:
            self.STG = _compute_synchronous_stg_numba(
                funcs, regulator_arrays, self.N
            )
        else:
            # Low-memory variant for larger state spaces.
            self.STG = _compute_synchronous_stg_numba_low_memory(
                funcs, regulator_arrays, self.N
            )
        return
    # -------- Pure NumPy fallback --------
    # All 2**N states as rows of a binary matrix.
    all_states = utils.get_left_side_of_truth_table(self.N)
    successors = np.zeros_like(all_states, dtype=np.uint8)
    # Weights for binary-to-decimal conversion (most significant bit first).
    weights = (1 << np.arange(self.N))[::-1]
    for node, bf in enumerate(self.F):
        inputs = self.I[node]
        n_inputs = len(inputs)
        if n_inputs == 0:
            # Node without regulators: constant value from its truth table.
            successors[:, node] = bf.f[0]
        else:
            # Row index into the node's truth table for every global state.
            rows = all_states[:, inputs] @ weights[-n_inputs:]
            successors[:, node] = bf.f[rows]
    # Encode each successor state as a decimal integer.
    self.STG = (successors @ weights).astype(np.int64)
[docs]
def get_attractors_synchronous_exact(
    self,
    USE_NUMBA: bool = True,
) -> dict:
    """
    Compute all attractors and their exact basin sizes under synchronous updating.

    This method computes the exact synchronous state transition graph (STG)
    and analyzes it as a functional graph on ``2**N`` states. All attractors
    (cycles), their basin sizes, and the attractor reached from each state
    are determined exactly.

    This computation requires memory and time proportional to ``2**N`` and is
    intended for small-to-moderate networks only.

    Parameters
    ----------
    USE_NUMBA : bool, optional
        If True (default) and Numba is available, use a compiled kernel for
        attractor detection.

    Returns
    -------
    dict
        Dictionary with keys:

        - Attractors : list[list[int]]
          Each attractor represented as a list of decimal states forming a cycle.
        - NumberOfAttractors : int
          Total number of attractors.
        - BasinSizes : np.ndarray[float]
          Fraction of all states belonging to each attractor basin.
        - AttractorID : np.ndarray[int]
          For each of the ``2**N`` states, the index of the attractor it reaches.
        - STG : np.ndarray[int]
          The synchronous state transition graph.
    """
    if self.STG is None:
        self.compute_synchronous_state_transition_graph(USE_NUMBA=USE_NUMBA)
    attractors = []
    if __LOADED_NUMBA__ and USE_NUMBA:
        attractor_id, basin_sizes, cycle_rep, cycle_len, n_attr = (
            _attractors_functional_graph(self.STG)
        )
        # Reconstruct each cycle by walking the STG from its representative.
        for k in range(int(n_attr)):
            rep = int(cycle_rep[k])
            L = int(cycle_len[k])
            cyc = [rep]
            x = rep
            for _ in range(L - 1):
                x = int(self.STG[x])
                cyc.append(x)
            attractors.append(cyc)
    else:
        attractor_id = -np.ones(2**self.N, dtype=np.int32)
        basin_sizes = []
        n_attr = 0
        for xdec in range(2**self.N):
            if attractor_id[xdec] != -1:
                continue  # state already resolved by an earlier walk
            # Walk forward until we reach a resolved state or close a cycle.
            path = [xdec]
            # PERF FIX: track each state's position on the current path in a
            # dict; the previous `fxdec in queue` / `queue.index(fxdec)`
            # list scans made cycle detection accidentally quadratic on
            # long transients.
            position = {xdec: 0}
            cur = xdec
            while True:
                fxdec = int(self.STG[cur])
                if attractor_id[fxdec] != -1:
                    # Path drains into an already-known basin.
                    idx_attr = int(attractor_id[fxdec])
                    basin_sizes[idx_attr] += len(path)
                    for q in path:
                        attractor_id[q] = idx_attr
                    break
                if fxdec in position:
                    # Closed a new cycle; the whole path belongs to its basin.
                    attractors.append(path[position[fxdec]:])
                    basin_sizes.append(len(path))
                    for q in path:
                        attractor_id[q] = n_attr
                    n_attr += 1
                    break
                position[fxdec] = len(path)
                path.append(fxdec)
                cur = fxdec
    # Convert basin counts to fractions of the full state space.
    basin_sizes = np.array(basin_sizes, dtype=np.float64) / (2**self.N)
    return {
        "Attractors": attractors,
        "NumberOfAttractors": len(attractors),
        "BasinSizes": basin_sizes,
        "AttractorID": attractor_id,
        "STG": self.STG,
    }
[docs]
def get_transient_lengths_exact(
    self,
    USE_NUMBA : bool = True
) -> np.ndarray:
    """
    Compute the exact transient length of every state under synchronous updating.

    Uses the full STG and the attractor cycles from
    :meth:`get_attractors_synchronous_exact`. Because cycle states are given
    explicitly, no indegree pruning is needed: a multi-source BFS backwards
    from the cycle states yields every state's distance to its attractor.

    Parameters
    ----------
    USE_NUMBA : bool, optional
        If True (default) and Numba is available, use a compiled kernel.

    Returns
    -------
    np.ndarray
        Integer array of length ``2**N``; entry ``x`` is the number of
        synchronous updates needed for state ``x`` to reach its attractor
        (0 for attractor states).
    """
    attractor_info = self.get_attractors_synchronous_exact(USE_NUMBA=USE_NUMBA)
    stg = self.STG  # full mapping: state -> successor
    attractors = attractor_info["Attractors"]  # list of cycles
    if __LOADED_NUMBA__ and USE_NUMBA:
        # BUGFIX: np.full requires a fill value; the previous call
        # np.full(2**self.N, dtype=np.uint8) raised a TypeError.
        is_attr_mask = np.zeros(2**self.N, dtype=np.uint8)
        for states in attractors:
            is_attr_mask[np.asarray(states, dtype=np.int64)] = 1
        return _transient_lengths_functional_numba(
            self.STG.astype(np.int64, copy=False),
            is_attr_mask
        )
    # Normalize STG to an integer array/list succ where succ[u] = v
    if isinstance(stg, np.ndarray):
        succ = stg.astype(int, copy=False)
        n = int(succ.shape[0])
    else:
        succ = list(stg)
        n = len(succ)
    # Build reverse adjacency list rev[v] = all u such that u -> v
    rev = [[] for _ in range(n)]
    for u in range(n):
        v = int(succ[u])
        if v < 0 or v >= n:
            raise ValueError(f"Invalid successor: {u} -> {v}")
        rev[v].append(u)
    # Initialize distances: all cycle states have transient length 0
    dist = np.full(n, -1, dtype=np.int64)
    bfs = deque()
    for cycle in attractors:
        for s in cycle:
            if dist[s] == -1:
                dist[s] = 0
                bfs.append(s)
    # Multi-source BFS outward (against the edges) from the cycle states
    while bfs:
        v = bfs.popleft()
        for u in rev[v]:
            if dist[u] == -1:
                dist[u] = dist[v] + 1
                bfs.append(u)
    # If STG is complete, every state must get a distance
    if any(d < 0 for d in dist):
        raise RuntimeError("Some states did not receive a transient length. Is STG complete?")
    return np.array(dist, dtype=int)
## Robustness measures: synchronous Derrida value, entropy of basin size distribution, coherence, fragility
[docs]
def get_attractors_and_robustness_measures_synchronous_exact(
self,
USE_NUMBA: bool = True,
GET_STRATIFIED_COHERENCES : bool = False
) -> dict:
"""
Compute attractors and exact robustness measures of a synchronously
updated Boolean network.
This method constructs the exact synchronous state transition graph
(STG) on ``2**N`` states and analyzes it as a functional graph. All
attractors (cycles), basin sizes, and the attractor reached from each
state are determined exactly. Based on this decomposition, exact
coherence and fragility measures are computed for the full network,
for each basin of attraction, and for each attractor.
Optionally, coherence can be stratified by the transient length
(distance from the attractor) of each state, allowing robustness to be
analyzed as a function of how far states lie from their eventual
attractor.
This computation requires memory and time proportional to ``2**N`` and
is intended for small-to-moderate networks. When Numba is enabled,
exact and stratified robustness measures remain feasible up to
moderate values of ``N`` (e.g., ``N ≈ 20`` on typical hardware).
Parameters
----------
USE_NUMBA : bool, optional
If True (default) and Numba is available, compiled kernels are used
for robustness and transient-length computations, resulting in
substantial speedups.
GET_STRATIFIED_COHERENCES : bool, optional
If True, coherence is additionally computed as a function of the
transient length (distance to the attractor) of each state.
When Numba is enabled, this option incurs only modest additional
computational cost. Default is False.
Returns
-------
dict
Dictionary with the following keys:
- Attractors : list[list[int]]
Each attractor represented as a list of decimal states forming
a cycle.
- NumberOfAttractors : int
Total number of attractors.
- BasinSizes : np.ndarray of float
Fraction of all states belonging to each attractor basin.
- AttractorID : np.ndarray of int
For each of the ``2**N`` states, the index of the attractor it
eventually reaches.
- Coherence : float
Exact global network coherence.
- Fragility : float
Exact global network fragility.
- BasinCoherence : np.ndarray of float
Exact coherence of each basin of attraction.
- BasinFragility : np.ndarray of float
Exact fragility of each basin of attraction.
- AttractorCoherence : np.ndarray of float
Exact coherence of each attractor.
- AttractorFragility : np.ndarray of float
Exact fragility of each attractor.
If ``GET_STRATIFIED_COHERENCES`` is True, the dictionary additionally
contains:
- StratifiedCoherences : np.ndarray of float
Coherence values stratified by attractor and transient length.
- DistanceFromAttractorCount : np.ndarray of int
Number of state–hypercube-edge incidences contributing to each
stratified coherence entry.
- DistanceFromAttractor : np.ndarray of int
Transient length (distance to attractor) for each state.
"""
# ------------------------------------------------------------------
# 0) Attractors and basins
# ------------------------------------------------------------------
result = self.get_attractors_synchronous_exact(USE_NUMBA=USE_NUMBA)
attractors = result["Attractors"]
n_attractors = int(result["NumberOfAttractors"])
basin_sizes = np.asarray(result["BasinSizes"], dtype=np.float64)
attractor_id = np.asarray(result["AttractorID"], dtype=np.int64)
n_states = 1 << self.N
# ------------------------------------------------------------------
# Single-attractor shortcut
# ------------------------------------------------------------------
if n_attractors == 1:
return {
"Attractors": attractors,
"NumberOfAttractors": 1,
"BasinSizes": basin_sizes,
"AttractorID": attractor_id,
"Coherence": 1.0,
"Fragility": 0.0,
"BasinCoherence": np.ones(1, dtype=np.float64),
"BasinFragility": np.zeros(1, dtype=np.float64),
"AttractorCoherence": np.ones(1, dtype=np.float64),
"AttractorFragility": np.zeros(1, dtype=np.float64),
}
# ------------------------------------------------------------------
# 1) Attractor membership and lengths
# ------------------------------------------------------------------
is_attr_mask = np.zeros(n_states, dtype=np.uint8)
len_attractors = np.empty(n_attractors, dtype=np.int64)
for i, states in enumerate(attractors):
states_arr = np.asarray(states, dtype=np.int64)
len_attractors[i] = states_arr.size
is_attr_mask[states_arr] = 1
# ------------------------------------------------------------------
# 2) Mean binary vector per attractor
# ------------------------------------------------------------------
mean_states_attractors = np.empty((n_attractors, self.N), dtype=np.float64)
for i, states in enumerate(attractors):
if len(states) == 1:
mean_states_attractors[i] = np.asarray(
utils.dec2bin(states[0], self.N), dtype=np.float64
)
else:
arr = np.asarray(
[utils.dec2bin(s, self.N) for s in states], dtype=np.float64
)
mean_states_attractors[i] = arr.mean(axis=0)
# ------------------------------------------------------------------
# 3) Distance matrix between attractors
# ------------------------------------------------------------------
diff = mean_states_attractors[:, None, :] - mean_states_attractors[None, :, :]
distance_between_attractors = np.sum(np.abs(diff), axis=2)
distance_between_attractors = np.asarray(
distance_between_attractors / float(self.N), dtype=np.float64
)
# ------------------------------------------------------------------
# 4) Hypercube edge traversal
# ------------------------------------------------------------------
if __LOADED_NUMBA__ and USE_NUMBA:
if GET_STRATIFIED_COHERENCES:
distances_from_attractor = _transient_lengths_functional_numba(
self.STG.astype(np.int64, copy=False),
is_attr_mask
)
max_distance_from_attractor = int(distances_from_attractor.max())
(
basin_coherences,
basin_fragilities,
attractor_coherences,
attractor_fragilities,
stratified_coherences,
n_states_with_specific_distance_from_attractor,
) = _robustness_edge_traversal_numba_stratified(
int(self.N),
attractor_id,
is_attr_mask,
distance_between_attractors,
distances_from_attractor,
max_distance_from_attractor,
)
stratified_coherences = np.asarray(stratified_coherences, dtype=np.float64)
n_states_with_specific_distance_from_attractor = np.asarray(n_states_with_specific_distance_from_attractor, dtype=int)
else:
(
basin_coherences,
basin_fragilities,
attractor_coherences,
attractor_fragilities,
) = _robustness_edge_traversal_numba(
int(self.N),
attractor_id,
is_attr_mask,
distance_between_attractors,
)
basin_coherences = np.asarray(basin_coherences, dtype=np.float64)
basin_fragilities = np.asarray(basin_fragilities, dtype=np.float64)
attractor_coherences = np.asarray(attractor_coherences, dtype=np.float64)
attractor_fragilities = np.asarray(attractor_fragilities, dtype=np.float64)
else:
basin_coherences = np.zeros(n_attractors, dtype=np.float64)
basin_fragilities = np.zeros(n_attractors, dtype=np.float64)
attractor_coherences = np.zeros(n_attractors, dtype=np.float64)
attractor_fragilities = np.zeros(n_attractors, dtype=np.float64)
if GET_STRATIFIED_COHERENCES:
distances_from_attractor = self.get_transient_lengths_exact(result)
max_distance_from_attractor = max(distances_from_attractor)
stratified_coherences = np.zeros((n_attractors,max_distance_from_attractor+1), dtype=np.float64)
n_states_with_specific_distance_from_attractor = np.zeros((n_attractors,max_distance_from_attractor+1), dtype=int)
for xdec in range(n_states):
for bitpos in range(self.N):
if (xdec >> bitpos) & 1:
continue
ydec = xdec | (1 << bitpos)
idx_x = attractor_id[xdec]
idx_y = attractor_id[ydec]
if GET_STRATIFIED_COHERENCES:
n_states_with_specific_distance_from_attractor[idx_x,distances_from_attractor[xdec]] += 1
n_states_with_specific_distance_from_attractor[idx_y,distances_from_attractor[ydec]] += 1
if idx_x == idx_y:
basin_coherences[idx_x] += 2.0
if is_attr_mask[xdec]:
attractor_coherences[idx_x] += 1.0
if is_attr_mask[ydec]:
attractor_coherences[idx_y] += 1.0
if GET_STRATIFIED_COHERENCES:
stratified_coherences[idx_x,distances_from_attractor[xdec]] += 1.0
stratified_coherences[idx_y,distances_from_attractor[ydec]] += 1.0
else:
dxy = float(distance_between_attractors[idx_x, idx_y])
basin_fragilities[idx_x] += dxy
basin_fragilities[idx_y] += dxy
if is_attr_mask[xdec]:
attractor_fragilities[idx_x] += dxy
if is_attr_mask[ydec]:
attractor_fragilities[idx_y] += dxy
# ------------------------------------------------------------------
# 5) Normalization
# ------------------------------------------------------------------
basin_counts = basin_sizes * float(n_states)
if GET_STRATIFIED_COHERENCES:
n_states_with_specific_distance_from_attractor //= self.N
for i in range(n_attractors):
if basin_counts[i] > 0.0:
basin_coherences[i] /= basin_counts[i] * self.N
basin_fragilities[i] /= basin_counts[i] * self.N
if len_attractors[i] > 0:
attractor_coherences[i] /= len_attractors[i] * self.N
attractor_fragilities[i] /= len_attractors[i] * self.N
if GET_STRATIFIED_COHERENCES:
for d in range(max_distance_from_attractor+1):
if n_states_with_specific_distance_from_attractor[i,d] > 0.0:
stratified_coherences[i,d] /= n_states_with_specific_distance_from_attractor[i,d] * self.N
else:
stratified_coherences[i,d] = np.nan
coherence = float(np.dot(basin_sizes, basin_coherences))
fragility = float(np.dot(basin_sizes, basin_fragilities))
# ------------------------------------------------------------------
# Final return
# ------------------------------------------------------------------
return_dict = {
"Attractors": attractors,
"NumberOfAttractors": int(n_attractors),
"BasinSizes": basin_sizes,
"AttractorID": attractor_id,
"Coherence": coherence,
"Fragility": fragility,
"BasinCoherence": basin_coherences,
"BasinFragility": basin_fragilities,
"AttractorCoherence": attractor_coherences,
"AttractorFragility": attractor_fragilities,
}
if GET_STRATIFIED_COHERENCES:
return_dict['StratifiedCoherences'] = stratified_coherences
return_dict['DistanceFromAttractorCount'] = n_states_with_specific_distance_from_attractor
return_dict['DistanceFromAttractor'] = distances_from_attractor
return return_dict
[docs]
def get_attractors_and_robustness_measures_synchronous(
    self,
    number_different_IC: int = 500,
    RETURN_ATTRACTOR_COHERENCE: bool = True,
    *,
    rng=None,
) -> dict:
    """
    Approximate attractors and robustness measures under synchronous updating.

    This method samples the attractor landscape by simulating the network
    from multiple random initial conditions (ICs) and their single-bit
    perturbations. It returns Monte-Carlo approximations of global
    coherence, fragility, and a final Hamming-distance-based measure, along
    with per-basin approximations. Optionally, it additionally estimates
    attractor-level coherence and fragility by perturbing attractor states
    found during sampling.

    Notes
    -----
    - The attractor set returned is a *lower bound* on the true number of
      attractors, because only the sampled portion of state space is explored.
    - For ``N >= 64``, decimal encoding of states may exceed ``np.int64`` and
      this method uses bitstrings (type ``str``) as state identifiers.

    Parameters
    ----------
    number_different_IC : int, optional
        Number of random initial conditions to sample (default is 500). For
        each IC, the method also simulates one randomly chosen single-bit
        perturbation.
    RETURN_ATTRACTOR_COHERENCE : bool, optional
        If True (default), also compute attractor-level coherence and
        fragility by perturbing attractor states found during sampling.
    rng : None or numpy.random.Generator, optional
        Random number generator or seed-like object. Passed to
        ``utils._coerce_rng``.

    Returns
    -------
    dict
        Dictionary with keys ``Attractors``,
        ``LowerBoundOfNumberOfAttractors``, ``BasinSizesApproximation``
        (sums to ~1 over discovered attractors), ``CoherenceApproximation``
        (probability that an IC and its single-bit perturbation reach the
        same attractor), ``FragilityApproximation`` (normalized by ``N``),
        ``FinalHammingDistanceApproximation`` (a *distance* in [0, 1]; 0
        means identical periodic trajectories), ``BasinCoherenceApproximation``
        and ``BasinFragilityApproximation``. If
        ``RETURN_ATTRACTOR_COHERENCE`` is True, additionally
        ``AttractorCoherence`` and ``AttractorFragility``.

    References
    ----------
    Park, K. H., Costa, F. X., Rocha, L. M., Albert, R., & Rozum, J. C. (2023).
    Models of cell processes are far from the edge of chaos. PRX Life, 1(2), 023009.

    Bavisetty, V. S. N., Wheeler, M., & Kadelka, C. (2025).
    Attractors are less stable than their basins: Canalization creates a coherence
    gap in gene regulatory networks. bioRxiv 2025-11.
    """
    rng = utils._coerce_rng(rng)

    def lcm(a: int, b: int) -> int:
        return abs(a * b) // math.gcd(a, b)

    # ------------------------------------------------------------------
    # Initialization
    # ------------------------------------------------------------------
    dictF = {}                     # memoized synchronous transitions
    attractors = []                # discovered attractor cycles
    ICs_per_attractor_state = []   # hits per attractor state
    basin_sizes = []               # sampled walk counts per attractor
    attractor_dict = {}            # state -> attractor index
    attractor_state_dict = []      # per attractor: state -> index within cycle
    distance_from_attractor_state_dict = []  # per attractor: state -> transient length
    counter_phase_shifts = []      # per attractor: histogram of phase shifts
    powers_of_2s = [
        np.asarray([2**i for i in range(NN)][::-1], dtype=np.int64)
        for NN in range(max(self.indegrees) + 1)
    ]
    if self.N < 64:
        powers_of_2 = np.asarray([2**i for i in range(self.N)][::-1], dtype=np.int64)
    robustness_approximation = 0
    fragility_sum = 0.0
    basin_robustness = defaultdict(float)
    basin_fragility = defaultdict(float)
    final_hamming_distance_approximation = 0.0
    mean_states_attractors = []
    states_attractors = []
    # ------------------------------------------------------------------
    # Sampling phase
    # ------------------------------------------------------------------
    for _ in range(number_different_IC):
        index_attractors = []
        index_within_attr = []
        dist_from_attr = []  # recorded for bookkeeping; not used downstream
        for j in range(2):
            if j == 0:
                # Random initial condition
                x = rng.integers(2, size=self.N, dtype=np.uint8)
                if self.N < 64:
                    xdec = int(np.dot(x, powers_of_2))
                else:
                    xdec = "".join(str(int(b)) for b in x)
                x_old = x.copy()
            else:
                # Single-bit perturbation of the same initial condition
                x = x_old.copy()
                bit = int(rng.integers(self.N))
                x[bit] ^= 1
                if self.N < 64:
                    xdec = int(np.dot(x, powers_of_2))
                else:
                    xdec = "".join(str(int(b)) for b in x)
            queue = [xdec]
            try:
                idx_attr = attractor_dict[xdec]
            except KeyError:
                while True:
                    try:
                        fxdec = dictF[xdec]
                    except KeyError:
                        # Evaluate one synchronous update of all nodes
                        fx = np.empty(self.N, dtype=np.uint8)
                        for jj in range(self.N):
                            if self.indegrees[jj] > 0:
                                fx[jj] = self.F[jj].f[
                                    int(
                                        np.dot(
                                            x[self.I[jj]],
                                            powers_of_2s[self.indegrees[jj]],
                                        )
                                    )
                                ]
                            else:
                                fx[jj] = self.F[jj].f[0]
                        if self.N < 64:
                            fxdec = int(np.dot(fx, powers_of_2))
                        else:
                            fxdec = "".join(str(int(b)) for b in fx)
                        dictF[xdec] = fxdec
                    try:
                        idx_attr = attractor_dict[fxdec]
                        idx_state = attractor_state_dict[idx_attr][fxdec]
                        dist_state = distance_from_attractor_state_dict[idx_attr][fxdec]
                        attractor_dict.update({q: idx_attr for q in queue})
                        attractor_state_dict[idx_attr].update(
                            {q: idx_state for q in queue}
                        )
                        distance_from_attractor_state_dict[idx_attr].update(
                            {
                                q: d
                                for q, d in zip(
                                    queue,
                                    range(len(queue) + dist_state, dist_state, -1),
                                )
                            }
                        )
                        break
                    except KeyError:
                        if fxdec in queue:
                            # Closed a new cycle within this walk
                            idx = queue.index(fxdec)
                            idx_attr = len(attractors)
                            attractors.append(queue[idx:])
                            # BUGFIX: start the basin count at 0; the walk is
                            # counted once below (basin_sizes[idx_attr] += 1).
                            # Starting at 1 double-counted the discovering
                            # walk, so basin sizes did not sum to 1 and
                            # per-basin coherence was biased.
                            basin_sizes.append(0)
                            ICs_per_attractor_state.append(
                                [0] * len(attractors[-1])
                            )
                            counter_phase_shifts.append(
                                [0] * len(attractors[-1])
                            )
                            attractor_dict.update({q: idx_attr for q in queue})
                            attractor_state_dict.append(
                                {
                                    q: (0 if q in queue[:idx] else queue[idx:].index(q))
                                    for q in queue
                                }
                            )
                            distance_from_attractor_state_dict.append(
                                {
                                    q: (idx - queue.index(q))
                                    if q in queue[:idx]
                                    else 0
                                    for q in queue
                                }
                            )
                            if len(attractors[-1]) == 1:
                                fp = (
                                    np.asarray(
                                        utils.dec2bin(queue[idx], self.N),
                                        dtype=np.float64,
                                    )
                                    if self.N < 64
                                    else np.asarray(list(queue[idx]), dtype=np.float64)
                                )
                                states_attractors.append(fp.reshape(1, self.N))
                                mean_states_attractors.append(fp)
                            else:
                                lc = (
                                    np.asarray(
                                        [
                                            utils.dec2bin(s, self.N)
                                            for s in queue[idx:]
                                        ],
                                        dtype=np.float64,
                                    )
                                    if self.N < 64
                                    else np.asarray(
                                        [list(s) for s in queue[idx:]],
                                        dtype=np.float64,
                                    )
                                )
                                states_attractors.append(lc)
                                mean_states_attractors.append(lc.mean(axis=0))
                            break
                        else:
                            x = fx.copy()
                            queue.append(fxdec)
                            xdec = fxdec
            index_attractors.append(idx_attr)
            index_within_attr.append(attractor_state_dict[idx_attr][xdec])
            dist_from_attr.append(
                distance_from_attractor_state_dict[idx_attr][xdec]
            )
            basin_sizes[idx_attr] += 1
            ICs_per_attractor_state[idx_attr][
                attractor_state_dict[idx_attr][xdec]
            ] += 1
        if index_attractors[0] == index_attractors[1]:
            robustness_approximation += 1
            basin_robustness[index_attractors[0]] += 1
            ps = max(index_within_attr) - min(index_within_attr)
            counter_phase_shifts[index_attractors[0]][ps] += 1
        else:
            d = np.sum(
                np.abs(
                    mean_states_attractors[index_attractors[0]]
                    - mean_states_attractors[index_attractors[1]]
                )
            )
            fragility_sum += d
            basin_fragility[index_attractors[0]] += d
            L = lcm(
                len(attractors[index_attractors[0]]),
                len(attractors[index_attractors[1]]),
            )
            s0 = states_attractors[index_attractors[0]]
            s1 = states_attractors[index_attractors[1]]
            p0 = np.tile(s0, (L // len(s0) + 1, 1))[
                index_within_attr[0] : index_within_attr[0] + L
            ]
            p1 = np.tile(s1, (L // len(s1) + 1, 1))[
                index_within_attr[1] : index_within_attr[1] + L
            ]
            # BUGFIX: the Hamming *distance* is the fraction of differing
            # entries; np.mean(p0 == p1) measured similarity, contradicting
            # the documented semantics (0 = identical).
            final_hamming_distance_approximation += np.mean(p0 != p1)
    # ------------------------------------------------------------------
    # Aggregation
    # ------------------------------------------------------------------
    lower_bound_number_of_attractors = len(attractors)
    approximate_basin_sizes = (
        np.asarray(basin_sizes, dtype=np.float64)
        / (2.0 * float(number_different_IC))
    )
    approximate_coherence = robustness_approximation / float(number_different_IC)
    approximate_fragility = fragility_sum / float(number_different_IC) / float(self.N)
    approximate_basin_coherence = np.asarray(
        [
            2.0 * basin_robustness[i] / basin_sizes[i]
            for i in range(lower_bound_number_of_attractors)
        ],
        dtype=np.float64,
    )
    approximate_basin_fragility = np.asarray(
        [
            2.0 * basin_fragility[i] / basin_sizes[i] / float(self.N)
            for i in range(lower_bound_number_of_attractors)
        ],
        dtype=np.float64,
    )
    final_hamming_distance_approximation /= float(number_different_IC)
    results = [
        attractors,
        lower_bound_number_of_attractors,
        approximate_basin_sizes,
        approximate_coherence,
        approximate_fragility,
        final_hamming_distance_approximation,
        approximate_basin_coherence,
        approximate_basin_fragility,
    ]
    if not RETURN_ATTRACTOR_COHERENCE:
        return dict(
            zip(
                [
                    "Attractors",
                    "LowerBoundOfNumberOfAttractors",
                    "BasinSizesApproximation",
                    "CoherenceApproximation",
                    "FragilityApproximation",
                    "FinalHammingDistanceApproximation",
                    "BasinCoherenceApproximation",
                    "BasinFragilityApproximation",
                ],
                results,
            )
        )
    # ------------------------------------------------------------------
    # Attractor-level coherence / fragility
    # ------------------------------------------------------------------
    attractor_coherence = np.zeros(lower_bound_number_of_attractors, dtype=np.float64)
    attractor_fragility = np.zeros(lower_bound_number_of_attractors, dtype=np.float64)
    attractors_original = attractors[:]
    for idx0, attractor in enumerate(attractors_original):
        for state in attractor:
            for i in range(self.N):
                x = (
                    np.asarray(utils.dec2bin(state, self.N), dtype=np.uint8)
                    if self.N < 64
                    else np.asarray(list(state), dtype=np.uint8)
                )
                x[i] ^= 1
                if self.N < 64:
                    xdec = int(np.dot(x, powers_of_2))
                else:
                    xdec = "".join(str(int(b)) for b in x)
                try:
                    idx1 = attractor_dict[xdec]
                except KeyError:
                    # --- safe forward-walk without touching basin counts
                    queue = [xdec]
                    x_local = x.copy()
                    while True:
                        try:
                            fxdec = dictF[xdec]
                        except KeyError:
                            fx = np.empty(self.N, dtype=np.uint8)
                            for jj in range(self.N):
                                if self.indegrees[jj] > 0:
                                    fx[jj] = self.F[jj].f[
                                        int(
                                            np.dot(
                                                x_local[self.I[jj]],
                                                powers_of_2s[self.indegrees[jj]],
                                            )
                                        )
                                    ]
                                else:
                                    fx[jj] = self.F[jj].f[0]
                            if self.N < 64:
                                fxdec = int(np.dot(fx, powers_of_2))
                            else:
                                fxdec = "".join(str(int(b)) for b in fx)
                            dictF[xdec] = fxdec
                        if fxdec in attractor_dict:
                            idx1 = attractor_dict[fxdec]
                            # Map this walk too, so later perturbation walks
                            # terminate as soon as they hit one of its states.
                            attractor_dict.update({q: idx1 for q in queue})
                            break
                        if fxdec in queue:
                            # New attractor discovered during perturbation walks
                            idx1 = len(attractors)
                            new_cycle = queue[queue.index(fxdec):]
                            attractors.append(new_cycle)
                            attractor_dict.update({q: idx1 for q in queue})
                            # BUGFIX: also register the new attractor's states
                            # and mean state; previously mean_states_attractors
                            # was not extended, so the fragility computation
                            # below raised an IndexError for idx1.
                            arr = (
                                np.asarray(
                                    [utils.dec2bin(s, self.N) for s in new_cycle],
                                    dtype=np.float64,
                                )
                                if self.N < 64
                                else np.asarray(
                                    [list(s) for s in new_cycle], dtype=np.float64
                                )
                            )
                            states_attractors.append(arr)
                            mean_states_attractors.append(arr.mean(axis=0))
                            break
                        queue.append(fxdec)
                        xdec = fxdec
                        x_local = fx.copy()
                if idx0 == idx1:
                    attractor_coherence[idx0] += 1.0
                else:
                    attractor_fragility[idx0] += np.sum(
                        np.abs(
                            mean_states_attractors[idx0]
                            - mean_states_attractors[idx1]
                        )
                    )
    attractor_coherence /= (
        float(self.N)
        * np.asarray(list(map(len, attractors_original)), dtype=np.float64)
    )
    attractor_fragility /= (
        float(self.N) ** 2
        * np.asarray(list(map(len, attractors_original)), dtype=np.float64)
    )
    results[0] = attractors_original
    return dict(
        zip(
            [
                "Attractors",
                "LowerBoundOfNumberOfAttractors",
                "BasinSizesApproximation",
                "CoherenceApproximation",
                "FragilityApproximation",
                "FinalHammingDistanceApproximation",
                "BasinCoherenceApproximation",
                "BasinFragilityApproximation",
                "AttractorCoherence",
                "AttractorFragility",
            ],
            results + [attractor_coherence, attractor_fragility],
        )
    )
[docs]
def get_derrida_value(
    self,
    nsim: int = 1000,
    EXACT: bool = False,
    USE_NUMBA: bool = True,
    *,
    rng=None,
) -> float:
    """
    Return the Derrida value of the Boolean network.

    The Derrida value is the expected Hamming distance between the images,
    under one synchronous update, of two states that differ in exactly one
    bit. It is a standard measure of the network's short-term sensitivity
    to small perturbations.

    With ``EXACT=True`` the value is obtained analytically as the mean
    unnormalized average sensitivity of the individual update functions;
    otherwise it is estimated by Monte Carlo sampling.

    Parameters
    ----------
    nsim : int, optional
        Number of Monte Carlo samples (default 1000). Unused when
        ``EXACT`` is True.
    EXACT : bool, optional
        If True, compute the value analytically instead of sampling
        (default False).
    USE_NUMBA : bool, optional
        If True (default), run the sampling loop in a compiled Numba
        kernel when Numba is installed.
    rng : None or np.random.Generator, optional
        Source of randomness, normalized via ``utils._coerce_rng``.

    Returns
    -------
    float
        Average one-step Hamming distance following a single-bit flip.

    References
    ----------
    Derrida, B., & Pomeau, Y. (1986).
    Random networks of automata: a simple annealed approximation.
    *Europhysics Letters*, 1(2), 45.
    """
    # Analytic route: the Derrida value equals the mean (unnormalized)
    # average sensitivity taken over all update functions.
    if EXACT:
        sensitivities = [
            bf.get_average_sensitivity(EXACT=True, NORMALIZED=False)
            for bf in self.F
        ]
        return float(np.mean(sensitivities))

    rng = utils._coerce_rng(rng)

    # Compiled Monte Carlo kernel, used when Numba is importable.
    if USE_NUMBA and __LOADED_NUMBA__:
        truth_tables = List(
            [np.asarray(bf.f, dtype=np.uint8) for bf in self.F]
        )
        regulator_sets = List(
            [np.asarray(regs, dtype=np.int64) for regs in self.I]
        )
        kernel_seed = int(rng.integers(0, 2**31 - 1))
        estimate = _derrida_simulation(
            truth_tables,
            regulator_sets,
            int(self.N),
            int(nsim),
            kernel_seed,
        )
        return float(estimate)

    # Pure-Python Monte Carlo fallback. The two rng draws per iteration
    # (random state, then flip position) keep the stream consumption
    # identical across runs with the same generator state.
    hamming_total = 0.0
    for _ in range(int(nsim)):
        state = rng.integers(0, 2, size=self.N, dtype=np.uint8)
        perturbed = state.copy()
        flip_pos = int(rng.integers(0, self.N))
        perturbed[flip_pos] ^= np.uint8(1)
        image = np.asarray(
            self._update_network_synchronously_unchecked(state),
            dtype=np.uint8,
        )
        image_perturbed = np.asarray(
            self._update_network_synchronously_unchecked(perturbed),
            dtype=np.uint8,
        )
        hamming_total += float(np.sum(image != image_perturbed))
    return float(hamming_total / float(nsim))