Source code for msm_we.fpt

"""
First-passage time (FPT) calculations from trajectories or matrices

Adapted from the original NMpathAnalysis package,
https://github.com/ZuckermanLab/NMpathAnalysis
"""
import numpy as np
from copy import deepcopy
import tqdm.auto as tqdm

import msm_we.utils as utils
from msm_we.utils import Interval


class DirectFPT:
    @classmethod
    def mean_fpts(
        cls,
        trajectories,
        stateA=None,
        stateB=None,
        discrete=True,
        n_variables=None,
        lag_time=1,
    ):
        """Empirical mean first passage times (MFPTs) calculation (no model
        involved) by tracing the trajectories. Notice the difference between
        notation between FPTs and MFPTs.

        Parameters:
        -----------
        trajectories:   List of trajectories [traj1, traj2, traj4], each trajectory
                        can be a one dimensional array, e.g.,
                            [[1,2,1, ...], [0,1,1, ...], ... ]
                        or a multidimensional array (matrix) where each column
                        represents the evolution of a variable.

                        Important: If a single trajectory is given as argument it
                        also has to be inside a list (e.g. [traj1])

        stateA, stateB: List of integers
                        If the trajectories are discrete (discrete = True), both
                        states are a list of indexes. However, if the trajectories
                        are not discrete, the states are "interval" objects
                        (see Interval class).

        lag_time:       integer
                        Lag time used, the trajectory is "observed" every lag_time
                        time steps

        discrete:       boolean
                        False when the trajectories are are not discrete. In that
                        case the macrostates stateA and stateB are considered
                        interval objects.

        n_variables:    integer
                        If the trajectory is space continuous,the number of
                        variables/dimensions is needed. In this case every
                        trajectory inside "trajectories" should have the same
                        number of dimensions.

        Returns
        -------
        A dictionary with the keys: 'mfptAB', 'std_err_mfptAB', 'mfptBA',
        'std_err_mfptBA' and the corresponding values. Those values are already
        multiplied by the lag_time used (not the physical units).
        """

        passage_timesAB, passage_timesBA, tb_values = cls.fpts(
            trajectories, stateA, stateB, discrete, n_variables, lag_time
        )
        n_AB = len(passage_timesAB)
        n_BA = len(passage_timesBA)

        if sum(passage_timesAB):
            mfptAB = float(sum(passage_timesAB)) / n_AB
            std_err_mfptAB = np.std(passage_timesAB) / np.sqrt(n_AB)
        else:
            print("WARNING: No A->B events observed")
            mfptAB = "NaN"
            std_err_mfptAB = "NaN"

        if sum(passage_timesBA):
            mfptBA = float(sum(passage_timesBA)) / n_BA
            std_err_mfptBA = np.std(passage_timesBA) / np.sqrt(n_BA)
        else:
            print("WARNING: No B->A events observed")
            mfptBA = "NaN"
            std_err_mfptBA = "NaN"

        kinetics = {
            "mfptAB": mfptAB,
            "std_err_mfptAB": std_err_mfptAB,
            "mfptBA": mfptBA,
            "std_err_mfptBA": std_err_mfptBA,
        }

        print("Number of A->B/B->A  events: {}/{}".format(n_AB, n_BA))

        return kinetics

    @classmethod
    def fpts(
        cls,
        trajectories,
        stateA=None,
        stateB=None,
        discrete=True,
        n_variables=None,
        lag_time=1,
    ):
        """Empirical first passage times (FPTs) calculation (no model involved)
        by tracing the trajectories. IMPORTANT: Notice the difference in notation
        between FPTs and MFPTs.

        Parameters:
        -----------
        trajectories:   List of trajectories [traj1, traj2, traj4], each trajectory
                        can be a one dimensional array, e.g.,
                            [[1,2,1, ...], [0,1,1, ...], ... ]
                        or a mutidimensional array (matrix) where each column
                        represents the evolution of a variable.

                        Important: If a single trajectory is given as argument it
                        also has to be inside a list (e.g. [traj1])

        stateA, stateB: List of integers
                        If the trajectories are discrete (discrete = True), both
                        states are a list of indexes. However, if the trajectories
                        are not discrete, the states are "interval" objects
                        (see Interval class).

        lag_time:       integer
                        Lag time used, the trajectory is "observed" every lag_time
                        time steps

        discrete:       boolean
                        False when the trajectories are are not discrete. In that
                        case the macrostates stateA and stateB are considered
                        interval objects.

        n_variables:    integer
                        If the trajectory is space continuous,the number of
                        variables/dimensions is needed. In this case every
                        trajectory inside "trajectories" should have the same
                        number of dimensions.

        Returns
        -------
        A tuple of two 1D-ndarray (array1, array2), the first one contains the
        observed first passage times A->B and the second one the FPTs B->A. Those
        values are already multiplied by the lag_time used (not the physical units)
        """

        if (stateA is None) or (stateB is None):
            raise Exception(
                "The final and initial states have " "to be defined to compute the MFPT"
            )

        if not discrete:
            """
            The states are considered/transformed-to intervals if the Ensemble
            is a set of continuous trajectories
            """
            if n_variables is None:
                raise Exception(
                    "In continuous trajectories the number of " "variables is needed"
                )

            stateA = Interval(stateA, n_variables)
            stateB = Interval(stateB, n_variables)

        passage_timesAB = []
        passage_timesBA = []
        tb_values = []

        for traj in trajectories:
            previous_color = "Unknown"
            tb_counter = 0  # event duration counter
            fpt_counter = 0  # first passage time counter
            for i in range(0, len(traj), lag_time):
                snapshot = traj[i]
                tb_counter += 1
                # state and color determination
                if snapshot in stateA:
                    color = "A"
                elif snapshot in stateB:
                    color = "B"
                else:
                    color = previous_color
                    tb_counter += 1

                # passage times
                if (color == "A") or (color == "B"):
                    fpt_counter += 1

                if previous_color == "A" and color == "B":
                    tb_values.append(tb_counter)
                    passage_timesAB.append(fpt_counter)
                    fpt_counter = 0
                elif previous_color == "B" and color == "A":
                    tb_values.append(tb_counter)
                    passage_timesBA.append(fpt_counter)
                    fpt_counter = 0
                elif previous_color == "Unknown" and (color == "A" or color == "B"):
                    fpt_counter = 0

                if (snapshot in stateA) or (snapshot in stateB):
                    tb_counter = 0

                previous_color = color

        passage_timesAB = np.array(passage_timesAB) * lag_time
        passage_timesBA = np.array(passage_timesBA) * lag_time

        return passage_timesAB, passage_timesBA, tb_values


