"""haMSM estimation and analysis"""
from __future__ import division, print_function
__metaclass__ = type
import numpy as np
import tqdm.auto as tqdm
from functools import partialmethod
import concurrent
import multiprocessing as mp
from copy import deepcopy
import mdtraj as md
from rich.live import Live
from rich.table import Table
from rich.console import Group
import ray
from sklearn.utils._openmp_helpers import _openmp_effective_n_threads
# My mixin structure may be a little strange here, because these aren't really mixins that are meant to be
# reused, it's just to break these out.
# Alternatively, I think I could drop the functions straight in those modules without the mixin classes,
# and within my modelWE definition just do (for example)
# class modelWE:
# from ._clustering import *
from ._hamsm import ClusteringMixin
from ._hamsm import DimensionalityReductionMixin
from ._hamsm import PlottingMixin
from ._hamsm import AnalysisMixin
from ._hamsm import DataMixin
from ._hamsm import FluxMatrixMixin
from ._logging import log, DefaultProgress, ProgressBar
[docs]class modelWE(
ClusteringMixin,
DimensionalityReductionMixin,
PlottingMixin,
AnalysisMixin,
DataMixin,
FluxMatrixMixin,
):
"""
History-augmented Markov state model estimation from WE data
Implementation of haMSM model building, particularly for steady-state estimation
from recycling WE sampling with basis (source) and target (sink) states.
Set up for typical west.h5 file structure, with coordinates to be stored in west.h5 /iterations/auxdata/coord and
basis and target definitions in progress coordinate space.
References
--------
Copperman and Zuckerman,
*Accelerated estimation of long-timescale kinetics by combining weighted ensemble simulation with Markov model
microstategs using non-Markovian theory*, **arXiv** (2020).
"""
class BlockValidationError(Exception):
pass
def __init__(self):
"""
Todo
----
- Document all of these attributes
- Reorganize these attributes into some meaningful structure
"""
# TODO: In general, it's not clear if this needs to be strictly 1...
# However, oversubscribing causes very difficult to diagnose problems
# (like hanging during clustering / k-means fitting), and 1 seems to be safe.
if not _openmp_effective_n_threads() == 1:
log.critical(
"Set $OMP_NUM_THREADS=1 for proper msm-we functionality! "
"Other values may cause strange problems such as silent hanging during "
"discretization or ray-parallel steps."
)
self.modelName = None
"""str: Name used for storing files"""
self.n_lag = 0
self.pcoord_ndim = None
"""int: Number of dimensions in the progress coordinate"""
self.pcoord_len = None
"""int: Number of stored progress coordinates for each iteration, per-segment."""
self.tau = None
"""float: Resampling time for weighted ensemble. (Maybe should be int? Units?)"""
self.WEtargetp1_min = None
self.WEtargetp1_max = None
"""float: Progress coordinate value at target state.optimization flow
Used to check if a progress coord is in the target, and to set the RMSD of the target cluster when cleaning the
fluxmatrix."""
self.target_bin_center = None
self._WEtargetp1_bounds = None
self.WEbasisp1_min = None
"""float: Minimum progress coordinate value within basis state.
Used to check if a progress coord is in the basis, and to set the RMSD of the basis cluster when cleaning the
fluxmatrix."""
self.WEbasisp1_max = None
"""float: Maximum progress coordinate value within basis state.
Used to check if a progress coord is in the basis, and to set the RMSD of the basis cluster when cleaning the
fluxmatrix."""
self.basis_bin_center = None
self._WEbasisp1_bounds = None
self._basis_pcoord_bounds = None
self._target_pcoord_bounds = None
self.reference_structure = None
self.reference_coord = None
self.basis_structure = None
# TODO: This is plural, reference_coord is singular. Intentional? Can you have multiple bases but 1 reference?
self.basis_coords = None
self.nAtoms = None
self.coordinates = None
self.ndim = None
self.targetRMSD_centers = None
"""array-like: List of RMSDs corresponding to each cluster."""
self.indBasis = None
self.removed_clusters = []
self.cluster_structures = None
self.cluster_structure_weights = None
"""dict: Mapping of cluster indices to structures in that cluster"""
self.clustering_method = None
self.validation_models = []
self.pcoord_shape_warned = False
self.pre_discretization_model = None
# self.progress_bar = Progress()
[docs] def initialize(
# self, fileSpecifier: str, refPDBfile: str, initPDBfile: str, modelName: str
self,
fileSpecifier: str,
refPDBfile: str,
modelName: str,
basis_pcoord_bounds: list = None,
target_pcoord_bounds: list = None,
dim_reduce_method: str = "none",
tau: float = None,
pcoord_ndim: int = 1,
auxpath: str = "coord",
_suppress_boundary_warning=False,
use_weights_in_clustering=False,
):
"""
Initialize the model-builder.
Parameters
----------
fileSpecifier : list
List of paths to H5 files to analyze.
refPDBfile : string
Path to PDB file that defines topology.
modelName : string
Name to use in output filenames.
basis_pcoord_bounds: list
List of [[pcoord0 lower bound, pcoord1 upper bound], [pcoord1 lower bound, pcoord1 upper bound], ...]
in pcoord-space for the basis state
target_pcoord_bounds: list
List of [[pcoord0 lower bound, pcoord1 upper bound], [pcoord1 lower bound, pcoord1 upper bound], ...]
in pcoord-space for the target state
dim_reduce_method: str, default="none"
Dimensionality reduction method. "pca", "vamp", or "none".
tau: float
Resampling time (i.e. time of 1 WE iteration). Used to map fluxes to physical times.
pcoord_ndim: int, default=1
Defaults to 1. Dimensionality of progress coordinates.
auxpath: str, default="coord"
Augmented coordinates used for MSM construction are stored in west.h5 under auxdata/auxpath
Returns
-------
None
Todo
----
This should probably just be merged into the constructor.
"""
log.debug("Initializing msm_we model")
self.modelName = modelName
if type(fileSpecifier) is list:
fileList = fileSpecifier
elif type(fileSpecifier) is str:
fileList = fileSpecifier.split(" ")
log.warning(
"HDF5 file paths were provided in a string -- this is now deprecated, please pass as a list "
"of paths."
)
self.pcoord_ndim = pcoord_ndim
self.pcoord_len = 2
if basis_pcoord_bounds is None:
if not _suppress_boundary_warning:
log.warning(
"No basis coord bounds provided to initialize(). "
"You can manually set this for now, but that will be deprecated eventually."
)
else:
self.basis_pcoord_bounds = basis_pcoord_bounds
self._target_pcoord_bounds = None
if target_pcoord_bounds is None:
if not _suppress_boundary_warning:
log.warning(
"No target coord bounds provided to initialize(). "
"You can manually set this for now, but that will be deprecated eventually."
)
else:
log.debug("Setting basis pcoord bounds")
# self.WEtargetp1_bounds = target_pcoord_bounds
self.target_pcoord_bounds = target_pcoord_bounds
# self._basis_pcoord_bounds = None
self.auxpath = auxpath
self.fileList = fileList
self.n_data_files = len(fileList)
#####
if tau is None:
log.warning("No tau provided, defaulting to 1.")
tau = 1.0
self.tau = float(tau)
# This is really only used for nAtoms
self.set_topology(refPDBfile)
# self.set_basis(initPDBfile)
if dim_reduce_method is None:
log.warning(
"No dimensionality reduction method provided to initialize(). Defaulting to pca."
"You can manually set this for now, but that will be deprecated eventually."
)
self.dimReduceMethod = "pca"
else:
self.dimReduceMethod = dim_reduce_method
try:
self.load_iter_data(1)
self.load_iter_coordinates0()
self.coordsExist = True
# Nothing is raised here because this might be fine, depending on what you're doing.
except KeyError:
if not _suppress_boundary_warning:
log.warning("Model initialized, but coordinates do not exist yet.")
self.coordsExist = False
self.use_weights_in_clustering = use_weights_in_clustering
log.debug("msm_we model successfully initialized")
@property
def WEbasisp1_bounds(self):
return self.basis_pcoord_bounds
@WEbasisp1_bounds.setter
def WEbasisp1_bounds(self, bounds):
"""
Set the boundaries for the basis state in pcoord1, and also set the bin center based on those.
Parameters
----------
bounds
"""
log.warning(
"WEbasisp1_bounds is a deprecated attribute. "
"Set pcoord boundaries using basis_pcoord_bounds instead"
)
self.basis_pcoord_bounds = bounds
@property
def basis_pcoord_bounds(self):
return self._basis_pcoord_bounds
@basis_pcoord_bounds.setter
def basis_pcoord_bounds(self, bounds):
"""
Set the boundaries for the basis state in an arbitrary dimension pcoord space.
Parameters
----------
bounds: array-like, (pcoord_ndim x 2)
Array of [lower, upper] bounds for each progress coordinate.
"""
bounds = np.array(bounds)
# In case it's a 1D pcoord provided as a simple list [min, max],
# reshape it to be consistent with N-D pcoord boundaries as [[min, max]]
if len(bounds.shape) == 1:
log.warning(
"Please provide 1-D boundaries as a list of lists or 2-D array"
" [[lower bound, upper bound]]. Automatically doing conversion for now."
)
bounds = np.reshape(bounds, (1, 2))
assert bounds.shape == (
self.pcoord_ndim,
2,
), f"Shape of bounds was {bounds.shape}, should've been ({self.pcoord_ndim}, 2)"
assert np.all(
[bound[0] < bound[1] for bound in bounds]
), "A boundary has a lower bound larger than its upper bound"
self._basis_pcoord_bounds = bounds
basis_bin_centers = np.full(self.pcoord_ndim, fill_value=np.nan)
for i, (bound_min, bound_max) in enumerate(bounds):
# If neither of the bin boundaries are infinity, then the bin center is their mean.
if not abs(bound_min) == np.inf and not abs(bound_max) == np.inf:
basis_bin_centers[i] = np.mean([bound_min, bound_max])
# If one of them IS infinity, their "bin center" is the non-infinity one.
else:
# Janky indexing, if p1_max == inf then that's True, and True == 1 so it picks the second element
basis_bin_centers[i] = [bound_min, bound_max][abs(bound_min) == np.inf]
self.basis_bin_centers = basis_bin_centers
@property
def n_lag(self):
return self._n_lag
@n_lag.setter
def n_lag(self, lag):
if not lag == 0:
raise NotImplementedError(
"Only a lag of 1 tau (n_lag = 0) is currently supported"
)
else:
self._n_lag = lag
# TODO: Deprecate this for an N-dimensional version
@property
def WEtargetp1_bounds(self):
return self.target_pcoord_bounds
# TODO: Deprecate this for an N-dimensional version
@WEtargetp1_bounds.setter
def WEtargetp1_bounds(self, bounds):
"""
Set the boundaries for the target state in pcoord1, and also set the bin center based on those.
Parameters
----------
bounds
"""
if None in bounds:
raise Exception("A target boundary has not been correctly provided")
log.warning(
"WEbasisp1_bounds is a deprecated attribute. "
"Set pcoord boundaries using basis_pcoord_bounds instead"
)
self.target_pcoord_bounds = bounds
def progress_disable(self):
"""Disable all progress bars"""
tqdm.tqdm.__init__ = partialmethod(tqdm.tqdm.__init__, disable=True)
def progress_enable(self):
"""Enable all progress bars"""
tqdm.tqdm.__init__ = partialmethod(tqdm.tqdm.__init__, disable=False)
@property
def target_pcoord_bounds(self):
return self._target_pcoord_bounds
@target_pcoord_bounds.setter
def target_pcoord_bounds(self, bounds):
"""
Set the boundaries for the target state in an arbitrary dimension pcoord space.
Parameters
----------
bounds: array-like, (pcoord_ndim x 2)
Array of [lower, upper] bounds for each progress coordinate.
"""
bounds = np.array(bounds)
# In case it's a 1D pcoord provided as a simple list [min, max],
# reshape it to be consistent with N-D pcoord boundaries as [[min, max]]
if len(bounds.shape) == 1:
log.warning(
"Please provide 1-D boundaries as a list of lists or 2-D array"
" [[lower bound, upper bound]]. Automatically doing conversion for now."
)
bounds = np.reshape(bounds, (1, 2))
assert bounds.shape == (
self.pcoord_ndim,
2,
), "Shape of bounds was {bounds.shape}, should've been ({self.pcoord_ndim}, 2)"
assert np.all(
[bound[0] < bound[1] for bound in bounds]
), "A boundary has a lower bound larger than its upper bound"
self._target_pcoord_bounds = bounds
target_bin_centers = np.full(self.pcoord_ndim, fill_value=np.nan)
for i, (bound_min, bound_max) in enumerate(bounds):
# If neither of the bin boundaries are infinity, then the bin center is their mean.
if not abs(bound_min) == np.inf and not abs(bound_max) == np.inf:
target_bin_centers[i] = np.mean([bound_min, bound_max])
# If one of them IS infinity, their "bin center" is the non-infinity one.
else:
# Janky indexing, if p1_max == inf then that's True, and True == 1 so it picks the second element
target_bin_centers[i] = [bound_min, bound_max][abs(bound_min) == np.inf]
self.target_bin_centers = target_bin_centers
@staticmethod
def check_connect_ray():
assert ray.is_initialized(), (
"Ray cluster has not been initialized! "
"Launch from the code calling this with ray.init()."
)
resources = ray.cluster_resources()
try:
log.debug(f"Using Ray cluster with {resources['CPU']} CPUs!")
except KeyError as e:
log.error(f"Total cluster resources were {resources}")
log.error(f"However, available resources are {ray.available_resources()}")
raise e
def is_WE_basis(self, pcoords):
"""
Checks if the input progress coordinates are in the basis state.
Parameters
----------
pcoords : numpy.ndarray(num_segments, num_pcoords)
Array of progress coordinates for each segment.
Returns
-------
True or False : bool
Todo
----
This only checks the 0th progress coordinate
"""
in_basis = np.full_like(pcoords, fill_value=np.nan)
for pcoord_dimension in range(self.pcoord_ndim):
in_basis[:, pcoord_dimension] = np.logical_and(
pcoords[:, pcoord_dimension]
> self.basis_pcoord_bounds[pcoord_dimension, 0],
pcoords[:, pcoord_dimension]
< self.basis_pcoord_bounds[pcoord_dimension, 1],
)
in_basis = np.all(in_basis, axis=1)
return in_basis
def is_WE_target(self, pcoords):
"""
Checks if the input progress coordinates are in the target state.
Parameters
----------
pcoords : numpy.ndarray(num_segments, num_pcoords)
Array of progress coordinates for each segment.
Returns
-------
True or False : bool
Todo
----
This only checks the 0th progress coordinate
This also assumes you need a small pcoord!
"""
in_target = np.full_like(pcoords, fill_value=np.nan)
for pcoord_dimension in range(self.pcoord_ndim):
in_target[:, pcoord_dimension] = np.logical_and(
pcoords[:, pcoord_dimension]
> self.target_pcoord_bounds[pcoord_dimension, 0],
pcoords[:, pcoord_dimension]
< self.target_pcoord_bounds[pcoord_dimension, 1],
)
in_target = np.all(in_target, axis=1)
return in_target
@staticmethod
def do_step(table, row, step, args=[], kwargs={}, in_subprocess=False):
step_text = table.columns[1]._cells[row]
table.columns[0]._cells[row] = "[bold black][ [bold yellow]* [bold black]]"
table.columns[1]._cells[row] = f"[bold black]{step_text}"
try:
if not in_subprocess:
step(*args, **kwargs)
else:
# print(f"Calling {step} with args={args} and kwargs={kwargs}")
with concurrent.futures.ProcessPoolExecutor(
max_workers=1, mp_context=mp.get_context("fork")
) as executor:
executor.submit(step, *args, **kwargs).result()
except Exception as e:
table.columns[0]._cells[row] = "[bold black] [[bold red]x[bold black]]"
table.columns[1]._cells[row] = f"[black]{step_text}"
table.columns[2]._cells[row] = f"{getattr(e, 'message', repr(e))}"
raise e
table.columns[0]._cells[row] = "[bold black] [[bold green]✓[bold black]]"
table.columns[1]._cells[row] = f"[black]{step_text}"
@staticmethod
def set_note(table, row, text):
table.columns[2]._cells[row] = text
@staticmethod
def new_table():
table = Table(title="haMSM Progress")
table.add_column("Status")
table.add_column("Step")
table.add_column("Notes")
steps = [
"Ray initialization",
"Model initialization",
"Loading iterations",
"Loading coordinates",
"Computing dimensionality reduction",
"Clustering",
"Flux matrix",
"Cleaning",
"Transition matrix",
"Steady-state distribution",
"Steady-state target flux",
"Cross-validation",
]
for step in steps:
table.add_row(" [ ]", f"{step}", "")
return table
[docs] def build_analyze_model(
self,
file_paths,
ref_struct,
modelName,
basis_pcoord_bounds,
target_pcoord_bounds,
dimreduce_method,
tau,
n_clusters,
ray_kwargs={},
max_coord_iter=-1,
stratified=True,
streaming=True,
use_ray=True,
fluxmatrix_iters=[1, -1],
fluxmatrix_iters_to_use=None,
cross_validation_groups=2,
cross_validation_blocks=4,
show_live_display=True,
allow_validation_failure=False,
step_kwargs={},
):
"""
One-shot function to build the model and analyze all at once. This provides a convenient interface for running
the blockwise estimation.
This may not be desirable for very long workflows, or workflows still being debugged, where it might make sense
to run the individual steps one by one.
Parameters
----------
file_paths: list
List of paths to H5 files to analyze.
ref_struct: string
Path to PDB file that defines topology.
modelName: string
Name to use in output filenames.
basis_pcoord_bounds: list
List of [[pcoord0 lower bound, pcoord1 upper bound], [pcoord1 lower bound, pcoord1 upper bound], ...]
in pcoord-space for the basis state
target_pcoord_bounds: list
List of [[pcoord0 lower bound, pcoord1 upper bound], [pcoord1 lower bound, pcoord1 upper bound], ...]
in pcoord-space for the target state
dimreduce_method: str
Dimensionality reduction method. "pca", "vamp", or "none".
tau: float
Resampling time (i.e. time of 1 WE iteration). Used to map fluxes to physical times.
n_clusters: int
Number of clusters to use when clustering. This is clusters per bin for stratified, or total clusters for
aggregate.
ray_kwargs: dict
Keyword arguments passed to ray.init(). Useful for specifying num_cpus. You could also use this to connect
to an existing Ray cluster.
max_coord_iter: int, optional (Default = model.maxIter, so all)
Last iteration to obtain coordinates from. Useful for excluding the end of some data.
stratified: bool, optional (Default = True)
Enables stratified clustering, where clustering is performed independently within each WE bin.
streaming: bool, optional (Default = True)
Enables streaming over input data, rather than batch processing. Substantially improves memory efficiency,
at a potential small performance hit.
use_ray: bool, optional (Default = True)
Enable parallelization, using Ray. This provides substantial speedup in discretization and fluxmatrix
calculations.
fluxmatrix_iters: list, optional (Default = [1, -1])
List of [first, last] iteration to use when calculating fluxmatrix. Defaults to using all iterations.
fluxmatrix_iters_to_use: list, optional (Default = None)
Specific range of iterations to use, as opposed to bounds like fluxmatrix_iters. Note that either this OR
fluxmatrix_iters
cross_validation_groups: int, optional (Default = 2)
Number of independent models to build when doing cross-validation. Each group contains (blocks / groups)
blocks.
cross_validation_blocks: int, optional (Default = 4)
Number of blocks to split your data into, before building the independent models.
show_live_display
allow_validation_failure: bool, optional (Default = False)
If True, then raise a warning but don't fail if construction of a cross-validation model fails.
Returns
-------
"""
progress_bar = DefaultProgress()
table = self.new_table()
renderable_group = Group(table, progress_bar)
# Clean up any existing Ray instances
if use_ray:
ray.shutdown()
with Live(
renderable_group, refresh_per_second=10, auto_refresh=show_live_display
) as live:
# If live updating was disabled, write to the table once now. (Doesn't do anything if it was enabled)
live.refresh()
model = self
# # Launch Ray
# TODO: Do I actually want to do this in here? Previously, I assumed the user would set it up.
step_idx = 0
if use_ray:
table.columns[0]._cells[
0
] = "[bold black][ [bold yellow]* [bold black]]"
table.columns[1]._cells[0] = "Ray initialization"
table.columns[2]._cells[0] = ""
log.info(f"Initializing Ray cluster with keywords {ray_kwargs}")
self.do_step(table, step_idx, ray.init, kwargs=ray_kwargs)
self.set_note(
table, step_idx, f"{ray.available_resources()['CPU']} CPUs"
)
log.info(f"Initialized Ray with resources {ray.available_resources()}")
# # Initialize model
step_idx += 1
self.do_step(
table,
step_idx,
step=model.initialize,
kwargs={
"fileSpecifier": file_paths,
"refPDBfile": ref_struct,
"modelName": modelName,
"basis_pcoord_bounds": basis_pcoord_bounds,
"target_pcoord_bounds": target_pcoord_bounds,
"dim_reduce_method": dimreduce_method,
"tau": tau,
**step_kwargs.get("initialize", {}),
},
)
self.set_note(table, step_idx, "")
# # Get number of iterations
step_idx += 1
self.do_step(
table,
step_idx,
step=model.get_iterations,
)
self.set_note(table, step_idx, f"{model.maxIter} iterations exist")
# # Load coordinates
step_idx += 1
_max_coord_iter = [max_coord_iter, model.maxIter][max_coord_iter == -1]
self.do_step(
table,
step_idx,
step=model.get_coordSet,
args=[_max_coord_iter],
kwargs={
"progress_bar": progress_bar,
},
)
self.set_note(
table, step_idx, f"Got coords for {_max_coord_iter} iterations"
)
# # Dimensionality reduction
step_idx += 1
self.set_note(table, step_idx, f"Method: {model.dimReduceMethod}")
self.do_step(
table,
step_idx,
step=model.dimReduce,
kwargs={
"progress_bar": progress_bar,
**step_kwargs.get("dimReduce", {}),
},
)
# # Clustering
step_idx += 1
self.do_step(
table,
step_idx,
step=model.cluster_coordinates,
kwargs={
"n_clusters": n_clusters,
"streaming": streaming,
"use_ray": use_ray,
"stratified": stratified,
"store_validation_model": cross_validation_groups > 0,
"progress_bar": progress_bar,
**step_kwargs.get("clustering", {}),
},
)
# # Flux matrix
step_idx += 1
_fluxmatrix_iters = fluxmatrix_iters
if fluxmatrix_iters[1] == -1:
_fluxmatrix_iters[1] = model.maxIter
self.do_step(
table,
step_idx,
step=model.get_fluxMatrix,
kwargs={
"n_lag": 0,
"first_iter": _fluxmatrix_iters[0],
"last_iter": _fluxmatrix_iters[1],
"iters_to_use": fluxmatrix_iters_to_use,
"use_ray": use_ray,
"progress_bar": progress_bar,
**step_kwargs.get("fluxmatrix", {}),
},
)
self.set_note(
table,
step_idx,
f"Fluxmatrix built from iters "
f"{[f'{_fluxmatrix_iters[0]} - {_fluxmatrix_iters[1]}', fluxmatrix_iters_to_use][fluxmatrix_iters_to_use is not None]}",
)
# # Cleaning
step_idx += 1
original_clusters = model.fluxMatrixRaw.shape[0]
self.do_step(
table,
step_idx,
step=model.organize_fluxMatrix,
kwargs={
"use_ray": use_ray,
"progress_bar": progress_bar,
**step_kwargs.get("organize", {}),
},
)
final_clusters = model.fluxMatrix.shape[0]
self.set_note(
table,
step_idx,
f"{original_clusters} clusters cleaned to {final_clusters}",
)
# # Transition matrix
step_idx += 1
self.do_step(table, step_idx, step=model.get_Tmatrix)
# # Steady state
step_idx += 1
self.do_step(table, step_idx, step=model.get_steady_state)
# # Steady-state flux
step_idx += 1
self.do_step(table, step_idx, step=model.get_steady_state_target_flux)
self.set_note(table, step_idx, f"Target flux: {model.JtargetSS:.2e}")
# # Cross-validation
# TODO
# use `post_cluster_model`, which is the discretized model right before the fluxmatrix was calculated
# for now just do something useless w/ the post_cluster_model so linter doesn't complain
step_idx += 1
if cross_validation_groups > 0:
try:
self.do_step(
table,
step_idx,
step=model.do_block_validation,
kwargs={
"cross_validation_groups": cross_validation_groups,
"cross_validation_blocks": cross_validation_blocks,
"use_ray": use_ray,
"progress_bar": progress_bar,
**step_kwargs.get("block_validation", {}),
},
)
except Exception as e:
log.error(e)
if not allow_validation_failure:
raise e
# TODO: Print for the one that does pass
self.set_note(
table, step_idx, "At least one validation model failed"
)
else:
flux_text = ""
for group, _validation_model in enumerate(self.validation_models):
flux_text += (
f"Group {group} flux: {_validation_model.JtargetSS:.2e}\n"
)
self.set_note(table, step_idx, flux_text)
# If live updating was disabled, write to the table once now. (Doesn't do anything if it was enabled)
live.refresh()
[docs] def do_block_validation(
self,
cross_validation_groups,
cross_validation_blocks,
use_ray=True,
progress_bar=None,
):
"""
One way to estimate the uncertainty of your model is to split your data into blocks, compute models over
groups of the blocks, and assess consistency between the groups.
The procedure here chops your data up into uniform blocks, by iteration. For example, with 100 iterations and 4
blocks, the blocks consist of iterations `[0-25), [25-50), [50-75), [75-100)`.
If the above blocks were assigned to 2 groups, the groups would consist of iterations `( [0-25), [50-75) )` and
`( [25-50), [75-100) )`
Parameters
----------
cross_validation_groups: int
Number of groups to assign blocks over
cross_validation_blocks: int
Number of blocks to split data into
"""
assert (
hasattr(self, "post_cluster_model") and self.post_cluster_model is not None
), (
"Perform clustering with cluster_coordinates() before attempting"
"block validation -- self.post_cluster_model is not set."
)
# TODO: Best way to move around post_cluster_model? Copying it will get massive.
# TODO: copying AGAIN seems suboptimal...
validation_models = [
deepcopy(self.post_cluster_model) for _ in range(cross_validation_groups)
]
# Get the number of iterations in each block
iters_per_block = self.post_cluster_model.maxIter // cross_validation_blocks
block_iterations = [
[start_iter, start_iter + iters_per_block]
for start_iter in range(1, self.post_cluster_model.maxIter, iters_per_block)
]
# Otherwise, this may be maxIters + 1
block_iterations[-1][-1] = block_iterations[-1][-1] - 1
# Get the iterations corresponding to each group
group_blocks = [
range(
start_idx,
cross_validation_blocks,
cross_validation_groups,
)
for start_idx in range(cross_validation_groups)
]
validation_iterations = []
with ProgressBar(progress_bar) as progress_bar:
task = progress_bar.add_task(description="Block validation", total=2)
for group in range(cross_validation_groups):
group_iterations = []
for block in group_blocks[group]:
group_iterations.extend(range(*block_iterations[block]))
validation_iterations.append(group_iterations)
# You're looking at this massive try block and judging me -- but don't worry.
# The purpose of this is just to catch ANY error, and preface it with an explicit heads-up that it's coming
# from the block validation. This is useful because errors may crop up only in the block-validation, and it
# should be clear at a glance that it's not from the main model building, but only when the data is split up.
try:
log.info(
f"Beginning analysis of cross-validation group {group + 1}/{cross_validation_groups}."
)
_model = validation_models[group]
# Get the flux matrix
_model.get_fluxMatrix(
0,
iters_to_use=validation_iterations[group],
use_ray=use_ray,
progress_bar=progress_bar,
)
# Clean it
_model.organize_fluxMatrix(
use_ray=use_ray, progress_bar=progress_bar
)
# Get tmatrix
_model.get_Tmatrix()
# Get steady-state
_model.get_steady_state()
# Get target flux
_model.get_steady_state_target_flux()
# Get FPT distribution?
pass
except Exception as e:
log.error("Error during block validation!")
log.exception(e)
# TODO: Would be nice to gracefully handle this and move on to the next validation group.
# However, validation models are used in a number of places, and leaving a model with uninitialized
# parameters will cause problems there.
# Maybe a solution is to only populate self.validation_models with successfully generated ones, though
# make sure having the length possibly change there is handled well.
raise modelWE.BlockValidationError(e)
progress_bar.advance(task, 1)
# Store the validation models, in case you want to analyze them.
self.validation_iterations = validation_iterations
self.validation_models = validation_models
def set_topology(self, topology):
"""
Updates internal state with a new topology.
Parameters
----------
topology : str, md.Trajectory, dict
Path to a file containing the PDB with the topology,
an mdtraj Trajectory object describing the new basis structure,
or a dictionary with keys
'basis_coords': the coordinates of the basis structure and
'nAtoms': the number of features in the coordinates
Returns
-------
None
"""
if type(topology) is str:
log.debug(
"Input reference topology was provided as a path, trying to load with mdtraj"
)
if topology[-3:] == "dat":
self.reference_coord = np.loadtxt(topology)
self.nAtoms = 1
self.coord_ndim = 3
return
elif topology[-6:] == "prmtop":
struct = md.load_prmtop(topology)
self.reference_structure = struct
self.nAtoms = struct.n_atoms
self.coord_ndim = 3
return
elif not topology[-3:] == "pdb":
log.critical(
"Topology is not a recognized type (PDB)! Proceeding, but no guarantees."
)
struct = md.load(topology)
self.reference_structure = struct
self.reference_coord = np.squeeze(struct._xyz)
self.nAtoms = struct.topology.n_atoms
self.coord_ndim = 3
return
elif type(topology) in [md.Trajectory, md.Topology]:
log.debug(
"Input reference topology was provided as an mdtraj structure, loading that"
)
struct = topology
self.reference_structure = struct
self.reference_coord = np.squeeze(struct._xyz)
self.nAtoms = struct.topology.n_atoms
self.coord_ndim = 3
elif type(topology) == dict:
self.reference_coord = topology["coords"]
self.nAtoms = topology["nAtoms"]
self.coord_ndim = topology["coord_ndim"]
else:
raise NotImplementedError("Unsupported topology")
def set_basis(self, basis):
"""
Updates internal state with a new basis.
Parameters
----------
basis : str, mdtraj.Trajectory, or dict
Path to a file containing the PDB with the new basis state,
an mdtraj Trajectory object describing the new basis structure,
or a dictionary with key 'basis_coords': the coordinates of the basis structure
Returns
-------
None
"""
if type(basis) is str:
log.debug(
"Input basis state topology was provided as a path, trying to load with mdtraj"
)
if basis[-3:] == "dat":
self.basis_coords = np.loadtxt(basis)
elif basis[-3:] == "pdb":
struct = md.load(basis)
# self.basis_structure = struct
self.basis_coords = np.squeeze(struct._xyz)
else:
log.critical(
"Basis is not a recognized type! Proceeding, but no guarantees."
)
# raise NotImplementedError("Basis coordinates are not a recognized filetype")
elif type(basis) in [md.Trajectory, md.Topology]:
log.debug(
"Input reference topology was provided as an mdtraj structure, loading that"
)
struct = basis
# self.basis_structure = struct
self.basis_coords = np.squeeze(struct._xyz)
elif type(basis) == dict:
self.basis_coords = basis["coords"]
else:
raise NotImplementedError("Unsupported topology")