#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Aug 13 11:08:44 2025
@author: Benjamin Coberly, Claus Kadelka
"""
import itertools
import math
from collections import defaultdict
from copy import deepcopy
import numpy as np
import networkx as nx
import pandas as pd
from typing import Union, Optional
try:
import boolforge.utils as utils
from boolforge.boolean_function import BooleanFunction
except ModuleNotFoundError:
import utils as utils
from boolean_function import BooleanFunction
try:
import cana.boolean_network
__LOADED_CANA__=True
except ModuleNotFoundError:
print('The module cana cannot be found. Ensure it is installed to use all functionality of this toolbox.')
__LOADED_CANA__=False
try:
from numba import njit
from numba.typed import List
__LOADED_NUMBA__=True
except ModuleNotFoundError:
print('The module numba cannot be found. Ensure it is installed to speed up critical code in this toolbox.')
__LOADED_NUMBA__=False
def get_entropy_of_basin_size_distribution(basin_sizes : Union[list, np.array]) -> float:
"""
Compute the Shannon entropy of the basin size distribution.
This function calculates the Shannon entropy of a probability distribution derived from the basin sizes.
First, the basin sizes are normalized to form a probability distribution, and then the entropy is computed
using the formula H = - sum(p_i * log(p_i)), where p_i is the proportion of initial conditions that fall into basin i (natural logarithm).
**Parameters:**
- basin_sizes (list | np.array): A list where each element
represents the size of a basin, i.e., the number of initial
conditions that converge to a particular attractor.
**Returns:**
- float: The Shannon entropy of the basin size distribution.
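**Example:**
>>> # two equally sized basins yield H = ln(2) ≈ 0.6931 nats
>>> round(float(get_entropy_of_basin_size_distribution([2, 2])), 4)
0.6931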
"""
total = sum(basin_sizes)
probabilities = [size * 1.0 / total for size in basin_sizes]
return sum([-np.log(p) * p for p in probabilities])
if __LOADED_NUMBA__:
@njit(fastmath=True) #can safely use fastmath because computations are integers only
def _update_network_synchronously_numba(x, F_array_list, I_array_list, N):
"""
Compute one synchronous network update for a given binary state vector x.
Returns a new binary vector (uint8).
"""
fx = np.empty(N, dtype=np.uint8)
for j in range(N):
regulators = I_array_list[j]
if regulators.shape[0] == 0:
fx[j] = F_array_list[j][0]
else:
n_reg = regulators.shape[0]
idx = 0
# convert substate bits → integer index
for k in range(n_reg):
idx = (idx << 1) | x[regulators[k]]
fx[j] = F_array_list[j][idx]
return fx
@njit
def _compute_synchronous_stg_numba(F_list, I_list, N_variables):
"""
Compute synchronous state transition graph (STG)
in a fully numba-jitted function.
"""
nstates = 2 ** N_variables
states = np.zeros((nstates, N_variables), dtype=np.uint8)
for i in range(nstates):
# binary representation of i
for j in range(N_variables):
states[i, N_variables - 1 - j] = (i >> j) & 1
next_states = np.zeros_like(states)
powers_of_two = 2 ** np.arange(N_variables - 1, -1, -1)
# Compute next state for each node
for j in range(N_variables):
regulators = I_list[j]
if len(regulators) == 0:
# constant node
next_states[:, j] = F_list[j][0]
continue
n_reg = len(regulators)
reg_powers = 2 ** np.arange(n_reg - 1, -1, -1)
for s in range(nstates):
idx = 0
for k in range(n_reg):
idx += states[s, regulators[k]] * reg_powers[k]
next_states[s, j] = F_list[j][idx]
# Convert each next state to integer index
next_indices = np.zeros(nstates, dtype=np.int64)
for s in range(nstates):
val = 0
for j in range(N_variables):
val += next_states[s, j] * powers_of_two[j]
next_indices[s] = val
return next_indices
@njit
def _compute_synchronous_stg_numba_low_memory(F_array_list, I_array_list, N_variables):
"""
Compute synchronous state transition graph (STG) without storing all states.
For each integer state i in [0, 2^N):
- decode i into its binary vector
- compute its next state vector
- encode back to integer
"""
nstates = 2 ** N_variables
next_indices = np.zeros(nstates, dtype=np.int64)
powers_of_two = 2 ** np.arange(N_variables - 1, -1, -1)
state = np.zeros(N_variables, dtype=np.uint8)
next_state = np.zeros(N_variables, dtype=np.uint8)
for i in range(nstates):
# --- Decode i into binary vector (most-significant bit first)
tmp = i
for j in range(N_variables):
state[N_variables - 1 - j] = tmp & 1
tmp >>= 1
# --- Compute next-state values
for j in range(N_variables):
regulators = I_array_list[j]
if regulators.shape[0] == 0:
next_state[j] = F_array_list[j][0]
else:
n_reg = regulators.shape[0]
idx = 0
for k in range(n_reg):
idx = (idx << 1) | state[regulators[k]]
next_state[j] = F_array_list[j][idx]
# --- Encode next_state back to integer
val = 0
for j in range(N_variables):
val += next_state[j] * powers_of_two[j]
next_indices[i] = val
return next_indices
@njit(fastmath=True) #can safely use fastmath because computations are integers only
def _hamming_distance(a, b):
"""Fast Hamming distance for uint8 arrays."""
dist = 0
for i in range(a.size):
dist += a[i] != b[i]
return dist
@njit(fastmath=True) #can safely use fastmath because computations are integers only
def _derrida_simulation(F_array_list, I_array_list, N, nsim, seed):
"""
Monte Carlo loop for Derrida value, using Numba-compatible RNG.
"""
# Numba RNG: seed once
np.random.seed(seed)
total_dist = 0.0
X = np.empty(N, dtype=np.uint8)
Y = np.empty(N, dtype=np.uint8)
for _ in range(nsim):
# Random initial state
for i in range(N):
X[i] = np.random.randint(0, 2)
Y[:] = X
# Flip one random bit
idx = np.random.randint(0, N)
Y[idx] = 1 - Y[idx]
# Synchronous updates
FX = _update_network_synchronously_numba(X, F_array_list, I_array_list, N)
FY = _update_network_synchronously_numba(Y, F_array_list, I_array_list, N)
total_dist += _hamming_distance(FX, FY)
return total_dist / nsim
class WiringDiagram(object):
"""
A class representing a Wiring Diagram
**Constructor Parameters:**
- I (list[list[int]] | np.ndarray[list[int]]): A list of N lists
representing the regulators (or inputs) for each Boolean function.
- variables (list[str] | np.array[str], optional): A list of N strings
representing the names of each variable, default = None.
- weights (list[list[int | np.nan]]): #TODO
**Members:**
- I (list[np.array[int]]): As passed by the constructor.
- variables (np.array[str]): As passed by the constructor.
- N_variables (int): The number of variables in the Boolean network.
- N_constants (int): The number of constants in the Boolean network.
- N (int): The number of variables and constants in the Boolean network.
- indegrees (list[int]): The indegrees for each node.
- outdegrees (list[int]): The outdegrees of each node.
- weights (list[list[int | np.nan]]): As passed by the constructor.
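**Example:**
>>> # a three-node cycle: x2 regulates x0, x0 regulates x1, x1 regulates x2
>>> wd = WiringDiagram([[2], [0], [1]])
>>> wd.indegrees
array([1, 1, 1])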
"""
def __init__(self, I : Union[list, np.ndarray],
variables : Union[list, np.array, None] = None, weights = None):
assert isinstance(I, (list, np.ndarray)), "I must be an array"
#assert (len(I[i]) == ns[i] for i in range(len(ns))), "Malformed wiring diagram I"
assert variables is None or len(I)==len(variables), "len(I)==len(variables) required if variable names are provided"
assert weights is None or True, "weights assertion" # TODO: if weights are given, they must be valid
self.I = [np.array(regulators,dtype=int) for regulators in I]
self.N = len(I)
self.indegrees = np.array(list(map(len, self.I)))
if variables is None:
variables = ['x'+str(i) for i in range(self.N)]
self.N_constants = len(self.get_constants(False))
self.N_variables = self.N - self.N_constants
self.variables = np.array(variables)
self.outdegrees = self.get_outdegrees()
self.weights = weights
@classmethod
def from_DiGraph(cls, nx_DiGraph : "nx.DiGraph") -> "WiringDiagram":
"""
**Compatibility Method:**
Converts a `networkx.DiGraph` instance into a `WiringDiagram` object.
Each node in the DiGraph represents a Boolean variable, and each
directed edge (u → v) indicates that variable `u` regulates variable `v`.
**Parameters:**
- nx_DiGraph (nx.DiGraph): A directed graph where edges represent
regulatory influences (u → v).
- Node attributes (optional):
- `'name'`: a string name of the variable (defaults to node label).
- `'weight'`: numerical edge weights (stored in `weights` matrix, optional).
**Returns:**
- WiringDiagram: An instance constructed from the graph structure.
**Example:**
>>> import networkx as nx
>>> G = nx.DiGraph()
>>> G.add_edges_from([(0, 1), (1, 2), (2, 0)])
>>> WD = WiringDiagram.from_DiGraph(G)
>>> WD.I
[array([2]), array([0]), array([1])]
>>> WD.variables
array(['x0', 'x1', 'x2'], dtype='<U2')
"""
#BEN: #TODO
# Ensure input is a DiGraph
assert isinstance(nx_DiGraph, nx.DiGraph), "Input must be a networkx.DiGraph instance."
# Sort nodes to ensure deterministic ordering
nodes = list(nx_DiGraph.nodes)
N = len(nodes)
# Extract variable names, defaulting to "x0", "x1", ...
variables = []
for node in nodes:
if isinstance(node, str):
variables.append(node)
elif 'name' in nx_DiGraph.nodes[node]:
variables.append(nx_DiGraph.nodes[node]['name'])
else:
variables.append('x' + str(node)) # default name, keeps len(variables) == N
# Build regulator list I: for each node, collect its predecessors (inputs)
# and map node labels to integer indices 0..N-1
node_to_index = {node: idx for idx, node in enumerate(nodes)}
I = []
for node in nodes:
regulators = [node_to_index[r] for r in nx_DiGraph.predecessors(node)]
I.append(regulators)
# Optional: extract weights if available
weights = None
has_weights = all('weight' in nx_DiGraph[u][v] for u, v in nx_DiGraph.edges)
if has_weights:
weights = []
for node in nodes:
regs = list(nx_DiGraph.predecessors(node))
w = [nx_DiGraph[u][node]['weight'] for u in regs]
weights.append(w)
# Instantiate WiringDiagram
return cls(I=I, variables=variables, weights=weights)
def __getitem__(self, index):
return self.I[index]
def get_outdegrees(self) -> np.array:
"""
Returns the outdegree of each node.
**Returns:**
- np.array[int]: Outdegree of each node.
"""
outdegrees = np.zeros(self.N, int)
for regulators in self.I:
for regulator in regulators:
outdegrees[regulator] += 1
return outdegrees
def get_constants(self, AS_DICT : bool = True) -> Union[dict, np.array]:
"""
Identify constants in a Boolean network.
A node is considered a constant if it has no regulators.
**Parameters:**
- AS_DICT (bool, optional): Whether to return the indices of constants
as a dictionary or array. If true, returns as a dictionary. Defaults
to True.
**Returns:**
If AS_DICT is True:
- dict[int:bool]: Dictionary determining if an index is a
constant or not.
else:
- np.array[int]: Array of node indices that are constants.
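**Example:**
>>> # node 0 has no regulators and is therefore a constant
>>> wd = WiringDiagram([[], [0, 2], [1]])
>>> wd.get_constants(AS_DICT=False)
array([0])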
"""
rlI = range(len(self.I))
is_constant = [self.indegrees[i] == 0 for i in rlI]
if AS_DICT:
return dict(zip(rlI, is_constant))
return np.where(is_constant)[0]
def get_strongly_connected_components(self) -> list:
"""
Determine the strongly connected components of a wiring diagram.
**Returns:**
- list[set[int]]: A list of sets, each representing a strongly
connected component.
"""
edges_wiring_diagram = []
for target, regulators in enumerate(self.I):
for regulator in regulators:
edges_wiring_diagram.append((int(regulator), target))
subG = nx.from_edgelist(edges_wiring_diagram, create_using=nx.MultiDiGraph())
return [scc for scc in nx.strongly_connected_components(subG)]
def get_modular_structure(self):
"""
Determine the modular structure of a Boolean network.
The modular structure is defined by a directed acyclic graph (DAG) whose
nodes are the strongly connected components (SCCs) of the underlying wiring
diagram and whose directed edges indicate a regulation from one SCC to another SCC.
**Returns:**
- set[tuple[int]]: A set of edges, describing a directed acyclic graph
indicating the regulations between modules (i.e., strongly connected
components of the underlying wiring diagram).
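**Example** (illustrative; the numbering of the modules depends on the
order in which networkx returns the strongly connected components):
>>> # a two-node feedback loop (one module) regulating a downstream node (second module)
>>> wd = WiringDiagram([[1], [0], [1]])
>>> dag = wd.get_modular_structure() # a single edge from the loop module to the module containing node 2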
"""
sccs = self.get_strongly_connected_components()
scc_dict = {}
for j,s in enumerate(sccs):
for el in s:
scc_dict.update({el:j})
dag = set()
for target,regulators in enumerate(self.I):
for regulator in regulators:
edge = (scc_dict[regulator],scc_dict[target])
if edge[0]!=edge[1] and (self.weights is None or not np.isnan(self.weights[target][list(self.weights[target]).index(regulator)])):
dag.add(edge)
return dag
# def get_signed_effective_graph(self, type_of_each_regulation : list,
# constants : list = [], IGNORE_SELFLOOPS : bool = False,
# IGNORE_CONSTANTS : bool = True) -> np.array:
# """
# Construct the signed effective graph of a Boolean network.
# This function computes an effective graph in which each edge is
# weighted by its effectiveness. Effectiveness is obtained via
# get_edge_effectiveness on the corresponding Boolean function. Edges
# are signed according to the type of regulation ('increasing' or
# 'decreasing').
# **Parameters:**
# - type_of_each_regulation (list[str]): List of lists specifying
# the type of regulation for each edge.
# - constants (list[int], optional): List of constant nodes.
# - IGNORE_SELFLOOPS (bool, optional): If True, self-loops are ignored.
# - IGNORE_CONSTANTS (bool, optional): If True, constant nodes
# are excluded.
# **Returns:**
# - np.array[float]: The signed effective graph as a matrix of edge
# effectiveness values.
# """
# n = len(self.I)
# n_constants = len(constants)
# if IGNORE_CONSTANTS:
# m = np.zeros((n - n_constants, n - n_constants), dtype=float)
# for i, (regulators, type_of_regulation) in enumerate(zip(self.I, type_of_each_regulation)):
# effectivenesses = self.F[i].get_edge_effectiveness() #TODO: F does not exist here
# for j, t, e in zip(regulators, type_of_regulation, effectivenesses):
# if j < n - n_constants and (not IGNORE_SELFLOOPS or i != j):
# if t == 'increasing':
# m[j, i] = e
# elif t == 'decreasing':
# m[j, i] = -e
# else:
# m[j, i] = np.nan
# return m
# else:
# return self.get_signed_effective_graph(type_of_each_regulation, [], IGNORE_CONSTANTS=True)
def get_ffls(self) -> Union[tuple, list]:
"""
Identify feed-forward loops (FFLs) in a Boolean network based solely
on the wiring diagram.
The function uses the inverted wiring diagram to identify common
targets and returns the FFLs found. If self.weights is set (encoding
the type of each regulation), the corresponding regulation types are
returned as well.
**Returns:**
If self.weights is not None:
- tuple[list[int], list[int]]: (ffls, types) where ffls is a
list of identified FFLs (each as a list [master regulator,
intermediate, target]), and types is a list of regulation type
triplets (master -> target, master -> intermediate,
intermediate -> target).
Otherwise:
- list[list[int]]: A list of identified FFLs.
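**Example:**
>>> # x0 regulates x1, and both x0 and x1 regulate x2: one feed-forward loop
>>> wd = WiringDiagram([[], [0], [0, 1]])
>>> wd.get_ffls()
[[0, 1, 2]]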
"""
I_inv = [[] for _ in range(self.N)]
for target, regulators in enumerate(self.I):
for regulator in regulators:
I_inv[regulator].append(target)
ffls = []
types = []
for i in range(self.N): # master regulators
for j in I_inv[i]:
if i == j:
continue
common_targets = list(set(I_inv[i]) & set(I_inv[j]))
for k in common_targets:
if j == k or i == k:
continue
ffls.append([i, j, k])
if self.weights is not None:
# cast to list because np.array has no .index method
direct = self.weights[k][list(self.I[k]).index(i)]
indirect1 = self.weights[j][list(self.I[j]).index(i)]
indirect2 = self.weights[k][list(self.I[k]).index(j)]
types.append([direct, indirect1, indirect2])
if self.weights is not None:
return (ffls, types)
else:
return ffls
def generate_networkx_graph(self) -> nx.DiGraph:
"""
Generate a NetworkX directed graph from a wiring diagram.
Nodes are labeled with variable names (from self.variables). Edges are
added from each regulator to its target based on the wiring diagram I.
**Returns:**
- networkx.DiGraph: The wiring diagram as directed graph.
"""
G = nx.DiGraph()
G.add_nodes_from(self.variables)
G.add_edges_from([(self.variables[self.I[i][j]], self.variables[i]) for i in range(self.N) for j in range(self.indegrees[i])])
return G
def generate_networkx_graph_from_edges(self, n_variables : int) -> nx.DiGraph:
"""
Generate a NetworkX directed graph from an edge list derived from the
wiring diagram.
Only edges among the first n_variables (excluding constant self-loops)
are included.
**Parameters:**
- n_variables (int): Number of variable nodes (constants are
excluded).
**Returns:**
- networkx.DiGraph: The generated directed graph.
"""
edges = []
for j, regulators in enumerate(self.I):
if j >= n_variables: # Exclude constant self-loops
break
for i in regulators:
edges.append((i, j))
return nx.DiGraph(edges)
def get_type_of_loop(self, loop : list) -> list:
"""
Determine the regulation types along a feedback loop.
For a given loop (a list of node indices), this function returns a
list containing the type (e.g., 'increasing' or 'decreasing') of each
regulation along the loop. The loop is given as an ordered list of
node indices; the first node is implicitly appended at the end to
close the loop.
**Parameters:**
- loop (list[int]): List of node indices representing the loop.
**Returns:**
- list: A list of regulation types corresponding to each edge
in the loop.
"""
n = len(loop)
dummy = loop[:]
dummy.append(loop[0])
res = []
for i in range(n):
# Assumes is_monotonic returns a tuple with the monotonicity information.
#TODO: F does not exist here
res.append(self.F[dummy[i+1]].is_monotonic(True)[1][list(self.I[dummy[i+1]]).index(dummy[i])])
return res
class BooleanNetwork(WiringDiagram):
"""
A class representing a Boolean network with N variables.
**Constructor Parameters:**
- F (list[BooleanFunction | list[int]] | np.ndarray[BooleanFunction |
list[int]]): A list of N Boolean functions, or of N lists of length
2^n representing the outputs of a Boolean function with n inputs.
- I (list[list[int]] | np.ndarray[list[int]] | WiringDiagram):
A list of N lists representing the regulators (or inputs) for each
Boolean function.
- variables (list[str] | np.array[str], optional): A list of N strings
representing the names of each variable, default = None.
- SIMPLIFY_FUNCTIONS (bool, optional): Constructs this Boolean Network
to only include its essential components. Defaults to False
**Members:**
- F (list[BooleanFunction]): As passed by the constructor.
- I (list[np.array[int]]): As passed by the constructor.
- variables (np.array[str]): As passed by the constructor.
- N (int): The number of variables in the Boolean network.
- N_constants (int): The number of constants in the Boolean network.
- size (int): The number of variables and constants in the Boolean network.
- indegrees (list[int]): The indegrees for each node.
- outdegrees (list[int]): The outdegrees of each node.
- STG (dict): The state transition graph.
- weights (np.array[float] | None): Inherited from WiringDiagram. Default None.
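**Example:**
>>> # a two-node network: x0 copies x1, and x1 negates x0
>>> bn = BooleanNetwork(F=[[0, 1], [1, 0]], I=[[1], [0]])
>>> bn.update_network_synchronously([0, 1])
array([1, 1])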
"""
def __init__(self, F : Union[list, np.ndarray], I : Union[list, np.ndarray, WiringDiagram],
variables : Union[list, np.array, None] = None,
SIMPLIFY_FUNCTIONS : Optional[bool] = False):
assert isinstance(F, (list, np.ndarray)), "F must be an array or list."
assert isinstance(I, (list, np.ndarray, WiringDiagram)), "I must be an array or list, or an instance of WiringDiagram."
if isinstance(I, (list, np.ndarray)):
super().__init__(I, variables)
else:
if variables is not None:
print('Warning: Values of provided variables ignored. Variables of WiringDiagram I used instead.')
super().__init__(I.I, I.variables)
assert len(F)==self.N, "len(F)==len(I) required"
self.F = []
for ii,f in enumerate(F):
if isinstance(f, (list, np.ndarray, str)):
self.F.append(BooleanFunction(f,name = self.variables[ii]))
elif isinstance(f, BooleanFunction):
f.name = self.variables[ii]
self.F.append(f)
else:
raise TypeError(f"F holds invalid data type {type(f)} : Expected either list, np.array, or BooleanFunction")
if not hasattr(self, 'constants'): #keeps track of all constants and nodes set to constants
self.constants = {}
if self.N_constants > 0:
self.remove_constants()
self.STG = None
if SIMPLIFY_FUNCTIONS:
self.simplify_functions()
def remove_constants(self, values_constants : Optional[list] = None) -> None:
"""
Removes constants from this Boolean network.
**Parameters:**
- values_constants (list, optional): The values to fix for each constant
node in the network. If None, takes the value provided by the constant
function.
"""
if values_constants is None:
indices_constants = self.get_constants(AS_DICT=False)
dict_constants = self.get_constants(AS_DICT=True)
values_constants = [self.F[c][0] for c in indices_constants]
else:
indices_constants = self.get_source_nodes(AS_DICT=False)
dict_constants = self.get_source_nodes(AS_DICT=True)
assert len(values_constants)==len(indices_constants),f'The network contains {len(indices_constants)} source nodes but {len(values_constants)} values were provided.'
#self.constants = dict(zip(self.variables[indices_constants],values_constants))
for id_constant,value in zip(indices_constants,values_constants):
regulated_nodes = []
for i in range(self.N): # for all variables
if dict_constants[i]:
continue
try:
index = list(self.I[i]).index(id_constant) #check if the constant is part of regulators
except ValueError:
continue
truth_table = utils.get_left_side_of_truth_table(self.indegrees[i])
indices_to_keep = np.where(truth_table[:,index]==value)[0]
self.F[i].f = self.F[i].f[indices_to_keep]
if self.weights is not None:
self.weights[i] = self.weights[i][self.I[i]!=id_constant]
self.I[i] = self.I[i][self.I[i]!=id_constant]
self.indegrees[i] -= 1
self.F[i].n -= 1
regulated_nodes.append(self.variables[i])
self.constants[self.variables[id_constant]] = {'value' : value, 'regulatedNodes': regulated_nodes}
for i in range(self.N): #check if any node has lost all its regulators, add an artificial non-essential regulation of the node by itself to avoid deletion of the node
if dict_constants[i]:
continue
if self.indegrees[i] == 0:
self.indegrees[i] = 1
self.F[i].n = 1
self.F[i].f = np.array([self.F[i][0],self.F[i][0]],dtype=int)
self.I[i] = np.array([i],dtype=int)
if self.weights is not None:
self.weights[i] = np.array([np.nan],dtype=float)
self.F = [self.F[i] for i in range(self.N) if dict_constants[i]==False]
adjustment_for_I = np.cumsum([dict_constants[i] for i in range(self.N)])
self.I = [self.I[i]-adjustment_for_I[self.I[i]] for i in range(self.N) if dict_constants[i]==False]
if self.weights is not None:
self.weights = [self.weights[i] for i in range(self.N) if dict_constants[i]==False]
#keep these as np.arrays so that array-based indexing (e.g., self.variables[self.I[i]]) continues to work
self.variables = np.array([self.variables[i] for i in range(self.N) if dict_constants[i]==False])
self.outdegrees = np.array([self.outdegrees[i] for i in range(self.N) if dict_constants[i]==False])
self.indegrees = np.array([self.indegrees[i] for i in range(self.N) if dict_constants[i]==False])
self.N -= len(indices_constants)
self.N_constants = 0
@classmethod
def from_cana(cls, cana_BooleanNetwork : "cana.boolean_network.BooleanNetwork") -> "BooleanNetwork":
"""
**Compatibility Method:**
Converts an instance of cana.boolean_network.BooleanNetwork from
the cana module into a Boolforge BooleanNetwork object.
**Returns**:
- A BooleanNetwork object.
"""
F = []
I = []
variables = []
for entry in cana_BooleanNetwork.logic.values():
try:
variables.append(entry['name'])
except KeyError:
pass
try:
F.append(entry['out'])
I.append(entry['in'])
except KeyError:
pass
return cls(F = F, I = I, variables=variables)
@classmethod
def from_string(cls, network_string : str, separator : Union[str, list, np.array] = ',',
max_degree : int = 24, original_not : Union[str, list, np.array] = 'NOT',
original_and : Union[str, list, np.array] = 'AND',
original_or : Union[str, list, np.array] = 'OR') -> "BooleanNetwork":
"""
**Compatibility Method:**
Converts a string of update rules into a Boolforge BooleanNetwork object.
**Parameters:**
- network_string (str): One update rule per line, in the form
'target separator expression' (e.g., 'x, x OR y').
- separator (str | list | np.array, optional): Token(s) separating
the target variable from its update rule. Defaults to ','.
- max_degree (int, optional): Maximum indegree for which a function's
truth table is evaluated; functions with more inputs are left empty.
Defaults to 24.
- original_not, original_and, original_or (str | list | np.array,
optional): Token(s) used for logical NOT, AND, and OR in the input
string. Default to 'NOT', 'AND', and 'OR'.
**Returns**:
- A BooleanNetwork object.
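**Example** (a minimal sketch; one 'target, expression' rule per line):
>>> rules = '''A, A OR B
... B, NOT A'''
>>> bn = BooleanNetwork.from_string(rules)
>>> bn.indegrees
array([2, 1])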
"""
sepstr, andop, orop, notop = "@", "∧", "∨", "¬"
get_dummy_var = lambda i: "x%sy"%str(int(i))
# reformat network string
lines = network_string.replace('\t', ' ').replace('(', ' ( ').replace(')', ' ) ')
def __replace__(string, original, replacement):
if isinstance(original, (list, np.ndarray)):
for s in original:
string = string.replace(s, " %s "%replacement)
elif isinstance(original, str):
string = string.replace(original, " %s "%replacement)
return string
lines = __replace__(lines, separator, sepstr)
lines = __replace__(lines, original_not, notop)
lines = __replace__(lines, original_and, andop)
lines = __replace__(lines, original_or, orop)
lines = lines.splitlines()
# remove empty lines
while '' in lines:
lines.remove('')
# remove comments
for i in range(len(lines)-1, -1, -1):
if lines[i][0] == '#':
lines.pop(i)
n = len(lines)
# find variables and constants
var = ["" for i in range(n)]
for i in range(n):
var[i] = lines[i].split(sepstr)[0].replace(' ', '')
consts_and_vars = []
for line in lines:
words = line.split(' ')
for word in words:
if word not in ['(', ')', sepstr, andop, orop, notop, ''] and not utils.is_float(word):
consts_and_vars.append(word)
consts = list(set(consts_and_vars)-set(var))
dict_var_const = dict(list(zip(var, [get_dummy_var(i) for i in range(len(var))])))
dict_var_const.update(dict(list(zip(consts, [get_dummy_var(i+len(var)) for i in range(len(consts))]))))
# replace all variables and constants with dummy names
for i, line in enumerate(lines):
words = line.split(' ')
for j, word in enumerate(words):
if word not in ['(', ')', sepstr, andop, orop, notop, ''] and not utils.is_float(word):
words[j] = dict_var_const[word]
lines[i] = ' '.join(words)
# update line to only be function
for i in range(n):
lines[i] = lines[i].split(sepstr)[1]
# generate wiring diagram I
I = []
for i in range(n):
try:
idcs_open = utils.find_all_indices(lines[i], 'x')
idcs_end = utils.find_all_indices(lines[i], 'y')
regs = np.sort(np.array(list(map(int,list(set([lines[i][(begin+1):end] for begin,end in zip(idcs_open,idcs_end)]))))))
I.append(regs)
except ValueError:
I.append(np.array([], int))
deg = list(map(len, I))
# generate functions F
F = []
for i in range(n):
if deg[i] == 0:
f = np.array([int(lines[i])], int)
elif deg[i] <= max_degree:
tt = utils.get_left_side_of_truth_table(deg[i])
ldict = { get_dummy_var(I[i][j]) : tt[:, j].astype(bool) for j in range(deg[i]) }
f = eval(lines[i].replace(andop, '&').replace(orop, '|').replace(notop, '~').replace(' ', ''), {"__builtins__" : None}, ldict)
else:
f = np.array([], int)
F.append(f.astype(int))
for i in range(len(consts)):
F.append(np.array([0, 1], int))
I.append(np.array([len(var) + i]))
return cls(F, I, var+consts)
def to_cana(self) -> "cana.boolean_network.BooleanNetwork":
"""
**Compatibility method:**
Returns an instance of the class cana.BooleanNetwork from the
cana module.
**Returns:**
- An instance of cana.boolean_network.BooleanNetwork
"""
logic_dicts = []
for bf,regulators,var in zip(self.F,self.I,self.variables):
logic_dicts.append({'name':var, 'in': list(regulators), 'out': list(bf.f)})
return cana.boolean_network.BooleanNetwork(Nnodes = self.N, logic = dict(zip(range(self.N),logic_dicts)))
def to_bnet(self, separator=',\t', AS_POLYNOMIAL : bool = True) -> str:
"""
**Compatibility method:**
Returns a bnet string of the network, formatted as a polynomial or a logical expression.
**Parameters:**
- separator (str): A string used to separate the target variable
from the function. Defaults to ',\t'.
- AS_POLYNOMIAL (bool, optional): Determines whether to return
the function as a polynomial or logical expression. If true,
returns as a polynomial, and if false, returns as a logical
expression. Defaults to true.
**Returns:**
- str: A string describing a bnet.
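**Example** (a minimal sketch; the exact right-hand sides depend on
utils.bool_to_poly and BooleanFunction.to_expression):
>>> bn = BooleanNetwork(F=[[0, 1], [1, 0]], I=[[1], [0]])
>>> bnet_string = bn.to_bnet() # one 'variable,<TAB>function' line per node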
"""
lines = []
constants_indices = self.get_constants()
for i in range(self.N):
if constants_indices[i]:
function = str(self.F[i].f[0])
elif AS_POLYNOMIAL:
function = utils.bool_to_poly(self.F[i], self.variables[self.I[i]])
else:
function = self.F[i].to_expression(" & ", " | ")
lines.append(f'{self.variables[i]}{separator}{function}')
return '\n'.join(lines)
def to_truth_table(self, RETURN : bool = True, filename : Optional[str] = None) -> pd.DataFrame:
"""
Determines the full truth table of the Boolean network as pandas DataFrame.
Each row shows the input combination (x1, x2, ..., xN)
and the corresponding output(s) f(x).
The output is returned as a pandas DataFrame and can optionally be
exported to a file in CSV or Excel format.
**Parameters:**
- RETURN (bool, optional):
Whether to return the truth table as a pandas DataFrame.
Defaults to True.
- filename (str, optional):
If provided, the truth table is written to a file. The file
extension determines the format and must be one of:
`'csv'`, `'xls'`, or `'xlsx'`.
Example: `"truth_table.csv"` or `"truth_table.xlsx"`.
If `None` (default), no file is created.
**Returns:**
- pd.DataFrame: The full truth table with shape (2^N, 2N).
Returned only if `RETURN=True`.
**Notes:**
- The function automatically computes the synchronous
state transition graph (`STG`) if it has not been computed yet.
- Each output row represents a deterministic transition from the
current state to its next state under synchronous updating.
- Exporting to Excel requires the `openpyxl` package to be installed.
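**Example:**
>>> bn = BooleanNetwork(F=[[0, 1], [1, 0]], I=[[1], [0]])
>>> tt = bn.to_truth_table()
>>> tt.shape # 2^N rows, N input columns plus N output columns
(4, 4)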
"""
columns = [name + '(t)' for name in self.variables]
columns += [name + '(t+1)' for name in self.variables]
if self.STG is None:
self.compute_synchronous_state_transition_graph()
data = np.zeros((2**self.N,2*self.N),dtype=int)
data[:,:self.N] = utils.get_left_side_of_truth_table(self.N)
for i in range(2**self.N):
data[i,self.N:] = utils.dec2bin(self.STG[i],self.N)
truth_table = pd.DataFrame(data,columns=columns)
if filename is not None:
ending = filename.split('.')[-1]
assert ending in ['csv','xls','xlsx'],"filename must end in 'csv','xls', or 'xlsx'"
if ending == 'csv':
truth_table.to_csv(filename)
else:
truth_table.to_excel(filename)
if RETURN:
return truth_table
def __len__(self):
return self.N
def __str__(self):
return f"Boolean network of {self.N} nodes with indegrees {self.indegrees}"
def __getitem__(self, index):
return self.F[index]
def __copy__(self):
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
return result
def __call__(self, state):
"""
Perform a synchronous update of a Boolean network.
Each node's new state is determined by applying its Boolean function
to the current states of its regulators.
**Parameters:**
- X (list[int] | np.array[int]): Current state vector of the network.
**Returns:**
- np.array[int]: New state vector after the update.
"""
return self.update_network_synchronously(state)
def get_types_of_regulation(self) -> np.array:
"""
Computes the weights of this Boolean network and assigns them to the
weights member variable.
**Returns:**
- weights (np.array): The weights of this network.
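**Example** (illustrative; assumes BooleanFunction.get_type_of_inputs
classifies the identity as 'positive' and negation as 'negative'):
>>> bn = BooleanNetwork(F=[[0, 1], [1, 0]], I=[[1], [0]])
>>> weights = bn.get_types_of_regulation()
>>> # expected: weights[0] == array([1]) and weights[1] == array([-1])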
"""
weights = []
dict_weights = {'non-essential' : np.nan, 'conditional' : 0, 'positive' : 1, 'negative' : -1}
for bf in self.F:
weights.append(np.array([dict_weights[el] for el in bf.get_type_of_inputs()]))
self.weights = weights
return weights
## Transform Boolean networks
def simplify_functions(self) -> None:
"""
Remove all non-essential inputs, i.e., inoperative edges from the Boolean network.
For each node in a Boolean network, represented by its Boolean function
and its regulators, this function extracts the “essential” part of the
function by removing non-essential regulators. The resulting network
contains, for each node, a reduced truth table (with only the essential
inputs) and a corresponding list of essential regulators.
**Returns:**
- BooleanNetwork: A Boolean network object where:
- F is a list of N Boolean functions containing functions of
length 2^(m_i), with m_i ≤ n_i, representing the functions
restricted to the essential regulators.
- I is a list of N lists containing the indices of the
essential regulators for each node.
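**Example** (illustrative sketch):
>>> # x1's update rule [0, 0, 1, 1] equals x0, so its second input (x1 itself) is non-essential
>>> bn = BooleanNetwork(F=[[0, 1], [0, 0, 1, 1]], I=[[0], [0, 1]])
>>> bn.simplify_functions()
>>> int(bn.indegrees[1]) # the non-essential input has been removed
1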
"""
self.get_types_of_regulation() #ensuring that self.weights is updated
for i in range(self.N):
regulator_is_non_essential = np.isnan(self.weights[i])
if sum(regulator_is_non_essential)==0: #all variables are essential, nothing to change
continue
non_essential_variables = np.where(regulator_is_non_essential)[0]
essential_variables = np.where(~regulator_is_non_essential)[0]
self.outdegrees[non_essential_variables] -= 1
if len(essential_variables)==0: #no variables are essential, introduce ``fake" auto-regulation to keep this variable and do not delete it as a constant
self.indegrees[i] = 1
self.F[i].f = np.array([self.F[i][0],self.F[i][0]],dtype=int)
self.F[i].n = 1
self.F[i].variables = self.variables[i]
self.I[i] = np.array([i],dtype=int)
self.weights[i] = np.array([np.nan],dtype=float)
self.outdegrees[i] += 1 #add this, even though it's a fake regulation to keep sum(self.outdegrees)==sum(self.indegrees)
continue
left_side_of_truth_table = utils.get_left_side_of_truth_table(self.indegrees[i])
self.F[i].f = self.F[i][np.sum(left_side_of_truth_table[:, non_essential_variables], 1) == 0]
self.F[i].n = len(essential_variables)
self.F[i].variables = self.F[i].variables[~regulator_is_non_essential]
self.I[i] = self.I[i][essential_variables]
self.weights[i] = self.weights[i][essential_variables]
self.indegrees[i] = len(essential_variables)
def get_source_nodes(self, AS_DICT : bool = False) -> Union[dict, np.array]:
"""
Identify source nodes in a Boolean network.
A node is considered a source node if its only regulator is itself and
its function is the identity, so its state never changes over time.
**Parameters:**
- AS_DICT (bool, optional): Whether to return the indices of source nodes
as a dictionary or array. If true, returns as a dictionary. Defaults
to False.
**Returns:**
If AS_DICT is True:
- dict[int:bool]: Dictionary determining if an index is a
source node or not.
else:
- np.array[int]: Array of all indices of source nodes.
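**Example:**
>>> # node 0 copies itself (a source node); node 1 copies node 0
>>> bn = BooleanNetwork(F=[[0, 1], [0, 1]], I=[[0], [0]])
>>> bn.get_source_nodes()
array([0])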
"""
rlI = range(self.N)
is_source_node = [self.indegrees[i] == 1 and self.I[i][0] == i and self.F[i][0]==0 and self.F[i][1]==1 for i in rlI]
if AS_DICT:
return dict(zip(rlI, is_source_node))
return np.where(is_source_node)[0]
def get_network_with_fixed_source_nodes(self,values_source_nodes : Union[list, np.array]) -> "BooleanNetwork":
"""
Fix the values of source nodes within this Boolean Network.
**Parameters:**
- values_source_nodes (list | np.array): The values to fix for each
source node within this network. Must be of length equivalent to
the number of source nodes in the network, and each element must
be either 0 or 1.
**Returns:**
- BooleanNetwork: A BooleanNetwork object with fixed source nodes.
"""
indices_source_nodes = self.get_source_nodes(AS_DICT=False)
assert len(values_source_nodes)==len(indices_source_nodes),f"The length of 'values_source_nodes', which is {len(values_source_nodes)}, must equal the number of source nodes, which is {len(indices_source_nodes)}."
assert set(values_source_nodes) <= {0, 1},"Source node values must be 0 or 1."
F = deepcopy(self.F)
I = deepcopy(self.I)
for source_node,value in zip(indices_source_nodes,values_source_nodes):
F[source_node].f = [value]
I[source_node] = []
bn = self.__class__(F, I, self.variables)
bn.constants.update(self.constants)
return bn
def get_network_with_node_controls(self,indices_controlled_nodes : Union[list, np.array],
values_controlled_nodes : Union[list, np.array],
KEEP_CONTROLLED_NODES : bool = False) -> "BooleanNetwork":
"""
Fix the values of nodes within this BooleanNetwork.
**Parameters:**
- indices_controlled_nodes (list | np.array): The indices of the nodes
to fix the value of.
- values_controlled_nodes : (list | np.array): The values to fix for
each specified node in the network.
- KEEP_CONTROLLED_NODES : (bool, optional): Whether to turn controlled
nodes into constants or not. If true, controlled nodes become constants
and will be baked into the network. If false, they will not be considered
as constants. Defaults to false.
**Returns:**
- BooleanNetwork: A BooleanNetwork object with specified nodes controlled.
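**Example:**
>>> bn = BooleanNetwork(F=[[0, 1], [1, 0]], I=[[1], [0]])
>>> controlled = bn.get_network_with_node_controls([0], [1], KEEP_CONTROLLED_NODES=True)
>>> # x0 is now fixed at 1 via a constant self-loop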
"""
assert len(values_controlled_nodes)==len(indices_controlled_nodes),f"The length of 'values_controlled_nodes', which is {len(values_controlled_nodes)}, must equal the length of 'indices_controlled_nodes', which is {len(indices_controlled_nodes)}."
assert set(values_controlled_nodes) <= {0, 1},"Controlled node values must be 0 or 1."
F = deepcopy(self.F)
I = deepcopy(self.I)
for node,value in zip(indices_controlled_nodes,values_controlled_nodes):
if KEEP_CONTROLLED_NODES:
F[node].f = [value,value]
I[node] = [node]
else:
F[node].f = [value]
I[node] = []
bn = self.__class__(F, I, self.variables)
if not KEEP_CONTROLLED_NODES:
bn.constants.update(self.constants)
return bn
def get_network_with_edge_controls(self,
control_targets : Union[int,list,np.array],
control_sources : Union[int,list,np.array],
type_of_edge_controls : Union[int,list,np.array,None] = None) -> "BooleanNetwork":
"""
Generate a perturbed Boolean network by removing the influence of
specified regulators on specified targets.
The function modifies the Boolean function for target nodes by
restricting it to those entries in its truth table where the input
from given regulators equals the specified type_of_control. The
regulators are then removed from the wiring diagram for that node.
**Parameters:**
- control_targets (int | list[int] | np.array[int]):
Index of the target node(s) to be perturbed.
- control_sources (int | list[int] | np.array[int]):
Index of the regulator(s) whose influence is to be fixed.
- type_of_edge_controls (int | list[int] | np.array[int] | None):
Source value in regulation of target after control.
Default is None (which is interpreted as 0).
**Returns:**
- BooleanNetwork object where:
- F is the updated list of Boolean functions after perturbation.
- I is the updated wiring diagram after removing the control
regulator from the target node.
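**Example:**
>>> # x0 = x0 OR x1; fixing the input from x1 at 0 leaves x0 = x0
>>> bn = BooleanNetwork(F=[[0, 1, 1, 1], [1, 0]], I=[[0, 1], [0]])
>>> perturbed = bn.get_network_with_edge_controls(0, 1, 0)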
"""
# Normalize arguments to lists
if np.isscalar(control_targets):
control_targets = [control_targets]
control_sources = [control_sources]
type_of_edge_controls = [0 if type_of_edge_controls is None else type_of_edge_controls]
elif type_of_edge_controls is None:
type_of_edge_controls = [0] * len(control_targets)
assert len(control_targets) == len(control_sources) == len(type_of_edge_controls), \
"control_targets, control_sources, and type_of_edge_controls must have equal length."
F_new = deepcopy(self.F)
I_new = deepcopy(self.I)
indegrees = np.copy(self.indegrees)
for target, source, fixed_value in zip(control_targets, control_sources, type_of_edge_controls):
assert fixed_value in [0, 1], f"type_of_edge_control must be 0 or 1 (got {fixed_value})."
assert source in I_new[target], f"control_source={source} not in regulators of target={target}"
idx_reg = list(I_new[target]).index(source)
n_inputs = indegrees[target]
# Compute bitmask indices efficiently
indices = np.arange(2 ** n_inputs, dtype=np.uint32)
mask = ((indices >> (n_inputs - 1 - idx_reg)) & 1) == fixed_value
F_new[target] = F_new[target][mask]
# Remove the regulator
I_new[target] = np.delete(I_new[target], idx_reg)
indegrees[target] -= 1
return self.__class__(F_new, I_new, self.variables)
def update_single_node(self, index : int,
states_regulators : Union[list, np.array]) -> int:
"""
Update the state of a single node.
The new state is obtained by applying the Boolean function f to the
states of its regulators. The regulator states are converted to a
decimal index using utils.bin2dec.
**Parameters:**
- index (int): The index of the Boolean Function in F.
- states_regulators (list[int] | np.array[int]): Binary vector
representing the states of the node's regulators.
**Returns:**
- int: Updated state of the node (0 or 1).
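**Example:**
>>> # node 0 computes x1 OR x2
>>> bn = BooleanNetwork(F=[[0, 1, 1, 1], [1, 0], [0, 1]], I=[[1, 2], [0], [0]])
>>> bn.update_single_node(0, [1, 0]) # regulator states x1=1, x2=0
1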
"""
return self.F[index].f[utils.bin2dec(states_regulators)].item()
def update_network_synchronously(self, X : Union[list, np.array]) -> np.array:
"""
Perform a synchronous update of a Boolean network.
Each node's new state is determined by applying its Boolean function
to the current states of its regulators.
**Parameters:**
- X (list[int] | np.array[int]): Current state vector of the network.
**Returns:**
- np.array[int]: New state vector after the update.
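**Example:**
>>> # x0 copies x1; x1 negates x0
>>> bn = BooleanNetwork(F=[[0, 1], [1, 0]], I=[[1], [0]])
>>> bn.update_network_synchronously([1, 0])
array([0, 0])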
"""
if type(X)==list:
X = np.array(X)
Fx = np.zeros(self.N, dtype=int)
for i in range(self.N):
Fx[i] = self.update_single_node(index = i, states_regulators = X[self.I[i]])
return Fx
def update_network_synchronously_many_times(self, X : Union[list, np.array],
n_steps : int) -> np.array:
"""
Update the state of a Boolean network synchronously over multiple time steps.
Starting from the initial state, the network is updated synchronously
n_steps times using the update_network_synchronously function.
**Parameters:**
- X (list[int] | np.array[int]): Initial state vector of the network.
- n_steps (int): Number of update iterations to perform.
**Returns:**
- np.array[int]: Final state vector after n_steps updates.
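**Example:**
>>> # x0 copies x1; x1 negates x0 (all states lie on a cycle of length 4)
>>> bn = BooleanNetwork(F=[[0, 1], [1, 0]], I=[[1], [0]])
>>> bn.update_network_synchronously_many_times([0, 0], 2)
array([1, 1])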
"""
for i in range(n_steps):
X = self.update_network_synchronously(X)
return X
def update_network_SDDS(self, X : Union[list, np.array], P : np.array,
*, rng=None) -> np.array:
"""
Perform a stochastic update (SDDS) on a Boolean network.
For each node, the next state is computed as nextstep = F[i] evaluated
on the current states of its regulators. If nextstep > X[i], the node
is activated with probability P[i,0]; if nextstep < X[i], the node is
degraded with probability P[i,1]. Otherwise, the state remains unchanged.
**Parameters:**
- X (list[int] | np.array[int]): Current state vector.
- P (np.array[float]): A len(F)×2 array of probabilities; for each
node i, P[i,0] is the activation probability, and P[i,1] is the
degradation probability.
- rng (None, optional): Argument for the random number generator,
implemented in 'utils._coerce_rng'.
**Returns:**
- np.array[int]: Updated state vector after applying the
stochastic update.
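**Example** (with all probabilities equal to 1, SDDS reduces to the
synchronous update):
>>> import numpy as np
>>> bn = BooleanNetwork(F=[[0, 1], [1, 0]], I=[[1], [0]])
>>> P = np.ones((2, 2))
>>> bn.update_network_SDDS([0, 0], P)
array([0, 1])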
"""
rng = utils._coerce_rng(rng)
if type(X)==list:
X = np.array(X)
Fx = X.copy()
for i in range(self.N):
nextstep = self.update_single_node(index = i, states_regulators = X[self.I[i]])
if nextstep > X[i] and rng.random() < P[i, 0]: # activation
Fx[i] = nextstep
elif nextstep < X[i] and rng.random() < P[i, 1]: # degradation
Fx[i] = nextstep
return Fx
def get_steady_states_asynchronous(self, nsim : int = 500, EXACT : bool = False,
initial_sample_points : list = [], search_depth : int = 50,
DEBUG : bool = False, *, rng=None) -> dict:
"""
Compute the steady states of a Boolean network under asynchronous updates.
This function simulates asynchronous updates of a Boolean network
(with N nodes) for a given number of initial conditions (nsim). For
each initial state, the network is updated asynchronously until a
steady state (or attractor) is reached or until a maximum search depth
is exceeded. The simulation can be performed either approximately
(by sampling nsim random initial conditions) or exactly (by iterating
over the entire state space when EXACT == True).
**Parameters:**
- nsim (int, optional): Number of initial conditions to simulate
(default is 500).
- EXACT (bool, optional): If True, iterate over the entire state
space and guarantee finding all steady states (2^N initial
conditions); otherwise, use nsim random initial conditions.
(Default is False.)
- initial_sample_points (list[list[int]], optional): List of
initial states (as binary vectors) to use. If provided and EXACT
is False, these override random sampling.
- search_depth (int, optional): Maximum number of asynchronous
update iterations to attempt per simulation.
- DEBUG (bool, optional): If True, print debugging information
during simulation.
- rng (None, optional): Argument for the random number generator,
implemented in 'utils._coerce_rng'.
**Returns:**
- dict[str:Variant]: A dictionary containing:
- SteadyStates (list[int]): List of steady state
values (in decimal form) found.
- NumberOfSteadyStates (int): Total number of unique steady states.
- BasinSizes (list[int]): List of counts showing how many
initial conditions converged to each steady state.
- STGAsynchronous (dict[tuple(int, int):int]):
The asynchronous state transition graph.
STGAsynchronous[(a,i)] = c implies that state a transitions
to state c when the ith variable is updated. Here, a and c
are decimal representations of the state and i is in {0, 1,
..., self.N-1}.
- InitialSamplePoints (list[int]): The list of initial sample
points used (if provided) or those generated during simulation.
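**Example:**
>>> # two mutually activating nodes: the steady states are 00 (decimal 0) and 11 (decimal 3)
>>> bn = BooleanNetwork(F=[[0, 1], [0, 1]], I=[[1], [0]])
>>> result = bn.get_steady_states_asynchronous(EXACT=True)
>>> sorted(result['SteadyStates'])
[0, 3]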
"""
rng = utils._coerce_rng(rng)
if EXACT:
left_side_of_truth_table = utils.get_left_side_of_truth_table(self.N)
sampled_points = []
assert initial_sample_points == [] or not EXACT, (
"Warning: sample points were provided but, with option EXACT==True, the entire state space is computed "
"(and initial sample points ignored)"
)
STG_asynchronous = dict()
steady_states = []
basin_sizes = []
steady_state_dict = dict()
for iteration in range(nsim if not EXACT else 2**self.N):
if EXACT:
x = left_side_of_truth_table[iteration]
xdec = iteration
else:
if initial_sample_points == []: # generate random initial states on the fly
x = rng.integers(2, size=self.N)
xdec = utils.bin2dec(x)
sampled_points.append(xdec)
else:
x = initial_sample_points[iteration]
xdec = utils.bin2dec(x)
if DEBUG:
print(iteration, -1, -1, False, xdec, x)
for jj in range(search_depth): # update until a steady state is reached or search_depth is exceeded
FOUND_NEW_STATE = False
try:
# Check if this state is already recognized as a steady state.
index_ss = steady_state_dict[xdec]
except KeyError:
# Asynchronously update the state until a new state is found.
update_order_to_try = rng.permutation(self.N)
for i in update_order_to_try:
try:
fxdec = STG_asynchronous[(xdec, i.item())]
if fxdec != xdec:
FOUND_NEW_STATE = True
x[i] = 1 - x[i]
except KeyError:
fx_i = self.update_single_node(i, x[self.I[i]])
if fx_i > x[i]:
fxdec = xdec + 2**(self.N - 1 - i.item())
x[i] = 1
FOUND_NEW_STATE = True
elif fx_i < x[i]:
fxdec = xdec - 2**(self.N - 1 - i.item())
x[i] = 0
FOUND_NEW_STATE = True
else:
fxdec = xdec
STG_asynchronous.update({(xdec, i.item()): fxdec})
if FOUND_NEW_STATE:
xdec = fxdec
break
if DEBUG:
print(iteration, jj, i, FOUND_NEW_STATE, xdec, x)
if FOUND_NEW_STATE == False: # steady state reached
try:
index_ss = steady_state_dict[xdec]
basin_sizes[index_ss] += 1
break
except KeyError:
steady_state_dict.update({xdec: len(steady_states)})
steady_states.append(xdec)
basin_sizes.append(1)
break
if DEBUG:
print()
if sum(basin_sizes) < (nsim if not EXACT else 2**self.N):
print('Warning: only %i of the %i tested initial conditions eventually reached a steady state. Try increasing the search depth. '
'It may however also be the case that your asynchronous state space contains a limit cycle.' %
(sum(basin_sizes), nsim if not EXACT else 2**self.N))
return dict(zip(["SteadyStates", "NumberOfSteadyStates", "BasinSizes", "STGAsynchronous", "InitialSamplePoints"],
(steady_states, len(steady_states), basin_sizes, STG_asynchronous,
initial_sample_points if initial_sample_points != [] else sampled_points)))
def get_steady_states_asynchronous_given_one_initial_condition(self,
initial_condition : Union[int, list, np.array] = 0,
nsim : int = 500, stochastic_weights : list = [],search_depth : int = 50,
DEBUG : bool = False,*, rng = None) -> dict:
"""
Determine the steady states reachable from one initial condition using
weighted asynchronous updates.
This function is similar to get_steady_states_asynchronous but starts
every simulation from the same initial condition and allows the update
order to be influenced by stochastic weights (one per node). If a
weight vector of length N is provided, it is normalized and used to
bias the random permutation of the node update order.
**Parameters:**
- initial_condition (int | list[int] | np.array[int], optional):
The initial state for all simulations. If an integer, it is
converted to a binary vector. Default is 0.
- nsim (int, optional): Number of simulation runs (default is 500).
- stochastic_weights (list[float], optional): List of stochastic
weights (one per node) used to bias update order. If empty,
uniform random order is used.
- search_depth (int, optional): Maximum number of asynchronous
update iterations per simulation (default is 50).
- DEBUG (bool, optional): If True, print debugging information
(default is False).
- rng (None, optional): Argument for the random number generator,
implemented in 'utils._coerce_rng'.
**Returns:**
- dict[str:Variant]: A dictionary containing:
- SteadyStates (list[int]): List of steady state values (in
decimal form) reached.
- NumberOfSteadyStates (int): Total number of unique steady states.
- BasinSizes (list[int]): List of counts of how many
simulations reached each steady state.
- TransientTimes (list[list[int]]): List of lists with
transient times (number of updates) for each steady state.
- STGAsynchronous (dict[tuple(int, int):int]):
A sample of the asynchronous state transition graph.
STGAsynchronous[(a,i)] = c implies that state a transitions
to state c when the ith variable is updated. Here, a and c
are decimal representations of the state and i is in {0, 1,
..., self.N-1}.
- UpdateQueues (list[list[int]]): List of state update queues
(the sequence of states encountered) for each simulation.
"""
rng = utils._coerce_rng(rng)
if type(initial_condition) == int:
initial_condition = np.array(utils.dec2bin(initial_condition, self.N))
initial_condition_bin = utils.bin2dec(initial_condition)
else:
initial_condition = np.array(initial_condition, dtype=int)
initial_condition_bin = utils.bin2dec(initial_condition)
assert stochastic_weights == [] or len(stochastic_weights) == self.N, "one stochastic weight per node is required"
if stochastic_weights != []:
stochastic_weights = np.array(stochastic_weights) / sum(stochastic_weights)
STG_async = dict()
steady_states = []
basin_sizes = []
transient_times = []
steady_state_dict = dict()
queues = []
for iteration in range(nsim):
x = initial_condition.copy()
xdec = initial_condition_bin
queue = [xdec]
for jj in range(search_depth): # update until a steady state is reached or search_depth is exceeded
FOUND_NEW_STATE = False
try:
index_ss = steady_state_dict[xdec]
except KeyError:
if stochastic_weights != []:
update_order_to_try = rng.choice(self.N, size=self.N, replace=False, p=stochastic_weights)
else:
update_order_to_try = rng.permutation(self.N)
for i in update_order_to_try:
try:
fxdec = STG_async[(xdec, i.item())]
if fxdec != xdec:
FOUND_NEW_STATE = True
x[i] = 1 - x[i]
except KeyError:
fx_i = self.update_single_node(i, x[self.I[i]])
if fx_i > x[i]:
fxdec = xdec + 2**(self.N - 1 - i.item())
x[i] = 1
FOUND_NEW_STATE = True
elif fx_i < x[i]:
fxdec = xdec - 2**(self.N - 1 - i.item())
x[i] = 0
FOUND_NEW_STATE = True
else:
fxdec = xdec
STG_async.update({(xdec, i.item()): fxdec})
if FOUND_NEW_STATE:
xdec = fxdec
queue.append(xdec)
break
if DEBUG:
print(iteration, jj, i, FOUND_NEW_STATE, xdec, x)
if not FOUND_NEW_STATE: # steady state reached
queues.append(queue[:])
try:
index_ss = steady_state_dict[xdec]
basin_sizes[index_ss] += 1
transient_times[index_ss].append(jj)
break
except KeyError:
steady_state_dict.update({xdec: len(steady_states)})
steady_states.append(xdec)
basin_sizes.append(1)
transient_times.append([jj])
break
if DEBUG:
print()
if sum(basin_sizes) < nsim:
print('Warning: only %i of the %i tested initial conditions eventually reached a steady state. '
'Try increasing the search depth. It may also be that your asynchronous state space contains a limit cycle.' % (sum(basin_sizes), nsim))
return dict(zip(["SteadyStates", "NumberOfSteadyStates", "BasinSizes", "TransientTimes", "STGAsynchronous", "UpdateQueues"],
(steady_states, len(steady_states), basin_sizes, transient_times, STG_async, queues)))
if __LOADED_NUMBA__:
def get_attractors_synchronous(self,
nsim: int = 500,
initial_sample_points: list = [],
n_steps_timeout: int = 1000,
INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS: bool = False,
*,
rng=None
) -> dict:
"""
Compute the number of attractors in a Boolean network.
This version is optimized for networks with longer average path
lengths. For each of nb initial conditions, the network is updated
synchronously until an attractor is reached or until n_steps_timeout
is exceeded. The function returns the attractors found, their basin
sizes, a mapping of states to attractors, the set of initial sample
points used, the explored state space, and the number of simulations
that timed out.
Hybrid Numba-accelerated version:
- Python logic for bookkeeping (dicts, cycles, basins)
- Compiled numeric kernel for Boolean updates
**Parameters:**
- nsim (int, optional): Number of initial conditions to simulate
(default is 500). Ignored if 'initial_sample_points' are provided.
- initial_sample_points (list[int | list[int]], optional): List of
initial states to use. 'INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS'
specifies whether these points are given as vectors or decimals.
- n_steps_timeout (int, optional): Maximum number of update steps
allowed per simulation (default 1000).
- INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS (bool, optional): If
True, initial_sample_points are provided as binary vectors; if
False, they are given as decimal numbers. Default is False.
- rng (None, optional): Argument for the random number generator,
implemented in 'utils._coerce_rng'.
**Returns:**
- dict[str:Variant]: A dictionary containing:
- Attractors (list[list[int]]): List of attractors (each as a
list of states in the attractor cycle).
- NumberOfAttractors (int): Total number of unique attractors
found. This is a lower bound.
- BasinSizes (list[int]): List of counts for each attractor.
This is an unbiased estimator.
- AttractorDict (dict[int:int]): Dictionary mapping states
(in decimal) to the index of their attractor.
- InitialSamplePoints (list[int]): The initial sample points
used (if provided, they are returned; otherwise, the 'nsim'
generated points are returned).
- STG (dict[int:int]):
A sample of the state transition graph as dictionary, with
each state represented by its decimal representation.
- NumberOfTimeouts (int): Number of simulations that timed out
before reaching an attractor. Increase 'n_steps_timeout' to
reduce this number.
"""
rng = utils._coerce_rng(rng)
dictF = {}
attractors = []
basin_sizes = []
attr_dict = {}
STG = {}
n_timeout = 0
sampled_points = []
INITIAL_SAMPLE_POINTS_EMPTY = utils.check_if_empty(initial_sample_points)
if not INITIAL_SAMPLE_POINTS_EMPTY:
nsim = len(initial_sample_points)
# --- Prepare numba-friendly lookup tables
F_array_list = List([np.array(bf.f, dtype=np.uint8) for bf in self.F])
I_array_list = List([np.array(regs, dtype=np.int64) for regs in self.I])
N = self.N_variables
# --- Simulation loop
for sim_idx in range(nsim):
# Initialize state
if INITIAL_SAMPLE_POINTS_EMPTY:
x = rng.integers(2, size=N, dtype=np.uint8)
xdec = utils.bin2dec(x)
sampled_points.append(xdec)
else:
if INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS:
x = np.asarray(initial_sample_points[sim_idx], dtype=np.uint8)
xdec = utils.bin2dec(x)
else:
xdec = int(initial_sample_points[sim_idx])
x = np.array(utils.dec2bin(xdec, N), dtype=np.uint8)
visited = {xdec: 0}
trajectory = [xdec]
count = 0
# Iterate until attractor or timeout
while count < n_steps_timeout:
if xdec in dictF:
fxdec = dictF[xdec]
else:
fx = _update_network_synchronously_numba(x, F_array_list, I_array_list, N)
fxdec = utils.bin2dec(fx)
dictF[xdec] = fxdec
x = fx
if count == 0:
STG[xdec] = fxdec
# Already mapped to attractor
if fxdec in attr_dict:
idx_attr = attr_dict[fxdec]
basin_sizes[idx_attr] += 1
for s in trajectory:
attr_dict[s] = idx_attr
break
# New attractor detected
if fxdec in visited:
cycle_start = visited[fxdec]
attractor_states = trajectory[cycle_start:]
attractors.append(attractor_states)
basin_sizes.append(1)
idx_attr = len(attractors) - 1
for s in attractor_states:
attr_dict[s] = idx_attr
break
# Continue traversal
visited[fxdec] = len(trajectory)
trajectory.append(fxdec)
xdec = fxdec
count += 1
if count == n_steps_timeout:
n_timeout += 1
break
return {
"Attractors": attractors,
"NumberOfAttractors": len(attractors),
"BasinSizes": basin_sizes,
"AttractorDict": attr_dict,
"InitialSamplePoints": (
sampled_points if INITIAL_SAMPLE_POINTS_EMPTY else initial_sample_points
),
"STG": STG,
"NumberOfTimeouts": n_timeout,
}
else:
def get_attractors_synchronous(
self,
nsim: int = 500,
initial_sample_points: list = [],
n_steps_timeout: int = 1000,
INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS: bool = False,
*,
rng=None
) -> dict:
"""
Compute the number of attractors in a Boolean network.
This version is optimized for networks with longer average path
lengths. For each of nsim initial conditions, the network is updated
synchronously until an attractor is reached or until n_steps_timeout
is exceeded. The function returns the attractors found, their basin
sizes, a mapping of states to attractors, the set of initial sample
points used, the explored state space, and the number of simulations
that timed out.
**Parameters:**
- nsim (int, optional): Number of initial conditions to simulate
(default is 500). Ignored if 'initial_sample_points' are provided.
- initial_sample_points (list[int | list[int]], optional): List of
initial states to use. 'INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS'
specifies whether these points are given as vectors or decimals.
- n_steps_timeout (int, optional): Maximum number of update steps
allowed per simulation (default 1000).
- INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS (bool, optional): If
True, initial_sample_points are provided as binary vectors; if
False, they are given as decimal numbers. Default is False.
- rng (None, optional): Argument for the random number generator,
implemented in 'utils._coerce_rng'.
**Returns:**
- dict[str:Variant]: A dictionary containing:
- Attractors (list[list[int]]): List of attractors (each as a
list of states in the attractor cycle).
- NumberOfAttractors (int): Total number of unique attractors
found. This is a lower bound.
- BasinSizes (list[int]): List of counts for each attractor.
This is an unbiased estimator.
- AttractorDict (dict[int:int]): Dictionary mapping states
(in decimal) to the index of their attractor.
- InitialSamplePoints (list[int]): The initial sample points
used (if provided, they are returned; otherwise, the 'nsim'
generated points are returned).
- STG (dict[int:int]):
A sample of the state transition graph as a dictionary, with
each state represented by its decimal representation.
- NumberOfTimeouts (int): Number of simulations that timed out
before reaching an attractor. Increase 'n_steps_timeout' to
reduce this number.
"""
rng = utils._coerce_rng(rng)
dictF = {} # memorized transitions
attractors = [] # list of attractor cycles
basin_sizes = [] # basin counts
attr_dict = {} # map: state -> attractor index
STG = {} # sampled state transitions
n_timeout = 0
sampled_points = []
INITIAL_SAMPLE_POINTS_EMPTY = utils.check_if_empty(initial_sample_points)
if not INITIAL_SAMPLE_POINTS_EMPTY:
nsim = len(initial_sample_points)
# Main simulation loop
for sim_idx in range(nsim):
# --- Initial state setup
if INITIAL_SAMPLE_POINTS_EMPTY:
x = rng.integers(2, size=self.N, dtype=np.uint8)
xdec = utils.bin2dec(x)
sampled_points.append(xdec)
else:
if INITIAL_SAMPLE_POINTS_AS_BINARY_VECTORS:
x = np.asarray(initial_sample_points[sim_idx], dtype=np.uint8)
xdec = utils.bin2dec(x)
else:
xdec = int(initial_sample_points[sim_idx])
x = np.array(utils.dec2bin(xdec, self.N), dtype=np.uint8)
visited = {xdec: 0} # maps state→position in trajectory
trajectory = [xdec]
count = 0
# --- Iterate until attractor or timeout
while count < n_steps_timeout:
if xdec in dictF:
fxdec = dictF[xdec]
else:
# use vectorized network update
fx = self.update_network_synchronously(x)
fxdec = utils.bin2dec(fx)
dictF[xdec] = fxdec
x = fx
if count == 0:
STG[xdec] = fxdec
# already mapped to attractor?
if fxdec in attr_dict:
idx_attr = attr_dict[fxdec]
basin_sizes[idx_attr] += 1
for s in trajectory:
attr_dict[s] = idx_attr
break
# cycle found in trajectory → new attractor
if fxdec in visited:
cycle_start = visited[fxdec]
attractor_states = trajectory[cycle_start:]
attractors.append(attractor_states)
basin_sizes.append(1)
idx_attr = len(attractors) - 1
for s in attractor_states:
attr_dict[s] = idx_attr
break
# continue traversal
visited[fxdec] = len(trajectory)
trajectory.append(fxdec)
xdec = fxdec
count += 1
if count == n_steps_timeout:
n_timeout += 1
break
return {
"Attractors": attractors,
"NumberOfAttractors": len(attractors),
"BasinSizes": basin_sizes,
"AttractorDict": attr_dict,
"InitialSamplePoints": sampled_points if INITIAL_SAMPLE_POINTS_EMPTY else initial_sample_points,
"STG": STG,
"NumberOfTimeouts": n_timeout,
}
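# Minimal usage sketch (hypothetical; assumes a BooleanNetwork instance `bn`,
# e.g., bn = BooleanNetwork(F, I)):
#
#   result = bn.get_attractors_synchronous(nsim=200, rng=0)
#   result["NumberOfAttractors"]   # lower bound on the true number of attractors
#   result["BasinSizes"]           # sampled basin counts (unbiased estimator)
#   result["NumberOfTimeouts"]     # if > 0, consider increasing n_steps_timeout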
if __LOADED_NUMBA__:
def compute_synchronous_state_transition_graph(self) -> dict:
"""
Compute the entire synchronous state transition graph (STG)
using Numba for high performance.
**Returns:**
- dict[int:int]: A dictionary representing the state transition
graph of the network, where each key represents the current state
and its corresponding value the next state.
"""
# Preprocess data into Numba-friendly types
F_list = [np.array(bf.f, dtype=np.uint8) for bf in self.F]
I_list = [np.array(regs, dtype=np.int64) for regs in self.I]
if self.N_variables <= 22:
next_indices = _compute_synchronous_stg_numba(F_list, I_list, self.N_variables)
else:
next_indices = _compute_synchronous_stg_numba_low_memory(F_list, I_list, self.N_variables)
# Build the dictionary {current_state: next_state}
self.STG = dict(zip(range(2 ** self.N_variables), next_indices.tolist()))
return self.STG
else:
def compute_synchronous_state_transition_graph(self) -> dict:
"""
Compute the entire synchronous state transition graph for all 2^N states.
**Returns:**
- dict[int:int]: A dictionary representing the state transition
graph of the network, where each key represents the current state
and its corresponding value the next state.
"""
# 1. Represent all possible network states as binary matrix
# shape = (2**n, n), each row = one state
states = utils.get_left_side_of_truth_table(self.N_variables)
# 2. Preallocate array for next states
next_states = np.zeros_like(states)
powers_of_two = 2 ** np.arange(self.N_variables)[::-1]
# 3. Compute next value for each node in vectorized form
for j, bf in enumerate(self.F):
regulators = self.I[j]
if len(regulators) == 0:
# constant node
next_states[:, j] = bf.f[0]
continue
# Extract substate of regulators for all states
subspace = states[:, regulators]
# Convert each substate to integer index (row of truth table)
idx = np.dot(subspace, powers_of_two[-len(regulators):])
# Lookup next-state value from Boolean function truth table
next_states[:, j] = bf.f[idx]
# 4. Convert each next-state binary vector to integer index
next_indices = np.dot(next_states, powers_of_two)
self.STG = dict(zip(range(2 ** self.N_variables), next_indices.tolist()))
return self.STG
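# Worked toy example (hypothetical constructor call; assumes BooleanNetwork(F, I)
# accepts truth tables F and regulator lists I). For the two-node network
# x1' = x2, x2' = x1 OR x2, with x1 as the most significant bit:
#
#   F = [[0, 1], [0, 1, 1, 1]]   # truth tables of "x2" and "x1 OR x2"
#   I = [[1], [0, 1]]            # regulators of each node
#   bn = BooleanNetwork(F, I)
#   bn.compute_synchronous_state_transition_graph()   # -> {0: 0, 1: 3, 2: 1, 3: 3}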
def get_attractors_synchronous_exact(self) -> dict:
"""
Compute the exact number of attractors in a Boolean network using a
fast, vectorized approach.
This function computes all attractors and their basin sizes from the
full state transition graph.
**Returns:**
- dict[str:Variant]: A dictionary containing:
- Attractors (list[list[int]]): List of attractors (each
attractor is represented as a list of states forming the
cycle).
- NumberOfAttractors (int): Total number of unique attractors.
- BasinSizes (list[int]): List of counts for each attractor.
- AttractorDict (dict[int:int]): Dictionary mapping each state
(in decimal) to its attractor index.
- STG (dict[int:int]):
The state transition graph as a dictionary, with each state
represented by its decimal representation.
"""
if self.STG is None:
self.compute_synchronous_state_transition_graph()
attractors = []
basin_sizes = []
attractor_dict = dict()
for xdec in range(2**self.N):
queue = [xdec]
while True:
fxdec = self.STG[xdec]
try:
index_attr = attractor_dict[fxdec]
basin_sizes[index_attr] += 1
attractor_dict.update(list(zip(queue, [index_attr] * len(queue))))
break
except KeyError:
try:
index = queue.index(fxdec)
attractor_dict.update(list(zip(queue, [len(attractors)] * len(queue))))
attractors.append(queue[index:])
basin_sizes.append(1)
break
except ValueError:
pass
queue.append(fxdec)
xdec = fxdec
return dict(zip(["Attractors", "NumberOfAttractors", "BasinSizes", "AttractorDict", "STG"],
(attractors, len(attractors), basin_sizes, attractor_dict, self.STG)))
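# Continuing the toy example above (values hand-computed from the
# STG {0: 0, 1: 3, 2: 1, 3: 3}):
#
#   res = bn.get_attractors_synchronous_exact()
#   res["Attractors"]   # -> [[0], [3]], two fixed points
#   res["BasinSizes"]   # -> [1, 3], since states 1, 2, and 3 all reach state 3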
## Robustness measures: synchronous Derrida value, entropy of basin size distribution, coherence, fragility
def get_derrida_value(self, nsim : int = 1000, EXACT : bool = False, *, rng=None) -> float:
"""
Estimate the Derrida value for a Boolean network.
The Derrida value is computed by perturbing a single node in a randomly
chosen state and measuring the average Hamming distance between the
resulting updated states of the original and perturbed networks.
**Parameters:**
- nsim (int, optional): Number of simulations to perform. Default
is 1000.
- EXACT (bool, optional): If True, the exact Derrida value is
computed and 'nsim' is ignored. Otherwise, 'nsim' simulations
are used to approximate the Derrida value.
- rng (None, optional): Argument for the random number generator,
implemented in 'utils._coerce_rng'.
**Returns:**
- float: The average Hamming distance (Derrida value) over
nsim simulations.
**References:**
#. Derrida, B., & Pomeau, Y. (1986). Random networks of automata:
a simple annealed approximation. Europhysics Letters, 1(2), 45.
"""
if EXACT:
return np.mean([
bf.get_average_sensitivity(EXACT=True, NORMALIZED=False) for bf in self.F
])
else:
# --- Numba-friendly preparation
F_array_list = List([np.array(bf.f, dtype=np.uint8) for bf in self.F])
I_array_list = List([np.array(regs, dtype=np.int64) for regs in self.I])
N = self.N_variables if hasattr(self, "N_variables") else self.N
# Derive reproducible seed from rng
rng = utils._coerce_rng(rng)
seed = int(rng.integers(19891989))
return _derrida_simulation(F_array_list, I_array_list, N, nsim, seed)
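# The estimated quantity, spelled out as a minimal pure-Python sketch
# (hypothetical; bypasses numba and assumes a BooleanNetwork instance `bn`):
# draw a random state x and a copy y differing in one random bit, update both
# synchronously once, and average the resulting Hamming distance.
#
#   rng = np.random.default_rng(0)
#   total = 0
#   for _ in range(1000):
#       x = rng.integers(2, size=bn.N, dtype=np.uint8)
#       y = x.copy()
#       y[rng.integers(bn.N)] ^= 1                     # flip one random bit
#       total += np.sum(bn.update_network_synchronously(x)
#                       != bn.update_network_synchronously(y))
#   derrida_estimate = total / 1000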
def get_attractors_and_robustness_measures_synchronous_exact(self) -> dict:
"""
Compute the attractors and several robustness measures of a Boolean network.
This function computes the exact attractors and robustness (coherence
and fragility) of the entire network, as well as robustness measures
for each basin of attraction and each attractor.
**Returns:**
- dict[str:Variant]: A dictionary containing:
- Attractors (list[list[int]]): List of attractors (each
attractor is represented as a list of state decimal numbers).
- ExactNumberOfAttractors (int): The exact number of network
attractors.
- BasinSizes (list[int]): List of exact basin sizes for each
attractor.
- AttractorDict (dict[int:int]): Dictionary mapping each state
(in decimal) to its attractor index.
- Coherence (float): Overall exact network coherence.
- Fragility (float): Overall exact network fragility.
- BasinCoherence (list[float]): Exact coherence of each basin.
- BasinFragility (list[float]): Exact fragility of each basin.
- AttractorCoherence (list[float]): Exact coherence of each
attractor.
- AttractorFragility (list[float]): Exact fragility of each
attractor.
**References:**
#. Park, K. H., Costa, F. X., Rocha, L. M., Albert, R., & Rozum,
J. C. (2023). Models of cell processes are far from the edge of
chaos. PRX Life, 1(2), 023009.
#. Bavisetty, V. S. N., Wheeler, M., & Kadelka, C. (2025). xxxx
arXiv preprint arXiv:xxx.xxx.
"""
left_side_of_truth_table = utils.get_left_side_of_truth_table(self.N)
result = self.get_attractors_synchronous_exact()
attractors = result["Attractors"]
n_attractors = result["NumberOfAttractors"]
basin_sizes = np.array(result["BasinSizes"], dtype=np.int64)
attractor_dict = result["AttractorDict"]
len_attractors = np.array([len(a) for a in attractors], dtype=np.int64)
# --- Single-attractor shortcut
if n_attractors == 1:
return dict(zip(
["Attractors", "ExactNumberOfAttractors", "BasinSizes",
"AttractorDict", "BasinCoherence", "BasinFragility",
"AttractorCoherence", "AttractorFragility", "Coherence", "Fragility"],
(attractors, n_attractors, basin_sizes / 2 ** self.N,
attractor_dict, np.ones(1), np.zeros(1),
np.ones(1), np.zeros(1), 1.0, 0.0)
))
# -------------------------------------------------------------------------
# 1. Convert attractor_dict -> numeric array for O(1) lookup
# -------------------------------------------------------------------------
attractor_idx = np.full(2 ** self.N, -1, dtype=np.int32)
for k, v in attractor_dict.items():
attractor_idx[k] = v
# 2. Compute mean binary vector for each attractor (needed for fragility computation)
mean_states_attractors = []
is_attr_mask = np.zeros(2 ** self.N, dtype=bool)
for i, states in enumerate(attractors):
if len(states) == 1:
mean_states_attractors.append(np.array(utils.dec2bin(states[0], self.N), dtype=float))
else:
arr = np.array([utils.dec2bin(s, self.N) for s in states], dtype=float)
mean_states_attractors.append(arr.mean(axis=0))
is_attr_mask[states] = True
mean_states_attractors = np.stack(mean_states_attractors)
# 3. Distance matrix between attractors (vectorized)
diff = mean_states_attractors[:, None, :] - mean_states_attractors[None, :, :]
distance_between_attractors = np.sum(np.abs(diff), axis=2) / self.N
# -------------------------------------------------------------------------
# 4. Edge traversal (same logic, now with array lookups)
# -------------------------------------------------------------------------
n_attractors = len(attractors)
basin_coherences = np.zeros(n_attractors)
basin_fragilities = np.zeros(n_attractors)
attractor_coherences = np.zeros(n_attractors)
attractor_fragilities = np.zeros(n_attractors)
powers_of_2 = (2 ** np.arange(self.N))[::-1]
for xdec, x in enumerate(left_side_of_truth_table):
for i in range(self.N):
if x[i] == 1:
continue # skip to avoid double-counting
ydec = xdec + powers_of_2[i]
idx_x = attractor_idx[xdec]
idx_y = attractor_idx[ydec]
if idx_x == idx_y:
basin_coherences[idx_x] += 2 # count both directions
if is_attr_mask[xdec]:
attractor_coherences[idx_x] += 1
if is_attr_mask[ydec]:
attractor_coherences[idx_y] += 1
else:
dxy = distance_between_attractors[idx_x, idx_y]
basin_fragilities[idx_x] += dxy
basin_fragilities[idx_y] += dxy
if is_attr_mask[xdec]:
attractor_fragilities[idx_x] += dxy
if is_attr_mask[ydec]:
attractor_fragilities[idx_y] += dxy
# -------------------------------------------------------------------------
# 5. Normalize
# -------------------------------------------------------------------------
for i, (basin_size, length_attractor) in enumerate(zip(basin_sizes, len_attractors)):
basin_coherences[i] /= basin_size * self.N
basin_fragilities[i] /= basin_size * self.N
attractor_coherences[i] /= length_attractor * self.N
attractor_fragilities[i] /= length_attractor * self.N
basin_sizes_norm = basin_sizes / (2 ** self.N)
coherence = np.dot(basin_sizes_norm, basin_coherences)
fragility = np.dot(basin_sizes_norm, basin_fragilities)
return dict(zip(
["Attractors", "ExactNumberOfAttractors",
"BasinSizes", "AttractorDict",
"Coherence", "Fragility",
"BasinCoherence", "BasinFragility",
"AttractorCoherence", "AttractorFragility"],
(attractors, n_attractors,
basin_sizes_norm, attractor_dict,
coherence, fragility,
basin_coherences, basin_fragilities,
attractor_coherences, attractor_fragilities)
))
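# Continuing the toy example (hand-computed expectation): of the four edges of
# the 2-cube, the two edges incident to the fixed point 0 cross basins, while
# the two edges among states 1, 2, 3 stay in the basin of the fixed point 3.
# With basin sizes 1/4 and 3/4 and attractor distance d((0,0),(1,1)) = 1:
#
#   res = bn.get_attractors_and_robustness_measures_synchronous_exact()
#   res["Coherence"]        # -> 0.5
#   res["Fragility"]        # -> 0.5
#   res["BasinCoherence"]   # -> [0.0, 2/3]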
def get_attractors_and_robustness_measures_synchronous(self, number_different_IC : int = 500,
RETURN_ATTRACTOR_COHERENCE : bool = True, *, rng=None) -> dict:
"""
Approximate global robustness measures and attractors.
This function samples the attractor landscape by simulating the network
from a number of different initial conditions. It computes:
- The coherence: the proportion of neighboring states (in the
Boolean hypercube) that, after synchronous update, transition to
the same attractor.
- The fragility: a measure of how much the reached attractor
(under synchronous update) changes in response to single-bit perturbations.
- The final time-step Hamming distance between perturbed trajectories.
In addition, it collects several details about each attractor (such as
basin sizes, coherence of each basin, etc.).
**Parameters:**
- number_different_IC (int, optional): Number of different initial
conditions to sample (default is 500).
- RETURN_ATTRACTOR_COHERENCE (bool, optional): Determines whether
the attractor coherence should also be computed (default True,
i.e., Yes).
- rng (None, optional): Argument for the random number generator,
implemented in 'utils._coerce_rng'.
**Returns:**
- dict[str:Variant]: A dictionary containing:
- Attractors (list[list[int]]): List of attractors (each
attractor is represented as a list of state decimal numbers).
- LowerBoundOfNumberOfAttractors (int): The lower bound on the
number of attractors found.
- BasinSizes (list[int]): List of basin sizes for each attractor.
- CoherenceApproximation (float): The approximate overall
network coherence.
- FragilityApproximation (float): The approximate overall
network fragility.
- FinalHammingDistanceApproximation (float): The approximate
final Hamming distance measure.
- BasinCoherenceApproximation (list[float]): The approximate
coherence of each basin.
- BasinFragilityApproximation (list[float]): The approximate
fragility of each basin.
- AttractorCoherence (list[float]): The exact coherence of
each attractor (only computed and returned if
RETURN_ATTRACTOR_COHERENCE == True).
- AttractorFragility (list[float]): The exact fragility of
each attractor (only computed and returned if
RETURN_ATTRACTOR_COHERENCE == True).
**References:**
#. Park, K. H., Costa, F. X., Rocha, L. M., Albert, R., & Rozum,
J. C. (2023). Models of cell processes are far from the edge of
chaos. PRX Life, 1(2), 023009.
#. Bavisetty, V. S. N., Wheeler, M., & Kadelka, C. (2025). xxxx
arXiv preprint arXiv:xxx.xxx.
"""
rng = utils._coerce_rng(rng)
def lcm(a, b):
return abs(a*b) // math.gcd(a, b)
dictF = dict()
attractors = []
ICs_per_attractor_state = []
basin_sizes = []
attractor_dict = dict()
attractor_state_dict = []
distance_from_attractor_state_dict = []
counter_phase_shifts = []
height = []
powers_of_2s = [np.array([2**i for i in range(NN)])[::-1] for NN in range(max(self.indegrees)+1)]
if self.N<64:
powers_of_2 = np.array([2**i for i in range(self.N)])[::-1]
robustness_approximation = 0
fragility_sum = 0
basin_robustness = defaultdict(float)
basin_fragility = defaultdict(float)
final_hamming_distance_approximation = 0
mean_states_attractors = []
states_attractors = []
for i in range(number_different_IC):
index_attractors = []
index_of_state_within_attractor_reached = []
distance_from_attractor = []
for j in range(2):
if j == 0:
x = rng.integers(2, size=self.N)
if self.N<64:
xdec = np.dot(x, powers_of_2).item()
else: #out of range of np.int64
xdec = ''.join(str(bit) for bit in x)
x_old = x.copy()
else:
x = x_old
random_flipped_bit = rng.integers(self.N)
x[random_flipped_bit] = 1 - x[random_flipped_bit]
if self.N<64:
xdec = np.dot(x, powers_of_2).item()
else: #out of range of np.int64
xdec = ''.join(str(bit) for bit in x)
queue = [xdec]
try:
index_attr = attractor_dict[xdec]
except KeyError:
while True:
try: #check if we already know F(xdec)
fxdec = dictF[xdec]
except KeyError: #if not, then compute the F(xdec)
fx = []
for jj in range(self.N):
if self.indegrees[jj]>0:
fx.append(self.F[jj].f[np.dot(x[self.I[jj]], powers_of_2s[self.indegrees[jj]]).item()])
else:#constant functions whose regulators were all fixed to a specific value
fx.append(self.F[jj].f[0])
if self.N<64:
fxdec = np.dot(fx, powers_of_2).item()
else:
fxdec = ''.join(str(bit) for bit in fx)
dictF.update({xdec: fxdec})
try: #check if we already know the attractor of F(xdec)
index_attr = attractor_dict[fxdec]
dummy_index_within_attractor_reached = attractor_state_dict[index_attr][fxdec]
dummy_distance_from_attractor = distance_from_attractor_state_dict[index_attr][fxdec]
attractor_dict.update(list(zip(queue, [index_attr]*len(queue))))
attractor_state_dict[index_attr].update(list(zip(queue, [dummy_index_within_attractor_reached]*len(queue))))
distance_from_attractor_state_dict[index_attr].update(
list(zip(queue, list(range(len(queue) + dummy_distance_from_attractor, dummy_distance_from_attractor, -1))))
)
break
except KeyError:
try: #if not, then check if F(xdec) is already in the queue, i.e., if F(xdec) is part of an attractor itself
index = queue.index(fxdec)
index_attr = len(attractors)
attractor_dict.update(list(zip(queue, [index_attr]*len(queue))))
attractors.append(queue[index:])
basin_sizes.append(1)
attractor_state_dict.append(dict(zip(queue, [0]*index + list(range(len(attractors[-1]))))))
distance_from_attractor_state_dict.append(
dict(zip(queue, list(range(index, 0, -1)) + [0]*len(attractors[-1])))
)
ICs_per_attractor_state.append([0] * len(attractors[-1]))
counter_phase_shifts.append([0] * len(attractors[-1]))
if len(attractors[-1]) == 1:
if self.N<64:
fixed_point = np.array(utils.dec2bin(queue[index], self.N))
else:
fixed_point = np.array(list(queue[index]), dtype=int)
states_attractors.append(fixed_point.reshape((1, self.N)))
mean_states_attractors.append(fixed_point)
else:
if self.N<64:
limit_cycle = np.array([utils.dec2bin(state, self.N) for state in queue[index:]])
else:
limit_cycle = np.array([np.array(list(state), dtype=int) for state in queue[index:]])
states_attractors.append(limit_cycle)
mean_states_attractors.append(limit_cycle.mean(0))
break
except ValueError: #if not, proceed by setting x = F(x)
x = np.array(fx)
queue.append(fxdec)
xdec = fxdec
index_attractors.append(index_attr)
index_of_state_within_attractor_reached.append(attractor_state_dict[index_attr][xdec])
distance_from_attractor.append(distance_from_attractor_state_dict[index_attr][xdec])
basin_sizes[index_attr] += 1
ICs_per_attractor_state[index_attr][attractor_state_dict[index_attr][xdec]] += 1
if index_attractors[0] == index_attractors[1]:
robustness_approximation += 1
basin_robustness[index_attractors[0]] += 1
length_phaseshift = max(index_of_state_within_attractor_reached) - min(index_of_state_within_attractor_reached)
counter_phase_shifts[index_attr][length_phaseshift] += 1
else:
fragility_sum += np.sum(np.abs(mean_states_attractors[index_attractors[0]] - mean_states_attractors[index_attractors[1]]))
basin_fragility[index_attractors[0]] += np.sum(np.abs(mean_states_attractors[index_attractors[0]] - mean_states_attractors[index_attractors[1]]))
required_n_states = lcm(len(attractors[index_attractors[0]]), len(attractors[index_attractors[1]]))
index_j0 = index_of_state_within_attractor_reached[0]
periodic_states_j0 = np.tile(states_attractors[index_attractors[0]],
(required_n_states // len(attractors[index_attractors[0]]) + 1, 1))[index_j0:(index_j0 + required_n_states), :]
index_j1 = index_of_state_within_attractor_reached[1]
periodic_states_j1 = np.tile(states_attractors[index_attractors[1]],
(required_n_states // len(attractors[index_attractors[1]]) + 1, 1))[index_j1:(index_j1 + required_n_states), :]
final_hamming_distance_approximation += np.mean(periodic_states_j1 != periodic_states_j0) #count differing bits so this is a normalized Hamming distance
height.extend(distance_from_attractor)
lower_bound_number_of_attractors = len(attractors)
approximate_basin_sizes = np.array(basin_sizes)
approximate_coherence = robustness_approximation * 1.0 / number_different_IC
approximate_fragility = fragility_sum * 1.0 / number_different_IC / self.N
approximate_basin_coherence = np.array([basin_robustness[index_att] * 2.0 / basin_sizes[index_att] for index_att in range(len(attractors))])
approximate_basin_fragility = np.array([basin_fragility[index_att] * 2.0 / basin_sizes[index_att] / self.N for index_att in range(len(attractors))])
for index_attr in range(len(attractors)):
periodic_states_two_periods = np.tile(states_attractors[index_attr], (2, 1))
for length_phaseshift, num_IC_with_that_phaseshift in enumerate(counter_phase_shifts[index_attr]):
if num_IC_with_that_phaseshift > 0 and length_phaseshift > 0:
#count differing bits so that the accumulated quantity is a Hamming distance
final_hamming_distance_approximation += num_IC_with_that_phaseshift * np.mean(
states_attractors[index_attr] !=
periodic_states_two_periods[length_phaseshift:(length_phaseshift + len(attractors[index_attr])), :]
)
final_hamming_distance_approximation = final_hamming_distance_approximation / number_different_IC
#fix the results here because the subsequent attractor coherence computation could in theory identify additional attractors,
#which would bias the estimates because the attractor regions of the state space would then have been oversampled
results = [attractors, lower_bound_number_of_attractors, approximate_basin_sizes/2./number_different_IC,
approximate_coherence, approximate_fragility, final_hamming_distance_approximation,
approximate_basin_coherence, approximate_basin_fragility]
if not RETURN_ATTRACTOR_COHERENCE:
return dict(zip(["Attractors", "LowerBoundOfNumberOfAttractors", "BasinSizesApproximation",
"CoherenceApproximation", "FragilityApproximation", "FinalHammingDistanceApproximation",
"BasinCoherenceApproximation", "BasinFragilityApproximation"],
tuple(results)))
else:
attractor_coherence = np.zeros(lower_bound_number_of_attractors)
attractor_fragility = np.zeros(lower_bound_number_of_attractors)
attractors_original = attractors[:] #needed because new attractors may be found
for index_attr_original,attractor in enumerate(attractors_original):
for attractor_state in attractor: #perturb each attractor state
for i in range(self.N):
if self.N<64:
x = np.array(utils.dec2bin(attractor_state, self.N))
else:
x = np.array(list(attractor_state), dtype=int)
x[i] = 1 - x[i]
if self.N<64:
xdec = np.dot(x, powers_of_2).item()
else:
xdec = ''.join(str(bit) for bit in x)
queue = [xdec]
try:
index_attr = attractor_dict[xdec]
except KeyError:
while True:
try: #check if we already know F(xdec)
fxdec = dictF[xdec]
except KeyError: #if not, then compute the F(xdec)
fx = []
for jj in range(self.N):
if self.indegrees[jj]>0:
fx.append(self.F[jj].f[np.dot(x[self.I[jj]], powers_of_2s[self.indegrees[jj]]).item()])
else:#constant functions whose regulators were all fixed to a specific value
fx.append(self.F[jj].f[0])
if self.N<64:
fxdec = np.dot(fx, powers_of_2).item()
else:
fxdec = ''.join(str(bit) for bit in fx)
dictF.update({xdec: fxdec})
try: #check if we already know the attractor of F(xdec)
index_attr = attractor_dict[fxdec]
dummy_index_within_attractor_reached = attractor_state_dict[index_attr][fxdec]
dummy_distance_from_attractor = distance_from_attractor_state_dict[index_attr][fxdec]
attractor_dict.update(list(zip(queue, [index_attr]*len(queue))))
attractor_state_dict[index_attr].update(list(zip(queue, [dummy_index_within_attractor_reached]*len(queue))))
distance_from_attractor_state_dict[index_attr].update(
list(zip(queue, list(range(len(queue) + dummy_distance_from_attractor, dummy_distance_from_attractor, -1))))
)
break
except KeyError:
try: #if not, then check if F(xdec) is already in the queue, i.e., if F(xdec) is part of an attractor itself
index = queue.index(fxdec)
index_attr = len(attractors)
attractor_dict.update(list(zip(queue, [index_attr]*len(queue))))
attractors.append(queue[index:])
#basin_sizes.append(1)
attractor_state_dict.append(dict(zip(queue, [0]*index + list(range(len(attractors[-1]))))))
distance_from_attractor_state_dict.append(
dict(zip(queue, list(range(index, 0, -1)) + [0]*len(attractors[-1])))
)
ICs_per_attractor_state.append([0] * len(attractors[-1]))
counter_phase_shifts.append([0] * len(attractors[-1]))
if len(attractors[-1]) == 1:
if self.N<64:
fixed_point = np.array(utils.dec2bin(queue[index], self.N))
else:
fixed_point = np.array(list(queue[index]), dtype=int)
states_attractors.append(fixed_point.reshape((1, self.N)))
mean_states_attractors.append(fixed_point)
else:
if self.N<64:
limit_cycle = np.array([utils.dec2bin(state, self.N) for state in queue[index:]])
else:
limit_cycle = np.array([np.array(list(state), dtype=int) for state in queue[index:]])
states_attractors.append(limit_cycle)
mean_states_attractors.append(limit_cycle.mean(0))
break
except ValueError: #if not, proceed by setting x = F(x)
x = np.array(fx)
queue.append(fxdec)
xdec = fxdec
if index_attr_original == index_attr:
attractor_coherence[index_attr_original] += 1
else:
attractor_fragility[index_attr_original] += np.sum(np.abs(mean_states_attractors[index_attr_original] - mean_states_attractors[index_attr]))
attractor_coherence = np.array([s/self.N/size_attr for s,size_attr in zip(attractor_coherence,map(len,attractors_original))])
attractor_fragility = np.array([s/self.N**2/size_attr for s,size_attr in zip(attractor_fragility,map(len,attractors_original))]) #something is wrong with attractor fragility, it returns values > 1 for small basins
results[0] = attractors_original #important! It may be that new attractors were found, reset the count
return dict(zip(["Attractors", "LowerBoundOfNumberOfAttractors", "BasinSizesApproximation",
"CoherenceApproximation", "FragilityApproximation", "FinalHammingDistanceApproximation",
"BasinCoherenceApproximation", "BasinFragilityApproximation",
"AttractorCoherence", "AttractorFragility"],
tuple(results + [attractor_coherence,attractor_fragility])))
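# Hedged usage sketch: the sampled estimates returned here approximate the
# exact quantities from get_attractors_and_robustness_measures_synchronous_exact
# and converge as number_different_IC grows.
#
#   res = bn.get_attractors_and_robustness_measures_synchronous(
#       number_different_IC=1000, rng=0)
#   res["CoherenceApproximation"]   # Monte Carlo estimate of "Coherence"
#   res["FragilityApproximation"]   # Monte Carlo estimate of "Fragility"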
# def get_attractors_and_robustness_measures_synchronous_vectorized(
# self, number_different_IC: int = 500,
# RETURN_ATTRACTOR_COHERENCE: bool = True,
# *, rng=None
# ) -> dict:
# """
# Vectorized approximation of attractors, coherence, and fragility.
# 10–30× faster than the dict-based version, identical outputs statistically.
# """
# rng = utils._coerce_rng(rng)
# N = self.N
# powers_of_2 = 2 ** np.arange(N)[::-1]
# # --- 1. Sample random initial conditions and their one-bit flips
# X0 = rng.integers(0, 2, size=(number_different_IC, N), dtype=np.uint8)
# flip_indices = rng.integers(0, N, size=number_different_IC)
# X1 = X0.copy()
# X1[np.arange(number_different_IC), flip_indices] ^= 1 # bit-flip
# # --- 2. Compute next states for all X0 and X1 in vectorized batches
# FX0 = np.zeros_like(X0)
# FX1 = np.zeros_like(X1)
# powers_cache = {deg: 2 ** np.arange(deg)[::-1] for deg in range(max(self.indegrees) + 1)}
# for j, bf in enumerate(self.F):
# regs = self.I[j]
# deg = len(regs)
# if deg == 0:
# FX0[:, j] = bf.f[0]
# FX1[:, j] = bf.f[0]
# else:
# idx0 = np.dot(X0[:, regs], powers_cache[deg])
# idx1 = np.dot(X1[:, regs], powers_cache[deg])
# FX0[:, j] = bf.f[idx0]
# FX1[:, j] = bf.f[idx1]
# # --- 3. Compute Hamming distances between updated pairs
# diff = np.abs(FX0 != FX1)
# hamming = diff.sum(axis=1)
# mean_hamming = hamming.mean() / N
# # --- 4. Estimate coherence and fragility
# coherence_approx = np.mean(hamming == 0)
# fragility_approx = np.mean(hamming > 0) * mean_hamming
# # --- 5. Optionally estimate attractor coherence/fragility (via FX0 only)
# if RETURN_ATTRACTOR_COHERENCE:
# # Treat each unique FX0 state as an attractor representative
# uniq, inv = np.unique(FX0, axis=0, return_inverse=True)
# n_attr = len(uniq)
# basin_sizes = np.bincount(inv) / number_different_IC
# # Compute average internal coherence within each basin
# basin_coherence = np.zeros(n_attr)
# basin_fragility = np.zeros(n_attr)
# for k in range(n_attr):
# members = FX0[inv == k]
# if len(members) > 1:
# dmat = np.sum(np.abs(members[:, None, :] - members[None, :, :]), axis=2) / N
# basin_fragility[k] = dmat.mean()
# basin_coherence[k] = np.mean(dmat == 0)
# else:
# basin_coherence[k] = 1.0
# basin_fragility[k] = 0.0
# attractor_coherence = basin_coherence.copy()
# attractor_fragility = basin_fragility.copy()
# return dict(
# Attractors=[np.dot(a, powers_of_2).astype(int).tolist() for a in uniq],
# LowerBoundOfNumberOfAttractors=n_attr,
# BasinSizesApproximation=basin_sizes,
# CoherenceApproximation=coherence_approx,
# FragilityApproximation=fragility_approx,
# FinalHammingDistanceApproximation=mean_hamming,
# BasinCoherenceApproximation=basin_coherence,
# BasinFragilityApproximation=basin_fragility,
# AttractorCoherence=attractor_coherence,
# AttractorFragility=attractor_fragility,
# )
# else:
# uniq, inv = np.unique(FX0, axis=0, return_inverse=True)
# n_attr = len(uniq)
# basin_sizes = np.bincount(inv) / number_different_IC
# return dict(
# Attractors=[np.dot(a, powers_of_2).astype(int).tolist() for a in uniq],
# LowerBoundOfNumberOfAttractors=n_attr,
# BasinSizesApproximation=basin_sizes,
# CoherenceApproximation=coherence_approx,
# FragilityApproximation=fragility_approx,
# FinalHammingDistanceApproximation=mean_hamming,
# )