[docs]class MatrixFPT: """Define a base class for calculating FPTs using transition matrix"""
[docs] @classmethod def mean_fpts(cls, tmatrix, stateA, stateB, lag_time=1): """Calculting mean-first passave time for at transition matrix Derived class will implement this function differently. """ pass
[docs] @classmethod def directional_mfpt( cls, transition_matrix, stateA, stateB, ini_probs=None, lag_time=1 ): """Computes the mean-first passage time in a single direction using a recursive procedure This method is useful when there is no B->A ensemble but only A->B transitions, for instance when B is absorbing. Parameters ---------- transition_matrix: Numpy 2D array stateA, stateB: List of integers Both states are a list of indexes. ini_probs: List of float, default is None initial probabilities in stateA lag_time: integer Lag time used, the trajectory is "observed" every lag_time time steps Returns ------- mean-first passage time from A->B """ lenA = len(stateA) lenB = len(stateB) if ini_probs is None: ini_probs = [1.0 / lenA for i in range(lenA)] t_matrix = deepcopy(transition_matrix) ini_state = list(stateA) f_state = sorted(list(stateB)) assert lenA == len(ini_probs) for i in range(lenB - 1, -1, -1): t_matrix = np.delete(t_matrix, f_state[i], axis=1) t_matrix = np.delete(t_matrix, f_state[i], axis=0) for j in range(lenA): if f_state[i] < ini_state[j]: ini_state[j] = ini_state[j] - 1 new_size = len(t_matrix) mfptAB = 0.0 m = np.zeros(new_size) idty = np.identity(new_size) c = np.array([1.0 for i in range(new_size)]) m = np.dot(np.linalg.inv(idty - t_matrix), c) for i in range(len(ini_state)): k = ini_state[i] mfptAB += ini_probs[i] * m[k] mfptAB = mfptAB / sum(ini_probs) return mfptAB * lag_time
[docs] @classmethod def mfpts_to_target_microstate(cls, transition_matrix, target, lag_time=1): """Computes all the mean-first passage to a target microstate (k) Returns a list where the i-element is mfpt(i->k). This function is useful to compute the mfpt matrix. Parameters ---------- transition_matrix: Numpy 2D array target: Integer number that specifies the index of the state. The indexes should be consistent with the transition matrix and python (i.e. starting from 0) lag_time: Integer Lag time used, the trajectory is "observed" every lag_time time steps Returns ------- a list where the i-element is mfpt(i->k). This function is useful to compute the mfpt matrix. """ t_matrix = deepcopy(transition_matrix) t_matrix = np.delete(t_matrix, target, axis=1) t_matrix = np.delete(t_matrix, target, axis=0) new_size = len(t_matrix) m = np.zeros(new_size) idty = np.identity(new_size) c = np.array([1.0 for i in range(new_size)]) m = np.dot(np.linalg.inv(idty - t_matrix), c) m = np.insert(m, target, 0.0) return m * lag_time
[docs] @classmethod def mfpts_matrix(cls, transition_matrix, lag_time=1): """Calculate MFPT matrix, i.e., the matrix where the ij-element is MFPT(i->j) Parameters ---------- transition_matrix: Numpy 2D array lag_time: Integer Lag time used, the trajectory is "observed" every lag_time time steps Returns ------- mean-first passage time matrix with ij-element of MFPT(i->j) """ size = len(transition_matrix) temp_values = [] for i in range(size): temp_values.append( cls.mfpts_to_target_microstate(transition_matrix, i, lag_time) ) mfpt_m = np.array(temp_values).T # to nummpy array and transposed return mfpt_m
[docs] @classmethod def min_commute_time(cls, matrix_of_mfpts): """Calculate minimum commuting time (round trip time) between all pairs of microstates from the matrix of mfpts. It also returns the indexes of the pair of microstates involved. Parameters ---------- matrix_of_mfpts: Numpy 2D array matrix of MFPTs with ij-element of MFPT(i->j) Returns ------- Minimum commuting time (round trip time) between all pairs of microstates and the indexes of the pair of microstates involved. """ matrix_of_mfpts = np.array(matrix_of_mfpts) n_states = len(matrix_of_mfpts) assert n_states == len(matrix_of_mfpts[0]) and n_states >= 2 # Initial values, arbitrary choice index_i = 0 index_j = 1 commute_times = matrix_of_mfpts + matrix_of_mfpts.T min_ct = commute_times[index_i, index_j] for i in range(n_states): for j in range(i + 1, n_states): if commute_times[i, j] < min_ct: min_ct = commute_times[i, j] index_i = i index_j = j return min_ct, index_i, index_j
[docs] @classmethod def max_commute_time(cls, matrix_of_mfpts): """Calculate maximum commuting time (round trip time) between all pairs of microstates from the matrix of mfpts. It also returns the indexes of the pair of microstates involved. Parameters ---------- matrix_of_mfpts: Numpy 2D array matrix of MFPTs with ij-element of MFPT(i->j) Returns ------- Maximum commuting time (round trip time) between all pairs of microstates and the indexes of the pair of microstates involved. """ matrix_of_mfpts = np.array(matrix_of_mfpts) n_states = len(matrix_of_mfpts) assert n_states == len(matrix_of_mfpts[0]) and n_states >= 2 # Initial values, arbitrary choice index_i = 0 index_j = 1 commute_times = matrix_of_mfpts + matrix_of_mfpts.T max_ct = commute_times[index_i, index_j] for i in range(n_states): for j in range(i + 1, n_states): if commute_times[i, j] > max_ct: max_ct = commute_times[i, j] index_i = i index_j = j return max_ct, index_i, index_j
[docs] @classmethod def fpt_distribution( cls, t_matrix, initial_state, final_state, initial_distrib, min_power=1, max_power=12, max_n_lags=100, lag_time=1, dt=1.0, clean_recycling=False, logscale=False, ): """Calculated distribution of first passage times from transition matrix Parameters ---------- t_matrix: Numpy 2D array initial_state, final_state: List of integer numbers Specifies the indexes of initial and final states. initial_distrib: List of float, default is None initial probabilities for initial states min_power, max_power: Integer The minimum and maximum power when the FPT distribution is shown in logscale such as (10^min_power, 10^max_power)*lag_time*dt. max_n_lags: Integer maximum number of lags when the FPT distribution is shown in linear scale such as (0, max_n_logs)*lag_time*dt. When in logscale, this is number of points to shown in the range of (10^min_power, 10^max_power)*lag_time*dt. lag_time: Integer Lag time used, the trajectory is "observed" every lag_time time steps dt: Float Time step clean_recycling: Bool Cleaning the recycling of steady state simulation if True logscale: Bool Option to use logscale for FPT time in the distribution Returns ------- Distributions of first passage times """ # copy everything since they are going to be modified tmatrix = np.copy(t_matrix) ini_state = list(initial_state) f_state = sorted(list(final_state)) assert len(ini_state) == len(initial_distrib) # Designate target state 0 as the only target state, # and adding all the fluxes into the other target states into state 0. # Namely reassign any flux into any target state into target state 0. tmatrix[:, f_state[0]] = np.sum(tmatrix[:, f_state], axis=1) # Remove all other target states for i in range(len(f_state) - 1, 0, -1): tmatrix = np.delete(tmatrix, f_state[i], axis=1) tmatrix = np.delete(tmatrix, f_state[i], axis=0) # For each initial state with a greater index than the target state we're cleaning. # Decrement the index by 1 to account for the removed, cleaned state. for j in range(len(ini_state)): if f_state[i] < ini_state[j]: ini_state[j] = ini_state[j] - 1 # Clean the recycling if necessary. # Get rid of recycling boundary conditions, otherwise we're getting a CDF if clean_recycling: tmatrix[f_state, :] = 0.0 tmatrix[f_state, f_state] = 0.0 # The new target state is the single state since all other target states have been reassigned to. f_state = f_state[0] new_n_states = len(tmatrix) list_of_pdfs = np.empty((len(ini_state), max_n_lags), dtype=np.float64) prevFmatrix = np.empty_like(tmatrix) # Option to set the list of lag time in logscale since FPT can be a wide distribution in several orders if logscale: lag_list = np.logspace(min_power, max_power, max_n_lags, dtype=int) else: lag_list = np.arange(0, max_n_lags, dtype=int) # for each ini_state calculate the FPT distribution from transition matrix for istateIndex in range(len(ini_state)): prevFmatrix = tmatrix.copy() Fmatrix = np.zeros((new_n_states, new_n_states)) list_of_pdfs[istateIndex, 0] = tmatrix[ini_state[istateIndex], f_state] cls.calc_fmatrix( Fmatrix, tmatrix, prevFmatrix, list_of_pdfs, lag_list, ini_state, istateIndex, f_state, ) # Nomalize the FPT distribution and output sum_ = np.sum(initial_distrib) initial_distrib = np.array(initial_distrib) density = np.sum(initial_distrib[:, None] * list_of_pdfs, axis=0) / sum_ dt2 = lag_time * dt if logscale: # For logscale the dts at different t are different, we need to let FPT(t) # absorb them. Otherwise we have to use dt in variable size to calculate mean # value such as integration of t*dt*FPT(t). dens_list = [[0, 0]] + [[lag_list[0] * dt2, density[0] * lag_list[0] / dt2]] for i in range(1, len(lag_list)): dens_list += [ [ lag_list[i] * dt2, density[i] * (lag_list[i] - lag_list[i - 1]) / dt2, ] ] density_vs_t = np.array(dens_list) else: density_vs_t = np.array( [[0, 0]] + [[(i + 1) * dt2, dens / dt2] for i, dens in zip(lag_list, density)] ) # normalized to 1 density_vs_t[:, 1] /= sum(density_vs_t[:, 1]) return density_vs_t
[docs] @staticmethod def adaptive_fpt_distribution( Tmatrix, initial_states, initial_state_probs, target_states, tau=1, increment=5, fine_increment=1.2, relevant_thresh=1e-4, max_steps=int(1e6), max_time=np.inf, explicit_renormalization=False, verbose=False, ): """ Adaptively computes a first-passage time distribution. Starting at t=tau, compute the probability flowing into the target at t. Then, increment t by multiplying it by the coarse increment. When relevant_thresh probability has entered the target state, step back to the previous coarse state, and swap over to incrementing with the fine increment. This allows you to efficiently sweep log-space. Procedurally, this starts probability in specified `initial_states` according to `initial_state_probs`, and then propagates that probability through the transition matrix. The FPT distribution is measured by tracking new probability entering the target state at each time. Note that absorbing boundary conditions are stripped from the transition matrix -- if this is not done, then the result is like a probability CDF, not a probability distribution. Parameters ---------- Tmatrix: array-like Transition matrix initial_states: array-like of ints List of initial states to start probability in initial_state_probs: array-like Probability distribution across the initial states. target_states: array-like Target states for MFPT. tau increment: float Multiplicative increment for coarse steps fine_increment: float Multiplicative increment for fine steps, once the minimum probability in the target has been reached. relevant_thresh: float Amount of probability that must be in the target before switching to fine increments. max_steps: int Maximum number of steps to run max_time: float Maximum time to run to explicit_renormalization: bool Whether to explicitly renormalize the transition matrix. This should not be necessary -- if it is, there's probably some numerical instability you should be careful of. verbose: bool Produce verbose text output. Returns ------- FPT distribution, probability distribution at each time, last step index, times at which FPT distribution was evaluated """ n_states = len(Tmatrix) all_probabilities = np.full(shape=(max_steps + 1, n_states), fill_value=np.nan) # The initial probability vector is zero except in the origin states, # which have their relative probabilities initial_probability = np.zeros(n_states) initial_probability[initial_states] = initial_state_probs initial_probability /= sum(initial_probability) all_probabilities[0] = initial_probability # Make the target states absorbing non_recycling_matrix = Tmatrix.copy() non_recycling_matrix[target_states, :] = 0.0 for target in target_states: non_recycling_matrix[target, target] = 1.0 # Track the probability that flowed into the target at each time probs = np.zeros(shape=max_steps) probs[0] = 0.0 # At each one of our timesteps, track the amount of flux that entered the target last_step = 1 get_next_step = lambda x: x * increment in_relevant_region = False steps = [1] with tqdm.tqdm(total=1) as pbar: for i in range(max_steps - 1): this_step = int(get_next_step(last_step)) if this_step <= last_step: this_step = int(last_step + 1) matrix_next = np.linalg.matrix_power(non_recycling_matrix, this_step) if explicit_renormalization: matrix_next = matrix_next / np.sum(matrix_next, axis=1) probability = initial_probability @ matrix_next if explicit_renormalization: probability /= sum(probability) # Check if we're just starting to get any probability if ( i > 0 and not in_relevant_region and (sum(probability[target_states]) - sum(probs[: i + 1])) > relevant_thresh ): if verbose: print( f"*** Entered relevant region at step {this_step}. " f"Swapping to fine-grained, and taking a step back to {this_step / increment}." ) # If so, then change our increment to finer resolution # TODO: Would be cool to do something like as the probability increases, # continue scaling down to some minimum increment in_relevant_region = True this_step /= increment steps.append(this_step) all_probabilities[i + 1] = all_probabilities[i] probs[i + 1] = probs[i] get_next_step = lambda x: x * fine_increment if verbose: print( f"Current time is {this_step}, time step will be {get_next_step(this_step)}" ) continue steps.append(this_step) all_probabilities[i + 1] = probability # The amount that flowed INTO the target is the probability that's flowed in since the last t if i == 0: # In the first iteration, all the probability into the target just got there probs[i + 1] = sum(probability[target_states]) else: # After the first, it's the amount that's there now minus the total amount that entered up until now probs[i + 1] = sum(probability[target_states]) - sum(probs[: i + 1]) pbar.update(probs[i + 1]) # Check if we're done (i.e., all our probability has flowed into the target, none left.) if np.isclose(sum(probs), 1): # if np.isclose(probs[i+1], 1): print( f"*** All probability reached the target at time {this_step}" ) break if this_step > max_time: print( "*** Max steps reached, before all probability flowed into target." ) break last_step = this_step print(f"Finished in {i} steps") print( f"By the last time, {sum(probs[:i])} probability has reached the target. (This should be 1!)" ) times = np.array(steps, dtype=float) * float(tau) return probs[: i + 2], all_probabilities[: i + 2], i, times
@classmethod def calc_fmatrix( cls, Fmatrix, tmatrix, prevFmatrix, list_of_pdfs, lag_list, ini_state, istateIndex, f_state, ): # Calculate FPT distribution from a the recursive formula, Eq. 3 in the paper below: # E. Suarez, A. J. Pratt, L. T. Chong, D. M. Zuckerman, Protein Science 26, 67-78 (2016). for time_index, time in enumerate(lag_list): # obtain the new transition matrix from time_index-1 to time_index if time_index == 0: tmatrix_new = np.linalg.matrix_power(tmatrix, time) else: tmatrix_new = np.linalg.matrix_power( tmatrix, time - lag_list[time_index - 1] ) Fmatrix = np.dot(tmatrix_new, prevFmatrix - np.diag(np.diag(prevFmatrix))) list_of_pdfs[istateIndex, time_index] = Fmatrix[ ini_state[istateIndex], f_state ] prevFmatrix = Fmatrix
[docs]class MarkovFPT(MatrixFPT): """Derived a class for calculating FPTs using Markovian transition matrix"""
[docs] @classmethod def mean_fpts(cls, markov_tmatrix, stateA, stateB, lag_time=1): """Computes mean first passage times using Markovian transition matrix in both directions A->B and B->A from a markov model. The MFPTs computed in this way are directly comparable with the values obtained by a long back and forth simulation between the target states. Parameters ---------- markov_matrix: Numpy 2D array Markovian transition matrix stateA, stateB: List of integers Both states are a list of indexes. lag_time: integer Lag time used, the trajectory is "observed" every lag_time time steps Returns ------- mean-first passage times from A->B and B->A """ auxiliar_matrix = utils.pseudo_nm_tmatrix(markov_tmatrix, stateA, stateB) # Is going to return a Markovian mfpt since the auxiliar # matrix was build from a pure Markovian matrix return NonMarkovFPT.mean_fpts(auxiliar_matrix, stateA, stateB, lag_time)
[docs] @classmethod def markov_commute_time(cls, transition_matrix, stateA, stateB, lag_time=1): """Computing commute time for Markovian Model Parameters ---------- transition_matrix: Numpy 2D array Markovian transition matrix stateA, stateB: List of integers Both states are a list of indexes. lag_time: integer Lag time used, the trajectory is "observed" every lag_time time steps Returns ------- Commute time from mean-first passage times """ mfpts = cls.mean_fpts(transition_matrix, stateA, stateB, lag_time) return mfpts["mfptAB"] + mfpts["mfptBA"]
[docs]class NonMarkovFPT(MatrixFPT): """Derived a class for calculating FPTs using Non Markov transition matrix"""
[docs] @classmethod def mean_fpts(cls, nm_transition_matrix, stateA, stateB, lag_time=1): """Computes the mean first passage times from a non-markovian model in both directions of A->B and B->A. The shape of the transition matrix should be (2*n_states, 2*n_states). Parameters ---------- nm_transition_matrix: Numpy 2D array Non-Markovian transition matrix stateA, stateB: List of integers Both states are a list of indexes. lag_time: integer Lag time used, the trajectory is "observed" every lag_time time steps Returns ------- mean-first passage times from A->B and B->A """ utils.check_tmatrix(nm_transition_matrix) labeled_pops = utils.pops_from_tmatrix(nm_transition_matrix) n_states = len(labeled_pops) // 2 fluxAB = 0 fluxBA = 0 for i in range(0, 2 * n_states, 2): for j in range(2 * n_states): if int(j / 2) in stateB: fluxAB += labeled_pops[i] * nm_transition_matrix[i, j] for i in range(1, 2 * n_states + 1, 2): for j in range(2 * n_states): if int(j / 2) in stateA: fluxBA += labeled_pops[i] * nm_transition_matrix[i, j] pop_colorA = 0.0 pop_colorB = 0.0 for i in range(0, 2 * n_states, 2): pop_colorA += labeled_pops[i] for i in range(1, 2 * n_states + 1, 2): pop_colorB += labeled_pops[i] if fluxAB == 0: mfptAB = float("inf") else: mfptAB = pop_colorA / fluxAB if fluxBA == 0: mfptBA = float("inf") else: mfptBA = pop_colorB / fluxBA mfptAB *= lag_time mfptBA *= lag_time return dict(mfptAB=mfptAB, mfptBA=mfptBA)