Source code for msm_we.utils

"""
Miscellaneous convenience functions

Some functionality was adapted from the original NMpathAnalysis package,
https://github.com/ZuckermanLab/NMpathAnalysis
"""
from copy import deepcopy
import numpy as np
import operator

from scipy.sparse import csr_matrix
import scipy.sparse as sparse

# used to check connectivity
import scipy.sparse.csgraph as csgraph
from scipy.sparse.sputils import isdense

from ._logging import log


[docs]def find_connected_sets(C, directed=True): r""" This implementation is taken from msmtools.estimation.sparse.connectivity, at commit 9312660. See the original at https://github.com/markovmodel/msmtools/blob/devel/msmtools/estimation/sparse/connectivity.py#L30 Compute connected components for a directed graph with weights represented by the given count matrix. Parameters ---------- C : scipy.sparse matrix or numpy ndarray square matrix specifying edge weights. directed : bool, optional Whether to compute connected components for a directed or undirected graph. Default is True. Returns ------- cc : list of arrays of integers Each entry is an array containing all vertices (states) in the corresponding connected component. """ if isdense(C): C = csr_matrix(C) M = C.shape[0] """ Compute connected components of C. nc is the number of components, indices contain the component labels of the states """ nc, indices = csgraph.connected_components( C, directed=directed, connection="strong" ) states = np.arange(M) # Discrete states """Order indices""" ind = np.argsort(indices) indices = indices[ind] """Order states""" states = states[ind] """ The state index tuple is now of the following form (states, indices)=([s_23, s_17,...,s_3, s_2, ...], [0, 0, ..., 1, 1, ...]) """ """Find number of states per component""" count = np.bincount(indices) """Cumulative sum of count gives start and end indices of components""" csum = np.zeros(len(count) + 1, dtype=int) csum[1:] = np.cumsum(count) """Generate list containing components, sort each component by increasing state label""" cc = [] for i in range(nc): cc.append(np.sort(states[csum[i] : csum[i + 1]])) """Sort by size of component - largest component first""" cc = sorted(cc, key=lambda x: -len(x)) return cc
[docs]def is_connected(matrix, source_states, target_states, directed=True): """ Check for connectivity between two states. If directed is True, then checks for directional connectivity from source_states to target_states. Parameters ---------- matrix: np.array, NxN Transition matrix source_states: array-like, N Source states target_states: array-like, N Target states directed: bool, default=True Compute directional connectivity Returns ------- bool """ return ( np.inf not in csgraph.shortest_path(matrix, directed=directed, indices=source_states)[ :, target_states ] )
[docs]def inverse_iteration(guess, matrix, mu=1): """ Do one iteration of inverse iteration. Parameters ---------- guess: array-like (N elements) Vector of weights to be used as the initial guess. matrix: array-like (NxN elements) Transition matrix to use for inverse iteration. Returns ------- The new vector of weights after one iteration of inverse iteration. """ # Looking for eigenvector corresponding to eigenvalue 1 identity = sparse.eye(guess.shape[0]) # Inverse try: inverse = sparse.linalg.inv(matrix.T - mu * identity) except RuntimeError as e: if not mu == 1: filename = "bad_matrix.npy" log.error( f"Inverse iteration still failed with mu={mu} -- examine your transition matrix for why it could " f"be unsolvable. Saving the transition matrix to {filename}." ) np.save(filename, matrix) raise e elif mu == 1: log.error( f"When solving steady-state, failed to perform inverse iteration! " f"Trying again with mu=0.999 instead of {mu}." ) return inverse_iteration(guess, matrix, mu=0.999) result = inverse @ guess result = result.squeeze() # Normalize result /= sum(result) return result
[docs]class Interval: """Intervals are in general defined as half-open interval [start,end). in any case, in each dimension the interval is specified using a list [a,b] where a < b - For 1D (single) interval a single list in the form [a,b] has to be given - The union of multiple (1D) intervals can be specified as: [[a,b],[c,d],...] - A list of lists [[a,b],[c,d],...] are used for a n-dimensional intervals, one for each dimension, i.e, len(interval) = n_variables - A list-of-lists-of-lists for the mathematical union of n-dimensional intervals' [ [[a,b],[c,d],...], [[e,f],[g,h],...], ... ] """ def __init__(self, interval_set, n_variables): self.interval_set = interval_set self.n_variables = n_variables def __contains__(self, item): shape = np.array(self.interval_set).shape len_shape = len(shape) if (self.n_variables == 1) and (len_shape == 1): # single 1D interval return self.interval_set[0] <= item < self.interval_set[1] elif (self.n_variables == 1) and ( len_shape == 2 ): # union of multiple 1D intervals return any( [(item in Interval(self.interval_set[i], 1)) for i in range(shape[0])] ) elif (self.n_variables > 1) and len_shape == 2: # n-dimensional interval return all( [ (item[i] in Interval(self.interval_set[i], 1)) for i in range(shape[0]) ] ) elif len(shape) == 3: # union of n-dimensional intervals return any( [ (item in Interval(self.interval_set[i], self.n_variables)) for i in range(shape[0]) ] ) else: raise Exception("The given interval has not the expected shape")
[docs]def reverse_sort_lists(list_1, list_2): """Reverse sorting two list based on the first one""" list_1_sorted, list_2_sorted = zip( *sorted(zip(list_1, list_2), key=operator.itemgetter(0), reverse=True) ) return list_1_sorted, list_2_sorted
[docs]def weighted_choice(list_, weights=None): """Select an element from a list with probability from weights""" size = len(list_) if weights is not None: assert size == len(weights) if weights is None: probs = np.array([1 / float(size) for i in range(size)]) else: probs = np.array(weights) / sum(weights) # just in case rand = np.random.random() _sum = 0 for i in range(size): if _sum <= rand < _sum + probs[i]: choice = i break else: _sum += probs[i] return list_[choice]
[docs]def get_shape(trajectory): """Get the shape of a trajectory array in tuple (n_snapshots, n_variables)""" shape = np.array(trajectory).shape if len(shape) == 1: n_snapshots = shape[0] n_variables = 1 if n_variables == 0: raise Exception( "The shape {} of the trajectory/array \ given is not as expected".format( shape ) ) elif len(shape) == 2: n_snapshots = shape[0] n_variables = shape[1] else: raise Exception( "The shape {} of the trajectory/array given is not \ as expected".format( shape ) ) return n_snapshots, n_variables
[docs]def num_of_nonzero_elements(my_vector): """Returns the number of non-zero elements in a vector""" counter = 0 for element in my_vector: if element != 0: counter += 1 return counter
[docs]def normalize_markov_matrix(transition_matrix, reversible=False): """Transform a matrix of positive elements to a markov-like matrix by dividing each row by the sum of the elements of the row. """ t_matrix = np.array(transition_matrix, dtype=np.float64) if reversible: t_matrix = t_matrix.T + t_matrix n_states = len(t_matrix) assert n_states == len(t_matrix[0]) for i in range(n_states): if (t_matrix[i, :] < 0).any(): raise ValueError( "All the elements in the input \ matrix must be non-negative" ) t_matrix[i, :] = normalize(t_matrix[i, :]) return t_matrix
[docs]def normalize(my_vector): """Normalize a vector by dividing each element by the total sum of all its elements """ my_vector = np.array(my_vector) size = len(my_vector) sum_ = sum(my_vector) if sum_ != 0.0: for i in range(size): my_vector[i] = my_vector[i] / sum_ return my_vector
[docs]def random_markov_matrix(n_states=5, seed=None): """Returns a random transition markov matrix""" if seed is not None: np.random.seed(seed) t_matrix = np.random.random((n_states, n_states)) return normalize_markov_matrix(t_matrix)
[docs]def check_tmatrix(t_matrix, accept_null_rows=True): """Check if the given matrix is actually a row-stochastic transition matrix i.e, all the elements are non-negative and the rows add to one. If the keyword argument accept_null_rows is True, is going to accept rows where all the elements are zero. Those "problematic" states are going to be removed later if necessary by clean_tmatrix. """ def value_error(): raise ValueError("The object given is not a transition matrix") n_states = len(t_matrix) if not (n_states == len(t_matrix[0])): value_error() for index, row in enumerate(t_matrix): sum_ = 0.0 for element in row: if element < 0.0: value_error() sum_ += element if accept_null_rows: if not (np.isclose(sum_, 1.0, atol=1e-6) or sum_ == 0.0): value_error() else: if not np.isclose(sum_, 1.0, atol=1e-6): value_error() return False
[docs]def clean_tmatrix(transition_matrix, rm_absorbing=True): """Removes the states/indexes with no transitions and that are absorbing if the the keyword argument rm_absorbing is true Returns the "clean" transition matrix and a list with the removed states/indexes (clean_tmatrix, removed_states) """ t_matrix = deepcopy(transition_matrix) n_states = len(transition_matrix) # Removing the non-visited states and absorbing states removed_states = [] for index in range(n_states - 1, -1, -1): if not any(t_matrix[index]): # non-visited t_matrix = np.delete(t_matrix, index, axis=1) t_matrix = np.delete(t_matrix, index, axis=0) removed_states.append(index) elif t_matrix[index, index] == 1.0: # absorbing state if not all( [t_matrix[index, j] == 0.0 for j in range(n_states) if j != index] ): raise ValueError( "The sum of the elements in a row of the \ transition matrix must be one" ) t_matrix = np.delete(t_matrix, index, axis=1) t_matrix = np.delete(t_matrix, index, axis=0) removed_states.append(index) # Renormalizing just in case t_matrix = normalize_markov_matrix(t_matrix) return t_matrix, removed_states
[docs]def pops_from_tmatrix(transition_matrix): """Calculate the eigen values and eigen vectors of the transposed transition matrix Parameters ---------- transition_matrix : ndarray with shape = (n_states, n_states) Returns ------- the solution, p, of K.T p = p where K.T is the transposed transition matrix """ check_tmatrix(transition_matrix) n_states = len(transition_matrix) # Cleaning the transition matrix cleaned_matrix, removed_states = clean_tmatrix(transition_matrix) # Computing eig_vals, eig_vecs = np.linalg.eig(cleaned_matrix.T) eig_vecs = eig_vecs.T # for convenience, now every row is an eig_vector eig_vals_close_to_one = np.isclose(eig_vals, 1.0, atol=1e-6) real_eig_vecs = [not np.iscomplex(row).any() for row in eig_vecs] new_n_states = n_states - len(removed_states) ss_solution = np.zeros(new_n_states) # steady-state solution for is_close_to_one, is_real, eigv in zip( eig_vals_close_to_one, real_eig_vecs, eig_vecs ): if ( is_close_to_one and is_real and num_of_nonzero_elements(eigv) > num_of_nonzero_elements(ss_solution) and ((eigv <= 0).all() or (eigv >= 0).all()) ): ss_solution = eigv if (ss_solution == 0.0).all(): raise Exception( "No steady-state solution found for \ the given transition matrix" ) ss_solution = normalize(ss_solution).real # Now we have to insert back in the solution, the missing # elements with zero probabilities for index in sorted(removed_states): ss_solution = np.insert(ss_solution, index, 0.0) return ss_solution
[docs]def pops_from_nm_tmatrix(transition_matrix): """Computes the populations of the real/physical states from a non-Markovian transtion matrix with shape (2*n_states, 2*n_states) """ check_tmatrix(transition_matrix, accept_null_rows=True) size = len(transition_matrix) if size % 2 != 0: raise ValueError( "The non-Markovian transition matrix has to " "have an even number of columns/rows" ) n_states = size // 2 # Real/physical microstates pops_nm = pops_from_tmatrix(transition_matrix) pops = np.zeros(n_states) for i in range(n_states): pops[i] = pops_nm[2 * i] + pops_nm[2 * i + 1] return pops
[docs]def map_to_integers(sequence, mapping_dict=None): """Map a sequence of elements to a sequence of integers for intance, maps [1, 'a', 1, 'b', 2.2] to [0, 1, 0, 2, 3] """ if mapping_dict is None: mapping_dict = {} new_sequence = np.zeros(len(sequence), dtype="int64") counter = 0 for i, element in enumerate(sequence): if element not in mapping_dict.keys(): mapping_dict[element] = counter counter += 1 new_sequence[i] = mapping_dict[element] return new_sequence, mapping_dict
[docs]def pseudo_nm_tmatrix(markovian_tmatrix, stateA, stateB): """Obtain a pseudo non-Markovian transition matrix from a Markovian transiton matrix The pseudo Markovian matrix has a shape of (2 n_states, 2 n_states) """ check_tmatrix(markovian_tmatrix) n_states = len(markovian_tmatrix) # pseudo non-Markovian transition matrix p_nm_tmatrix = np.zeros((2 * n_states, 2 * n_states)) for i in range(2 * n_states): for j in range(2 * n_states): p_nm_tmatrix[i, j] = markovian_tmatrix[i // 2, j // 2] for i in range(n_states): for j in range(n_states): if (i in stateB) or (j in stateB): p_nm_tmatrix[2 * i, 2 * j] = 0.0 if (i in stateA) or (j in stateA): p_nm_tmatrix[2 * i + 1, 2 * j + 1] = 0.0 if (not (j in stateA)) or (i in stateA): p_nm_tmatrix[2 * i + 1, 2 * j] = 0.0 if (not (j in stateB)) or (i in stateB): p_nm_tmatrix[2 * i, 2 * j + 1] = 0.0 check_tmatrix(p_nm_tmatrix) # just in case return p_nm_tmatrix