"""
Miscellaneous convenience functions
Some functionality was adapted from the original NMpathAnalysis package,
https://github.com/ZuckermanLab/NMpathAnalysis
"""
from copy import deepcopy
import numpy as np
import operator
from scipy.sparse import csr_matrix
import scipy.sparse as sparse
# used to check connectivity
import scipy.sparse.csgraph as csgraph
from scipy.sparse.sputils import isdense
from ._logging import log
[docs]def find_connected_sets(C, directed=True):
r"""
This implementation is taken from msmtools.estimation.sparse.connectivity, at commit 9312660.
See the original at https://github.com/markovmodel/msmtools/blob/devel/msmtools/estimation/sparse/connectivity.py#L30
Compute connected components for a directed graph with weights
represented by the given count matrix.
Parameters
----------
C : scipy.sparse matrix or numpy ndarray
square matrix specifying edge weights.
directed : bool, optional
Whether to compute connected components for a directed or
undirected graph. Default is True.
Returns
-------
cc : list of arrays of integers
Each entry is an array containing all vertices (states) in
the corresponding connected component.
"""
if isdense(C):
C = csr_matrix(C)
M = C.shape[0]
""" Compute connected components of C. nc is the number of
components, indices contain the component labels of the states
"""
nc, indices = csgraph.connected_components(
C, directed=directed, connection="strong"
)
states = np.arange(M) # Discrete states
"""Order indices"""
ind = np.argsort(indices)
indices = indices[ind]
"""Order states"""
states = states[ind]
""" The state index tuple is now of the following form (states,
indices)=([s_23, s_17,...,s_3, s_2, ...], [0, 0, ..., 1, 1, ...])
"""
"""Find number of states per component"""
count = np.bincount(indices)
"""Cumulative sum of count gives start and end indices of
components"""
csum = np.zeros(len(count) + 1, dtype=int)
csum[1:] = np.cumsum(count)
"""Generate list containing components, sort each component by
increasing state label"""
cc = []
for i in range(nc):
cc.append(np.sort(states[csum[i] : csum[i + 1]]))
"""Sort by size of component - largest component first"""
cc = sorted(cc, key=lambda x: -len(x))
return cc
[docs]def is_connected(matrix, source_states, target_states, directed=True):
"""
Check for connectivity between two states.
If directed is True, then checks for directional connectivity from source_states to target_states.
Parameters
----------
matrix: np.array, NxN
Transition matrix
source_states: array-like, N
Source states
target_states: array-like, N
Target states
directed: bool, default=True
Compute directional connectivity
Returns
-------
bool
"""
return (
np.inf
not in csgraph.shortest_path(matrix, directed=directed, indices=source_states)[
:, target_states
]
)
[docs]def inverse_iteration(guess, matrix, mu=1):
"""
Do one iteration of inverse iteration.
Parameters
----------
guess: array-like (N elements)
Vector of weights to be used as the initial guess.
matrix: array-like (NxN elements)
Transition matrix to use for inverse iteration.
Returns
-------
The new vector of weights after one iteration of inverse iteration.
"""
# Looking for eigenvector corresponding to eigenvalue 1
identity = sparse.eye(guess.shape[0])
# Inverse
try:
inverse = sparse.linalg.inv(matrix.T - mu * identity)
except RuntimeError as e:
if not mu == 1:
filename = "bad_matrix.npy"
log.error(
f"Inverse iteration still failed with mu={mu} -- examine your transition matrix for why it could "
f"be unsolvable. Saving the transition matrix to {filename}."
)
np.save(filename, matrix)
raise e
elif mu == 1:
log.error(
f"When solving steady-state, failed to perform inverse iteration! "
f"Trying again with mu=0.999 instead of {mu}."
)
return inverse_iteration(guess, matrix, mu=0.999)
result = inverse @ guess
result = result.squeeze()
# Normalize
result /= sum(result)
return result
[docs]class Interval:
"""Intervals are in general defined as half-open interval [start,end).
in any case, in each dimension the interval is specified using a list [a,b]
where a < b
- For 1D (single) interval a single list in the form [a,b] has to be given
- The union of multiple (1D) intervals can be specified as:
[[a,b],[c,d],...]
- A list of lists [[a,b],[c,d],...] are used for a n-dimensional
intervals, one for each dimension, i.e, len(interval) = n_variables
- A list-of-lists-of-lists for the mathematical union of n-dimensional
intervals'
[ [[a,b],[c,d],...], [[e,f],[g,h],...], ... ]
"""
def __init__(self, interval_set, n_variables):
self.interval_set = interval_set
self.n_variables = n_variables
def __contains__(self, item):
shape = np.array(self.interval_set).shape
len_shape = len(shape)
if (self.n_variables == 1) and (len_shape == 1): # single 1D interval
return self.interval_set[0] <= item < self.interval_set[1]
elif (self.n_variables == 1) and (
len_shape == 2
): # union of multiple 1D intervals
return any(
[(item in Interval(self.interval_set[i], 1)) for i in range(shape[0])]
)
elif (self.n_variables > 1) and len_shape == 2: # n-dimensional interval
return all(
[
(item[i] in Interval(self.interval_set[i], 1))
for i in range(shape[0])
]
)
elif len(shape) == 3: # union of n-dimensional intervals
return any(
[
(item in Interval(self.interval_set[i], self.n_variables))
for i in range(shape[0])
]
)
else:
raise Exception("The given interval has not the expected shape")
[docs]def reverse_sort_lists(list_1, list_2):
"""Reverse sorting two list based on the first one"""
list_1_sorted, list_2_sorted = zip(
*sorted(zip(list_1, list_2), key=operator.itemgetter(0), reverse=True)
)
return list_1_sorted, list_2_sorted
[docs]def weighted_choice(list_, weights=None):
"""Select an element from a list with probability from weights"""
size = len(list_)
if weights is not None:
assert size == len(weights)
if weights is None:
probs = np.array([1 / float(size) for i in range(size)])
else:
probs = np.array(weights) / sum(weights) # just in case
rand = np.random.random()
_sum = 0
for i in range(size):
if _sum <= rand < _sum + probs[i]:
choice = i
break
else:
_sum += probs[i]
return list_[choice]
[docs]def get_shape(trajectory):
"""Get the shape of a trajectory array in tuple (n_snapshots, n_variables)"""
shape = np.array(trajectory).shape
if len(shape) == 1:
n_snapshots = shape[0]
n_variables = 1
if n_variables == 0:
raise Exception(
"The shape {} of the trajectory/array \
given is not as expected".format(
shape
)
)
elif len(shape) == 2:
n_snapshots = shape[0]
n_variables = shape[1]
else:
raise Exception(
"The shape {} of the trajectory/array given is not \
as expected".format(
shape
)
)
return n_snapshots, n_variables
[docs]def num_of_nonzero_elements(my_vector):
"""Returns the number of non-zero elements in a vector"""
counter = 0
for element in my_vector:
if element != 0:
counter += 1
return counter
[docs]def normalize_markov_matrix(transition_matrix, reversible=False):
"""Transform a matrix of positive elements to a markov-like matrix
by dividing each row by the sum of the elements of the row.
"""
t_matrix = np.array(transition_matrix, dtype=np.float64)
if reversible:
t_matrix = t_matrix.T + t_matrix
n_states = len(t_matrix)
assert n_states == len(t_matrix[0])
for i in range(n_states):
if (t_matrix[i, :] < 0).any():
raise ValueError(
"All the elements in the input \
matrix must be non-negative"
)
t_matrix[i, :] = normalize(t_matrix[i, :])
return t_matrix
[docs]def normalize(my_vector):
"""Normalize a vector
by dividing each element by the total sum of all its elements
"""
my_vector = np.array(my_vector)
size = len(my_vector)
sum_ = sum(my_vector)
if sum_ != 0.0:
for i in range(size):
my_vector[i] = my_vector[i] / sum_
return my_vector
[docs]def random_markov_matrix(n_states=5, seed=None):
"""Returns a random transition markov matrix"""
if seed is not None:
np.random.seed(seed)
t_matrix = np.random.random((n_states, n_states))
return normalize_markov_matrix(t_matrix)
[docs]def check_tmatrix(t_matrix, accept_null_rows=True):
"""Check if the given matrix is actually a row-stochastic transition matrix
i.e, all the elements are non-negative and the rows add to one.
If the keyword argument accept_null_rows is True, is going
to accept rows where all the elements are zero. Those "problematic"
states are going to be removed later if necessary by clean_tmatrix.
"""
def value_error():
raise ValueError("The object given is not a transition matrix")
n_states = len(t_matrix)
if not (n_states == len(t_matrix[0])):
value_error()
for index, row in enumerate(t_matrix):
sum_ = 0.0
for element in row:
if element < 0.0:
value_error()
sum_ += element
if accept_null_rows:
if not (np.isclose(sum_, 1.0, atol=1e-6) or sum_ == 0.0):
value_error()
else:
if not np.isclose(sum_, 1.0, atol=1e-6):
value_error()
return False
[docs]def clean_tmatrix(transition_matrix, rm_absorbing=True):
"""Removes the states/indexes with no transitions and that are absorbing
if the the keyword argument rm_absorbing is true
Returns the "clean" transition matrix and a list with the
removed states/indexes (clean_tmatrix, removed_states)
"""
t_matrix = deepcopy(transition_matrix)
n_states = len(transition_matrix)
# Removing the non-visited states and absorbing states
removed_states = []
for index in range(n_states - 1, -1, -1):
if not any(t_matrix[index]): # non-visited
t_matrix = np.delete(t_matrix, index, axis=1)
t_matrix = np.delete(t_matrix, index, axis=0)
removed_states.append(index)
elif t_matrix[index, index] == 1.0: # absorbing state
if not all(
[t_matrix[index, j] == 0.0 for j in range(n_states) if j != index]
):
raise ValueError(
"The sum of the elements in a row of the \
transition matrix must be one"
)
t_matrix = np.delete(t_matrix, index, axis=1)
t_matrix = np.delete(t_matrix, index, axis=0)
removed_states.append(index)
# Renormalizing just in case
t_matrix = normalize_markov_matrix(t_matrix)
return t_matrix, removed_states
[docs]def pops_from_tmatrix(transition_matrix):
"""Calculate the eigen values and eigen vectors of the transposed transition matrix
Parameters
----------
transition_matrix : ndarray with shape = (n_states, n_states)
Returns
-------
the solution, p, of K.T p = p where K.T is the transposed transition matrix
"""
check_tmatrix(transition_matrix)
n_states = len(transition_matrix)
# Cleaning the transition matrix
cleaned_matrix, removed_states = clean_tmatrix(transition_matrix)
# Computing
eig_vals, eig_vecs = np.linalg.eig(cleaned_matrix.T)
eig_vecs = eig_vecs.T # for convenience, now every row is an eig_vector
eig_vals_close_to_one = np.isclose(eig_vals, 1.0, atol=1e-6)
real_eig_vecs = [not np.iscomplex(row).any() for row in eig_vecs]
new_n_states = n_states - len(removed_states)
ss_solution = np.zeros(new_n_states) # steady-state solution
for is_close_to_one, is_real, eigv in zip(
eig_vals_close_to_one, real_eig_vecs, eig_vecs
):
if (
is_close_to_one
and is_real
and num_of_nonzero_elements(eigv) > num_of_nonzero_elements(ss_solution)
and ((eigv <= 0).all() or (eigv >= 0).all())
):
ss_solution = eigv
if (ss_solution == 0.0).all():
raise Exception(
"No steady-state solution found for \
the given transition matrix"
)
ss_solution = normalize(ss_solution).real
# Now we have to insert back in the solution, the missing
# elements with zero probabilities
for index in sorted(removed_states):
ss_solution = np.insert(ss_solution, index, 0.0)
return ss_solution
[docs]def pops_from_nm_tmatrix(transition_matrix):
"""Computes the populations of the real/physical states
from a non-Markovian transtion matrix with shape (2*n_states, 2*n_states)
"""
check_tmatrix(transition_matrix, accept_null_rows=True)
size = len(transition_matrix)
if size % 2 != 0:
raise ValueError(
"The non-Markovian transition matrix has to "
"have an even number of columns/rows"
)
n_states = size // 2 # Real/physical microstates
pops_nm = pops_from_tmatrix(transition_matrix)
pops = np.zeros(n_states)
for i in range(n_states):
pops[i] = pops_nm[2 * i] + pops_nm[2 * i + 1]
return pops
[docs]def map_to_integers(sequence, mapping_dict=None):
"""Map a sequence of elements to a sequence of integers
for intance, maps [1, 'a', 1, 'b', 2.2] to [0, 1, 0, 2, 3]
"""
if mapping_dict is None:
mapping_dict = {}
new_sequence = np.zeros(len(sequence), dtype="int64")
counter = 0
for i, element in enumerate(sequence):
if element not in mapping_dict.keys():
mapping_dict[element] = counter
counter += 1
new_sequence[i] = mapping_dict[element]
return new_sequence, mapping_dict
[docs]def pseudo_nm_tmatrix(markovian_tmatrix, stateA, stateB):
"""Obtain a pseudo non-Markovian transition matrix from a Markovian transiton matrix
The pseudo Markovian matrix has a shape of (2 n_states, 2 n_states)
"""
check_tmatrix(markovian_tmatrix)
n_states = len(markovian_tmatrix)
# pseudo non-Markovian transition matrix
p_nm_tmatrix = np.zeros((2 * n_states, 2 * n_states))
for i in range(2 * n_states):
for j in range(2 * n_states):
p_nm_tmatrix[i, j] = markovian_tmatrix[i // 2, j // 2]
for i in range(n_states):
for j in range(n_states):
if (i in stateB) or (j in stateB):
p_nm_tmatrix[2 * i, 2 * j] = 0.0
if (i in stateA) or (j in stateA):
p_nm_tmatrix[2 * i + 1, 2 * j + 1] = 0.0
if (not (j in stateA)) or (i in stateA):
p_nm_tmatrix[2 * i + 1, 2 * j] = 0.0
if (not (j in stateB)) or (i in stateB):
p_nm_tmatrix[2 * i, 2 * j + 1] = 0.0
check_tmatrix(p_nm_tmatrix) # just in case
return p_nm_tmatrix