#!/usr/bin/env python
"""
COPYRIGHT AND LICENSE
@author John D. Chodera <jchodera@gmail.com>
All code in this repository is released under the MIT License.
This program is free software: you can redistribute it and/or modify it under
the terms of the MIT License.
This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the MIT License for more details.
You should have received a copy of the MIT License along with this program.
"""
# =============================================================================
# MODULE DOCSTRING
# =============================================================================
"""Markov chain Monte Carlo simulation framework.
This module provides a framework for equilibrium sampling from a given
thermodynamic state of a biomolecule using a Markov chain Monte Carlo scheme.
It currently offer supports for
* Langevin dynamics (assumed to be free of integration error; use at your own risk]),
* hybrid Monte Carlo,
* generalized hybrid Monte Carlo, and
* Monte Carlo barostat moves,
which can be combined through the SequenceMove and WeightedMove classes.
By default, the MCMCMoves use the fastest OpenMM platform available and a
shared global ContextCache that minimizes the number of OpenMM. The examples
below show how to configure these aspects.
References
----------
Jun S. Liu. Monte Carlo Strategies in Scientific Computing. Springer, 2008.
Examples
--------
>>> from simtk import unit
>>> from openmmtools import testsystems, cache
>>> from openmmtools.states import ThermodynamicState, SamplerState
Create the initial state (thermodynamic and microscopic) for an alanine
dipeptide system in vacuum.
>>> test = testsystems.AlanineDipeptideVacuum()
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> sampler_state = SamplerState(positions=test.positions)
Create an MCMC move to perform at every iteration of the simulation, and
initialize a sampler instance.
>>> ghmc_move = GHMCMove(timestep=1.0*unit.femtosecond, n_steps=50)
>>> langevin_move = LangevinDynamicsMove(n_steps=10)
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=ghmc_move)
You can combine them to form a sequence of moves
>>> sequence_move = SequenceMove([ghmc_move, langevin_move])
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=sequence_move)
or create a move that selects one of them at random with given probability
at each iteration.
>>> weighted_move = WeightedMove([(ghmc_move, 0.5), (langevin_move, 0.5)])
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=weighted_move)
By default the MCMCMove use a global ContextCache that creates Context on the
fastest available OpenMM platform. You can configure the default platform to use
before starting the simulation
>>> reference_platform = openmm.Platform.getPlatformByName('Reference')
>>> cache.global_context_cache.platform = reference_platform
>>> cache.global_context_cache.time_to_live = 10 # number of read/write operations
Minimize and run the simulation for few iterations.
>>> sampler.minimize()
>>> sampler.run(n_iterations=2)
If you don't want to use a global cache, you can create local ones.
>>> local_cache1 = cache.ContextCache(capacity=5, time_to_live=50)
>>> local_cache2 = cache.ContextCache(platform=reference_platform, capacity=1)
>>> sequence_move = SequenceMove([HMCMove(), LangevinDynamicsMove()],
... context_cache=local_cache1)
>>> ghmc_move = GHMCMove(context_cache=local_cache2)
If you don't want to cache Context at all but create one every time, you can use
the DummyCache.
>>> dummy_cache = cache.DummyContextCache(platform=reference_platform)
>>> ghmc_move = GHMCMove(context_cache=dummy_cache)
"""
# =============================================================================
# GLOBAL IMPORTS
# =============================================================================
import os
import abc
import copy
import logging
import numpy as np
from simtk import openmm, unit
from openmmtools import integrators, cache, utils
from openmmtools.utils import SubhookedABCMeta, Timer
logger = logging.getLogger(__name__)
# =============================================================================
# MODULE CONSTANTS
# =============================================================================
_RANDOM_SEED_MAX = np.iinfo(np.int32).max # maximum random number seed value
# =============================================================================
# MARKOV CHAIN MOVE ABSTRACTION
# =============================================================================
[docs]class MCMCMove(SubhookedABCMeta):
"""Markov chain Monte Carlo (MCMC) move abstraction.
To create a new MCMCMove class compatible with this framework, you
will have to implement this abstraction. The instance can keep internal
statistics such as number of attempted moves and acceptance rates.
"""
[docs] @abc.abstractmethod
def apply(self, thermodynamic_state, sampler_state):
"""Apply the MCMC move.
Depending on the implementation, this can alter the thermodynamic
state and/or the sampler state.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The initial thermodynamic state before applying the move. This
may be modified depending on the implementation.
sampler_state : openmmtools.states.SamplerState
The initial sampler state before applying the move. This may
be modified depending on the implementation.
"""
pass
# =============================================================================
# MARKOV CHAIN MONTE CARLO SAMPLER
# =============================================================================
[docs]class MCMCSampler(object):
"""Basic Markov chain Monte Carlo sampler.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
Initial thermodynamic state.
sampler_state : openmmtools.states.SamplerState
Initial sampler state.
move_set : container of MarkovChainMonteCarloMove objects
Moves to attempt during MCMC run. If list or tuple, will run all moves each
iteration in specified sequence (e.g. [move1, move2, move3]). If dict, will
use specified unnormalized weights (e.g. { move1 : 0.3, move2 : 0.5, move3, 0.9 })
Attributes
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
Current thermodynamic state.
sampler_state : openmmtools.states.SamplerState
Current sampler state.
move_set : container of MarkovChainMonteCarloMove objects
Moves to attempt during MCMC run. If list or tuple, will run all moves each
iteration in specified sequence (e.g. [move1, move2, move3]). If dict, will
use specified unnormalized weights (e.g. { move1 : 0.3, move2 : 0.5, move3, 0.9 })
Examples
--------
>>> import numpy as np
>>> from simtk import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
Create and run an alanine dipeptide simulation with a weighted move.
>>> test = testsystems.AlanineDipeptideVacuum()
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> sampler_state = SamplerState(positions=test.positions)
>>> # Create a move set specifying probabilities fo each type of move.
>>> move = WeightedMove([(HMCMove(n_steps=10), 0.5), (LangevinDynamicsMove(n_steps=10), 0.5)])
>>> # Create an MCMC sampler instance and run 10 iterations of the simulation.
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=move)
>>> sampler.run(n_iterations=2)
>>> np.allclose(sampler.sampler_state.positions, test.positions)
False
NPT ensemble simulation of a Lennard Jones fluid with a sequence of moves.
>>> test = testsystems.LennardJonesFluid(nparticles=200)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin,
... pressure=1*unit.atmospheres)
>>> sampler_state = SamplerState(positions=test.positions)
>>> # Create a move set that includes a Monte Carlo barostat move.
>>> move = SequenceMove([GHMCMove(n_steps=50), MonteCarloBarostatMove(n_attempts=5)])
>>> # Create an MCMC sampler instance and run 5 iterations of the simulation.
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=move)
>>> sampler.run(n_iterations=2)
>>> np.allclose(sampler.sampler_state.positions, test.positions)
False
"""
[docs] def __init__(self, thermodynamic_state, sampler_state, move):
# Make a deep copy of the state so that initial state is unchanged.
self.thermodynamic_state = copy.deepcopy(thermodynamic_state)
self.sampler_state = copy.deepcopy(sampler_state)
self.move = move
[docs] def run(self, n_iterations=1):
"""
Run the sampler for a specified number of iterations.
Parameters
----------
n_iterations : int
Number of iterations of the sampler to run.
"""
# Apply move for n_iterations.
for iteration in range(n_iterations):
self.move.apply(self.thermodynamic_state, self.sampler_state)
[docs] def minimize(self, tolerance=1.0*unit.kilocalories_per_mole/unit.angstroms,
max_iterations=100, context_cache=None):
"""Minimize the current configuration.
Parameters
----------
tolerance : simtk.unit.Quantity, optional
Tolerance to use for minimization termination criterion (units of
energy/(mole*distance), default is 1*kilocalories_per_mole/angstroms).
max_iterations : int, optional
Maximum number of iterations to use for minimization. If 0, the minimization
will continue until convergence (default is 100).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
"""
if context_cache is None:
context_cache = cache.global_context_cache
timer = Timer()
# Use LocalEnergyMinimizer
timer.start("Context request")
integrator = openmm.VerletIntegrator(1.0*unit.femtosecond)
context, integrator = context_cache.get_context(self.thermodynamic_state, integrator)
self.sampler_state.apply_to_context(context)
logger.debug("LocalEnergyMinimizer: platform is %s" % context.getPlatform().getName())
logger.debug("Minimizing with tolerance %s and %d max. iterations." % (tolerance, max_iterations))
timer.stop("Context request")
timer.start("LocalEnergyMinimizer minimize")
openmm.LocalEnergyMinimizer.minimize(context, tolerance, max_iterations)
timer.stop("LocalEnergyMinimizer minimize")
# Retrieve data.
self.sampler_state.update_from_context(context)
#timer.report_timing()
# =============================================================================
# MCMC MOVE CONTAINERS
# =============================================================================
[docs]class SequenceMove(MCMCMove):
"""A sequence of MCMC moves.
Parameters
----------
move_list : list-like of MCMCMove
The sequence of MCMC moves to apply.
context_cache : openmmtools.cache.ContextCache, optional
If not None, the context_cache of all the moves in the sequence
will be set to this (default is None).
Attributes
----------
move_list : list of MCMCMove
The sequence of MCMC moves to apply.
Examples
--------
NPT ensemble simulation of a Lennard Jones fluid with a sequence of moves.
>>> import numpy as np
>>> from simtk import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.LennardJonesFluid(nparticles=200)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin,
... pressure=1*unit.atmospheres)
>>> sampler_state = SamplerState(positions=test.positions)
>>> # Create a move set that includes a Monte Carlo barostat move.
>>> move = SequenceMove([GHMCMove(n_steps=50), MonteCarloBarostatMove(n_attempts=5)])
>>> # Create an MCMC sampler instance and run 5 iterations of the simulation.
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=move)
>>> sampler.run(n_iterations=2)
>>> np.allclose(sampler.sampler_state.positions, test.positions)
False
"""
[docs] def __init__(self, move_list, context_cache=None):
self.move_list = list(move_list)
if context_cache is not None:
for move in self.move_list:
move.context_cache = context_cache
@property
def statistics(self):
"""The statistics of all moves as a list of dictionaries."""
stats = [None for _ in range(len(self.move_list))]
for i, move in enumerate(self.move_list):
try:
stats[i] = move.statistics
except AttributeError:
stats[i] = {}
return stats
@statistics.setter
def statistics(self, value):
for i, move in enumerate(self.move_list):
if hasattr(move, 'statistics'):
move.statistics = value[i]
[docs] def apply(self, thermodynamic_state, sampler_state):
"""Apply the sequence of MCMC move in order.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to propagate dynamics.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to.
"""
for move in self.move_list:
move.apply(thermodynamic_state, sampler_state)
def __str__(self):
return str(self.move_list)
def __iter__(self):
return iter(self.move_list)
def __getstate__(self):
serialized_moves = [utils.serialize(move) for move in self.move_list]
return dict(move_list=serialized_moves)
def __setstate__(self, serialization):
serialized_moves = serialization['move_list']
self.move_list = [utils.deserialize(move) for move in serialized_moves]
[docs]class WeightedMove(MCMCMove):
"""Pick an MCMC move out of set with given probability at each iteration.
Parameters
----------
move_set : list of tuples (MCMCMove, float_
Each tuple associate an MCMCMoves to its probability of being
selected on apply().
context_cache : openmmtools.cache.ContextCache, optional
If not None, the context_cache of all the moves in the set will be
set to this (default is None).
Attributes
----------
move_set
Examples
--------
Create and run an alanine dipeptide simulation with a weighted move.
>>> import numpy as np
>>> from simtk import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> sampler_state = SamplerState(positions=test.positions)
>>> # Create a move set specifying probabilities fo each type of move.
>>> move = WeightedMove([(HMCMove(n_steps=10), 0.5),
... (LangevinDynamicsMove(n_steps=10), 0.5)])
>>> # Create an MCMC sampler instance and run 10 iterations of the simulation.
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=move)
>>> sampler.run(n_iterations=2)
>>> np.allclose(sampler.sampler_state.positions, test.positions)
False
"""
[docs] def __init__(self, move_set, context_cache=None):
self.move_set = move_set
if context_cache is not None:
for move, weight in self.move_set:
move.context_cache = context_cache
@property
def statistics(self):
"""The statistics of all moves as a list of dictionaries."""
stats = [None for _ in range(len(self.move_set))]
for i, (move, weight) in enumerate(self.move_set):
try:
stats[i] = move.statistics
except AttributeError:
stats[i] = {}
return stats
@statistics.setter
def statistics(self, value):
for i, (move, weight) in enumerate(self.move_set):
if hasattr(move, 'statistics'):
move.statistics = value[i]
[docs] def apply(self, thermodynamic_state, sampler_state):
"""Apply one of the MCMC moves in the set to the state.
The probability that a move is picked is given by its weight.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to propagate dynamics.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to.
"""
moves, weights = zip(*self.move_set)
move = np.random.choice(moves, p=weights)
move.apply(thermodynamic_state, sampler_state)
def __getstate__(self):
serialized_moves = [utils.serialize(move) for move, _ in self.move_set]
weights = [weight for _, weight in self.move_set]
return dict(moves=serialized_moves, weights=weights)
def __setstate__(self, serialization):
serialized_moves = serialization['moves']
weights = serialization['weights']
self.move_set = [(utils.deserialize(move), weight)
for move, weight in zip(serialized_moves, weights)]
def __str__(self):
return str(self.move_set)
def __iter__(self):
return self.move_set
# =============================================================================
# INTEGRATOR MCMC MOVE BASE CLASS
# =============================================================================
class IntegratorMoveError(Exception):
"""An error raised when NaN is found after applying a move.
Parameters
----------
message : str
A description of the error.
move : MCMCMove
The MCMCMove that raised the error.
context : simtk.openmm.Context, optional
The context after the integration.
"""
def __init__(self, message, move, context=None):
super(IntegratorMoveError, self).__init__(message)
self.move = move
self.context = context
def serialize_error(self, path_files_prefix):
"""Serializes and save the state of the simulation causing the error.
This creates several files:
- path_files_prefix-move.yaml
A YAML serialization of the MCMCMove.
- path_files_prefix-system.xml
The serialized system in the Context.
- path_files_prefix-integrator.xml
The serialized integrator in the Context.
- path_files_prefix-state.xml
The serialized OpenMM State object before the integration.
Parameters
----------
path_files_prefix : str
The prefix (including eventually a directory) for the files. Existing
files will be overwritten.
"""
directory_path = os.path.dirname(path_files_prefix)
if not os.path.exists(directory_path):
os.makedirs(directory_path)
# Serialize MCMCMove.
import json
# Create class to encode quantities
class quantity_encoder(json.JSONEncoder):
def default(self, o):
if type(o) in [unit.quantity.Quantity, unit.unit.Unit]:
return str(o)
super(quantity_encoder, self).default(o)
serialized_move = utils.serialize(self.move)
with open(os.path.join(path_files_prefix + '-move.json'), 'w') as f:
json.dump(serialized_move, f, cls=quantity_encoder)
# Serialize Context.
openmm_state = self.context.getState(getPositions=True, getVelocities=True,
getEnergy=True, getForces=True, getParameters=True)
to_serialize = [self.context.getSystem(), self.context.getIntegrator(), openmm_state]
for name, openmm_object in zip(['system', 'integrator', 'state'], to_serialize):
serialized_object = openmm.XmlSerializer.serialize(openmm_object)
with open(os.path.join(path_files_prefix + '-' + name + '.xml'), 'w') as f:
f.write(serialized_object)
[docs]class BaseIntegratorMove(MCMCMove):
"""A general MCMC move that applies an integrator.
This class is intended to be inherited by MCMCMoves that need to integrate
the system. The child class has to implement the _get_integrator method.
You can decide to override _before_integration() and _after_integration()
to execute some code at specific points of the workflow, for example to
read data from the Context before the it is destroyed.
Parameters
----------
n_steps : int
The number of integration steps to take each time the move is applied.
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
reassign_velocities : bool, optional
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move (default is False).
restart_attempts : int, optional
When greater than 0, if after the integration there are NaNs in energies,
the move will restart. When the integrator has a random component, this
may help recovering. On the last attempt, the ``Context`` is
re-initialized in a slower process, but better than the simulation
crashing. An IntegratorMoveError is raised after the given number of
attempts if there are still NaNs.
Attributes
----------
n_steps : int
context_cache : openmmtools.cache.ContextCache
reassign_velocities : bool
restart_attempts : int or None
Examples
--------
Create a VerletIntegratorMove class.
>>> from openmmtools import testsystems, states
>>> from simtk.openmm import VerletIntegrator
>>> class VerletMove(BaseIntegratorMove):
... def __init__(self, timestep, n_steps, **kwargs):
... super(VerletMove, self).__init__(n_steps, **kwargs)
... self.timestep = timestep
... def _get_integrator(self, thermodynamic_state):
... return VerletIntegrator(self.timestep)
... def _before_integration(self, context, thermodynamic_state):
... print('Setting velocities')
... context.setVelocitiesToTemperature(thermodynamic_state.temperature)
... def _after_integration(self, context, thermodynamic_state):
... print('Reading statistics')
...
>>> alanine = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = states.SamplerState(alanine.positions)
>>> thermodynamic_state = states.ThermodynamicState(alanine.system, 300*unit.kelvin)
>>> move = VerletMove(timestep=1.0*unit.femtosecond, n_steps=2)
>>> move.apply(thermodynamic_state, sampler_state)
Setting velocities
Reading statistics
"""
[docs] def __init__(self, n_steps, context_cache=None,
reassign_velocities=False, n_restart_attempts=4):
self.n_steps = n_steps
self.context_cache = context_cache
self.reassign_velocities = reassign_velocities
self.n_restart_attempts = n_restart_attempts
[docs] def apply(self, thermodynamic_state, sampler_state):
"""Propagate the state through the integrator.
This updates the SamplerState after the integration. It also logs
benchmarking information through the utils.Timer class.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to propagate dynamics.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
See Also
--------
openmmtools.utils.Timer
"""
move_name = self.__class__.__name__ # shortcut
timer = Timer()
# Check if we have to use the global cache.
if self.context_cache is None:
context_cache = cache.global_context_cache
else:
context_cache = self.context_cache
# Create integrator.
integrator = self._get_integrator(thermodynamic_state)
# Create context.
timer.start("{}: Context request".format(move_name))
context, integrator = context_cache.get_context(thermodynamic_state, integrator)
timer.stop("{}: Context request".format(move_name))
#logger.debug("{}: Context obtained, platform is {}".format(
# move_name, context.getPlatform().getName()))
# Perform the integration.
for attempt_counter in range(self.n_restart_attempts + 1):
# If we reassign velocities, we can ignore the ones in sampler_state.
sampler_state.apply_to_context(context, ignore_velocities=self.reassign_velocities)
if self.reassign_velocities:
context.setVelocitiesToTemperature(thermodynamic_state.temperature)
# Subclasses may implement _before_integration().
self._before_integration(context, thermodynamic_state)
try:
# Run dynamics.
timer.start("{}: step({})".format(move_name, self.n_steps))
integrator.step(self.n_steps)
except Exception:
# Catches particle positions becoming nan during integration.
restart = True
else:
timer.stop("{}: step({})".format(move_name, self.n_steps))
# We get also velocities here even if we don't need them because we
# will recycle this State to update the sampler state object. This
# way we won't need a second call to Context.getState().
context_state = context.getState(getPositions=True, getVelocities=True, getEnergy=True,
enforcePeriodicBox=thermodynamic_state.is_periodic)
# Check for NaNs in energies.
potential_energy = context_state.getPotentialEnergy()
restart = np.isnan(potential_energy.value_in_unit(potential_energy.unit))
# Restart the move if we found NaNs.
if restart:
err_msg = ('Potential energy is NaN after {} attempts of integration '
'with move {}'.format(attempt_counter, self.__class__.__name__))
# If we are on our last chance before crash, try to re-initialize context
if attempt_counter == self.n_restart_attempts - 1:
logger.error(err_msg + ' Trying to reinitialize Context as a last-resort restart attempt...')
context.reinitialize()
sampler_state.apply_to_context(context)
thermodynamic_state.apply_to_context(context)
# If we have hit the number of restart attempts, raise an exception.
elif attempt_counter == self.n_restart_attempts:
# Restore the context to the state right before the integration.
sampler_state.apply_to_context(context)
logger.error(err_msg)
raise IntegratorMoveError(err_msg, self, context)
else:
logger.warning(err_msg + ' Attempting a restart...')
else:
break
# Subclasses can read here info from the context to update internal statistics.
self._after_integration(context, thermodynamic_state)
# Updated sampler state.
timer.start("{}: update sampler state".format(move_name))
# This is an optimization around the fact that Collective Variables are not a part of the State,
# but are a part of the Context. We do this call twice to minimize duplicating information fetched from
# the State.
# Update everything but the collective variables from the State object
sampler_state.update_from_context(context_state, ignore_collective_variables=True)
# Update only the collective variables from the Context
sampler_state.update_from_context(context, ignore_positions=True, ignore_velocities=True,
ignore_collective_variables=False)
timer.stop("{}: update sampler state".format(move_name))
#timer.report_timing()
@abc.abstractmethod
def _get_integrator(self, thermodynamic_state):
"""Create a new instance of the integrator to apply."""
pass
def _before_integration(self, context, thermodynamic_state):
"""Execute code after Context creation and before integration."""
pass
def _after_integration(self, context, thermodynamic_state):
"""Execute code after integration.
After this point there are no guarantees that the Context will still
exist, together with its bound integrator and system.
"""
pass
def __getstate__(self):
if self.context_cache is None:
context_cache_serialized = None
else:
context_cache_serialized = utils.serialize(self.context_cache)
return dict(n_steps=self.n_steps, context_cache=context_cache_serialized,
reassign_velocities=self.reassign_velocities,
n_restart_attempts=self.n_restart_attempts)
def __setstate__(self, serialization):
self.n_steps = serialization['n_steps']
self.reassign_velocities = serialization['reassign_velocities']
self.n_restart_attempts = serialization['n_restart_attempts']
if serialization['context_cache'] is None:
self.context_cache = None
else:
self.context_cache = utils.deserialize(serialization['context_cache'])
# =============================================================================
# METROPOLIZED MOVE BASE CLASS
# =============================================================================
[docs]class MetropolizedMove(MCMCMove):
"""A base class for metropolized moves.
This class is intended to be inherited by MCMCMoves that needs to
accept or reject a proposed move with a Metropolis criterion. Only
the proposal needs to be specified by subclasses through the method
_propose_positions().
Parameters
----------
atom_subset : slice or list of int, optional
If specified, the move is applied only to those atoms specified by these
indices. If None, the move is applied to all atoms (default is None).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
Attributes
----------
n_accepted : int
The number of proposals accepted.
n_proposed : int
The total number of attempted moves.
atom_subset
context_cache
Examples
--------
>>> from simtk import unit
>>> from openmmtools import testsystems, states
>>> class AddOneVector(MetropolizedMove):
... def __init__(self, **kwargs):
... super(AddOneVector, self).__init__(**kwargs)
... def _propose_positions(self, initial_positions):
... print('Propose new positions')
... displacement = unit.Quantity(np.array([1.0, 1.0, 1.0]), initial_positions.unit)
... return initial_positions + displacement
...
>>> alanine = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = states.SamplerState(alanine.positions)
>>> thermodynamic_state = states.ThermodynamicState(alanine.system, 300*unit.kelvin)
>>> move = AddOneVector(atom_subset=list(range(sampler_state.n_particles)))
>>> move.apply(thermodynamic_state, sampler_state)
Propose new positions
>>> move.n_accepted
1
>>> move.n_proposed
1
"""
[docs] def __init__(self, atom_subset=None, context_cache=None):
self.n_accepted = 0
self.n_proposed = 0
self.atom_subset = atom_subset
self.context_cache = context_cache
@property
def statistics(self):
"""The acceptance statistics as a dictionary."""
return dict(n_accepted=self.n_accepted, n_proposed=self.n_proposed)
@statistics.setter
def statistics(self, value):
self.n_accepted = value['n_accepted']
self.n_proposed = value['n_proposed']
[docs] def apply(self, thermodynamic_state, sampler_state):
"""Apply a metropolized move to the sampler state.
Total number of acceptances and proposed move are updated.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to apply the move.
sampler_state : openmmtools.states.SamplerState
The initial sampler state to apply the move to. This is modified.
"""
timer = Timer()
benchmark_id = 'Applying {}'.format(self.__class__.__name__ )
timer.start(benchmark_id)
# Check if we have to use the global cache.
if self.context_cache is None:
context_cache = cache.global_context_cache
else:
context_cache = self.context_cache
# Create context, any integrator works.
context, unused_integrator = context_cache.get_context(thermodynamic_state)
# Compute initial energy. We don't need to set velocities to compute the potential.
# TODO assume sampler_state.potential_energy is the correct potential if not None?
sampler_state.apply_to_context(context, ignore_velocities=True)
initial_energy = thermodynamic_state.reduced_potential(context)
# Handle default and weird cases for atom_subset.
if self.atom_subset is None:
atom_subset = slice(None)
elif not isinstance(self.atom_subset, slice) and len(self.atom_subset) == 1:
# Slice so that initial_positions (below) will have a 2D shape.
atom_subset = slice(self.atom_subset[0], self.atom_subset[0]+1)
else:
atom_subset = self.atom_subset
# Store initial positions of the atoms that are moved.
# We'll use this also to recover in case the move is rejected.
if isinstance(atom_subset, slice):
# Numpy array when sliced return a view, they are not copied.
initial_positions = copy.deepcopy(sampler_state.positions[atom_subset])
else:
# This automatically creates a copy.
initial_positions = sampler_state.positions[atom_subset]
# Propose perturbed positions. Modifying the reference changes the sampler state.
proposed_positions = self._propose_positions(initial_positions)
# Compute the energy of the proposed positions.
sampler_state.positions[atom_subset] = proposed_positions
sampler_state.apply_to_context(context, ignore_velocities=True)
proposed_energy = thermodynamic_state.reduced_potential(context)
# Accept or reject with Metropolis criteria.
delta_energy = proposed_energy - initial_energy
if (not np.isnan(proposed_energy) and
(delta_energy <= 0.0 or np.random.rand() < np.exp(-delta_energy))):
self.n_accepted += 1
else:
# Restore original positions.
sampler_state.positions[atom_subset] = initial_positions
self.n_proposed += 1
# Print timing information.
timer.stop(benchmark_id)
#timer.report_timing()
def __getstate__(self):
if self.context_cache is None:
context_cache_serialized = None
else:
context_cache_serialized = utils.serialize(self.context_cache)
serialization = dict(atom_subset=self.atom_subset, context_cache=context_cache_serialized)
serialization.update(self.statistics)
return serialization
def __setstate__(self, serialization):
self.atom_subset = serialization['atom_subset']
if serialization['context_cache'] is None:
self.context_cache = None
else:
self.context_cache = utils.deserialize(serialization['context_cache'])
self.statistics = serialization
@abc.abstractmethod
def _propose_positions(self, positions):
"""Return new proposed positions.
These method must be implemented in subclasses.
Parameters
----------
positions : nx3 numpy.ndarray
The original positions of the subset of atoms that these move
applied to.
Returns
-------
proposed_positions : nx3 numpy.ndarray
The new proposed positions.
"""
pass
# =============================================================================
# GENERIC INTEGRATOR MOVE
# =============================================================================
[docs]class IntegratorMove(BaseIntegratorMove):
"""An MCMCMove that propagate the system with an integrator.
This class makes it easy to convert OpenMM Integrator objects to
MCMCMove objects.
Parameters
----------
integrator : simtk.openmm.Integrator
An instance of an OpenMM Integrator object to use for propagation.
n_steps : int
The number of integration steps to take each time the move is applied.
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
Attributes
----------
integrator
n_steps
context_cache
"""
[docs] def __init__(self, integrator, n_steps, **kwargs):
super(IntegratorMove, self).__init__(n_steps=n_steps, **kwargs)
self.integrator = integrator
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator abstract method."""
# We copy the integrator to make sure that the MCMCMove
# can be applied to multiple Contexts.
copied_integrator = copy.deepcopy(self.integrator)
# Restore eventual extra methods for custom forces.
integrators.ThermostatedIntegrator.restore_interface(copied_integrator)
return copied_integrator
def __getstate__(self):
serialization = super(IntegratorMove, self).__getstate__()
serialization['integrator'] = openmm.XmlSerializer.serialize(self.integrator)
return serialization
def __setstate__(self, serialization):
super(IntegratorMove, self).__setstate__(serialization)
self.integrator = openmm.XmlSerializer.deserialize(serialization['integrator'])
# =============================================================================
# LANGEVIN DYNAMICS MOVES
# =============================================================================
[docs]class LangevinDynamicsMove(BaseIntegratorMove):
"""Langevin dynamics segment as a (pseudo) Monte Carlo move.
This move assigns a velocity from the Maxwell-Boltzmann distribution
and executes a number of Maxwell-Boltzmann steps to propagate dynamics.
This is not a *true* Monte Carlo move, in that the generation of the
correct distribution is only exact in the limit of infinitely small
timestep; in other words, the discretization error is assumed to be
negligible. Use HybridMonteCarloMove instead to ensure the exact
distribution is generated.
.. warning::
No Metropolization is used to ensure the correct phase space
distribution is sampled. This means that timestep-dependent errors
will remain uncorrected, and are amplified with larger timesteps.
Use this move at your own risk!
Parameters
----------
timestep : simtk.unit.Quantity, optional
The timestep to use for Langevin integration
(time units, default is 1*simtk.unit.femtosecond).
collision_rate : simtk.unit.Quantity, optional
The collision rate with fictitious bath particles
(1/time units, default is 10/simtk.unit.picoseconds).
n_steps : int, optional
The number of integration timesteps to take each time the
move is applied (default is 1000).
reassign_velocities : bool, optional
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move (default is False).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
Attributes
----------
timestep : simtk.unit.Quantity
The timestep to use for Langevin integration (time units).
collision_rate : simtk.unit.Quantity
The collision rate with fictitious bath particles (1/time units).
n_steps : int
The number of integration timesteps to take each time the move
is applied.
reassign_velocities : bool
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move.
context_cache : openmmtools.cache.ContextCache
The ContextCache to use for Context creation. If None, the global
cache openmmtools.cache.global_context_cache is used.
Examples
--------
First we need to create the thermodynamic state and the sampler
state to propagate. Here we create an alanine dipeptide system
in vacuum.
>>> from simtk import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import SamplerState, ThermodynamicState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin)
Create a Langevin move with default parameters
>>> move = LangevinDynamicsMove()
or create a Langevin move with specified parameters.
>>> move = LangevinDynamicsMove(timestep=0.5*unit.femtoseconds,
... collision_rate=20.0/unit.picoseconds, n_steps=10)
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state, sampler_state)
>>> np.allclose(sampler_state.positions, test.positions)
False
The same move can be applied to a different state, here an ideal gas.
>>> test = testsystems.IdealGas()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> move.apply(thermodynamic_state, sampler_state)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs] def __init__(self, timestep=1.0*unit.femtosecond, collision_rate=10.0/unit.picoseconds,
n_steps=1000, reassign_velocities=False, **kwargs):
super(LangevinDynamicsMove, self).__init__(n_steps=n_steps,
reassign_velocities=reassign_velocities,
**kwargs)
self.timestep = timestep
self.collision_rate = collision_rate
[docs] def apply(self, thermodynamic_state, sampler_state):
"""Apply the Langevin dynamics MCMC move.
This modifies the given sampler_state. The temperature of the
thermodynamic state is used in Langevin dynamics.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to propagate dynamics.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
"""
# Explicitly implemented just to have more specific docstring.
super(LangevinDynamicsMove, self).apply(thermodynamic_state, sampler_state)
def __getstate__(self):
serialization = super(LangevinDynamicsMove, self).__getstate__()
serialization['timestep'] = self.timestep
serialization['collision_rate'] = self.collision_rate
return serialization
def __setstate__(self, serialization):
super(LangevinDynamicsMove, self).__setstate__(serialization)
self.timestep = serialization['timestep']
self.collision_rate = serialization['collision_rate']
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
return openmm.LangevinIntegrator(thermodynamic_state.temperature,
self.collision_rate, self.timestep)
[docs]class LangevinSplittingDynamicsMove(LangevinDynamicsMove):
"""
Langevin dynamics segment with custom splitting of the operators and optional Metropolized Monte Carlo validation.
Besides all the normal properties of the :class:`LangevinDynamicsMove`, this class implements the custom splitting
sequence of the :class:`openmmtools.integrators.LangevinIntegrator`. Additionally, the steps can be wrapped around
a proper Generalized Hybrid Monte Carlo step to ensure that the exact distribution is generated.
Parameters
----------
timestep : simtk.unit.Quantity, optional
The timestep to use for Langevin integration
(time units, default is 1*simtk.unit.femtosecond).
collision_rate : simtk.unit.Quantity, optional
The collision rate with fictitious bath particles
(1/time units, default is 10/simtk.unit.picoseconds).
n_steps : int, optional
The number of integration timesteps to take each time the
move is applied (default is 1000).
reassign_velocities : bool, optional
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move (default is False).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
splitting : string, default: "V R O R V"
Sequence of "R", "V", "O" (and optionally "{", "}", "V0", "V1", ...) substeps to be executed each timestep.
Forces are only used in V-step. Handle multiple force groups by appending the force group index
to V-steps, e.g. "V0" will only use forces from force group 0. "V" will perform a step using all forces.
"{" will cause metropolization, and must be followed later by a "}".
constraint_tolerance : float, default: 1.0e-8
Tolerance for constraint solver
measure_shadow_work : boolean, default: False
Accumulate the shadow work performed by the symplectic substeps, in the global `shadow_work`
measure_heat : boolean, default: False
Accumulate the heat exchanged with the bath in each step, in the global `heat`
Attributes
----------
timestep : simtk.unit.Quantity
The timestep to use for Langevin integration (time units).
collision_rate : simtk.unit.Quantity
The collision rate with fictitious bath particles (1/time units).
n_steps : int
The number of integration timesteps to take each time the move
is applied.
reassign_velocities : bool
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move.
context_cache : openmmtools.cache.ContextCache
The ContextCache to use for Context creation. If None, the global
cache openmmtools.cache.global_context_cache is used.
splitting : str
Splitting applied to this integrator represented as a string.
constraint_tolerance : float, default: 1.0e-8
Tolerance for constraint solver
measure_shadow_work : boolean, default: False
Accumulate the shadow work performed by the symplectic substeps, in the global `shadow_work`
measure_heat : boolean, default: False
Accumulate the heat exchanged with the bath in each step, in the global `heat`
Examples
--------
First we need to create the thermodynamic state and the sampler
state to propagate. Here we create an alanine dipeptide system
in vacuum.
>>> from simtk import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import SamplerState, ThermodynamicState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin)
Create a Langevin move with default parameters
>>> move = LangevinSplittingDynamicsMove()
or create a Langevin move with specified splitting.
>>> move = LangevinSplittingDynamicsMove(splitting="O { V R V } O")
Where this splitting is a 5 step symplectic integrator:
*. Ornstein-Uhlenbeck (O) interactions with the stochastic heat bath interactions
*. Hybrid Metropolized step around the half-step velocity updates (V) with full position updates (R).
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state, sampler_state)
>>> np.allclose(sampler_state.positions, test.positions)
False
The same move can be applied to a different state, here an ideal gas.
>>> test = testsystems.IdealGas()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> move.apply(thermodynamic_state, sampler_state)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs] def __init__(self, timestep=1.0 * unit.femtosecond, collision_rate=10.0 / unit.picoseconds,
n_steps=1000, reassign_velocities=False, splitting="V R O R V", constraint_tolerance=1.0e-8,
measure_shadow_work=False, measure_heat=False, **kwargs):
super(LangevinSplittingDynamicsMove, self).__init__(n_steps=n_steps,
reassign_velocities=reassign_velocities,
timestep=timestep,
collision_rate=collision_rate,
**kwargs)
self.splitting = splitting
self.constraint_tolerance = constraint_tolerance
self.measure_shadow_work = measure_shadow_work
self.measure_heat = measure_heat
def __getstate__(self):
serialization = super(LangevinSplittingDynamicsMove, self).__getstate__()
serialization['splitting'] = self.splitting
serialization['constraint_tolerance'] = self.constraint_tolerance
serialization['measure_shadow_work'] = self.measure_shadow_work
serialization['measure_heat'] = self.measure_heat
return serialization
def __setstate__(self, serialization):
super(LangevinSplittingDynamicsMove, self).__setstate__(serialization)
self.splitting = serialization['splitting']
self.constraint_tolerance = serialization['constraint_tolerance']
self.measure_shadow_work = serialization['measure_shadow_work']
self.measure_heat = serialization['measure_heat']
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
return integrators.LangevinIntegrator(temperature=thermodynamic_state.temperature,
collision_rate=self.collision_rate,
timestep=self.timestep,
splitting=self.splitting,
constraint_tolerance=self.constraint_tolerance,
measure_shadow_work=self.measure_shadow_work,
measure_heat=self.measure_heat)
# =============================================================================
# GENERALIZED HYBRID MONTE CARLO MOVE
# =============================================================================
[docs]class GHMCMove(BaseIntegratorMove):
"""Generalized hybrid Monte Carlo (GHMC) Markov chain Monte Carlo move.
This move uses generalized Hybrid Monte Carlo (GHMC), a form of Metropolized
Langevin dynamics, to propagate the system.
Parameters
----------
timestep : simtk.unit.Quantity, optional
The timestep to use for Langevin integration (time units, default
is 1*simtk.unit.femtoseconds).
collision_rate : simtk.unit.Quantity, optional
The collision rate with fictitious bath particles (1/time units,
default is 20/simtk.unit.picoseconds).
n_steps : int, optional
The number of integration timesteps to take each time the move
is applied (default is 1000).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
Attributes
----------
timestep : simtk.unit.Quantity
The timestep to use for Langevin integration (time units).
collision_rate : simtk.unit.Quantity
The collision rate with fictitious bath particles (1/time units).
n_steps : int
The number of integration timesteps to take each time the move
is applied.
context_cache : openmmtools.cache.ContextCache
The ContextCache to use for Context creation. If None, the global
cache openmmtools.cache.global_context_cache is used.
n_accepted : int
The number of accepted steps.
n_proposed : int
The number of attempted steps.
fraction_accepted
References
----------
Lelievre T, Stoltz G, Rousset M. Free energy computations: A mathematical
perspective. World Scientific, 2010.
Examples
--------
First we need to create the thermodynamic state and the sampler
state to propagate. Here we create an alanine dipeptide system
in vacuum.
>>> from simtk import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin)
Create a GHMC move with default parameters.
>>> move = GHMCMove()
or create a GHMC move with specified parameters.
>>> move = GHMCMove(timestep=0.5*unit.femtoseconds,
... collision_rate=20.0/unit.picoseconds, n_steps=10)
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state, sampler_state)
>>> np.allclose(sampler_state.positions, test.positions)
False
The same move can be applied to a different state, here an ideal gas.
>>> test = testsystems.IdealGas()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> move.apply(thermodynamic_state, sampler_state)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs] def __init__(self, timestep=1.0*unit.femtosecond, collision_rate=20.0/unit.picoseconds,
n_steps=1000, **kwargs):
super(GHMCMove, self).__init__(n_steps=n_steps, **kwargs)
self.timestep = timestep
self.collision_rate = collision_rate
self.n_accepted = 0 # Number of accepted steps.
self.n_proposed = 0 # Number of attempted steps.
@property
def fraction_accepted(self):
"""Ratio between accepted over attempted moves (read-only).
If the number of attempted steps is 0, this is numpy.NaN.
"""
if self.n_proposed == 0:
return np.NaN
# TODO drop the casting when stop Python2 support
return float(self.n_accepted) / self.n_proposed
@property
def statistics(self):
"""The acceptance statistics as a dictionary."""
return dict(n_accepted=self.n_accepted, n_proposed=self.n_proposed)
@statistics.setter
def statistics(self, value):
self.n_accepted = value['n_accepted']
self.n_proposed = value['n_proposed']
[docs] def reset_statistics(self):
"""Reset the internal statistics of number of accepted and attempted moves."""
self.n_accepted = 0
self.n_proposed = 0
[docs] def apply(self, thermodynamic_state, sampler_state):
"""Apply the GHMC MCMC move.
This modifies the given sampler_state. The temperature of the
thermodynamic state is used.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use when applying the MCMC move.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
"""
# Explicitly implemented just to have more specific docstring.
super(GHMCMove, self).apply(thermodynamic_state, sampler_state)
def __getstate__(self):
serialization = super(GHMCMove, self).__getstate__()
serialization['timestep'] = self.timestep
serialization['collision_rate'] = self.collision_rate
serialization.update(self.statistics)
return serialization
def __setstate__(self, serialization):
super(GHMCMove, self).__setstate__(serialization)
self.timestep = serialization['timestep']
self.collision_rate = serialization['collision_rate']
self.statistics = serialization
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
# Store lastly generated integrator to collect statistics.
return integrators.GHMCIntegrator(temperature=thermodynamic_state.temperature,
collision_rate=self.collision_rate,
timestep=self.timestep)
def _after_integration(self, context, thermodynamic_state):
"""Implement BaseIntegratorMove._after_integration()."""
integrator = context.getIntegrator()
# Accumulate acceptance statistics.
ghmc_global_variables = {integrator.getGlobalVariableName(index): index
for index in range(integrator.getNumGlobalVariables())}
n_accepted = integrator.getGlobalVariable(ghmc_global_variables['naccept'])
n_proposed = integrator.getGlobalVariable(ghmc_global_variables['ntrials'])
self.n_accepted += n_accepted
self.n_proposed += n_proposed
# =============================================================================
# HYBRID MONTE CARLO MOVE
# =============================================================================
[docs]class HMCMove(BaseIntegratorMove):
"""Hybrid Monte Carlo dynamics.
This move assigns a velocity from the Maxwell-Boltzmann distribution
and executes a number of velocity Verlet steps to propagate dynamics.
Parameters
----------
timestep : simtk.unit.Quantity, optional
The timestep to use for HMC dynamics, which uses velocity Verlet following
velocity randomization (time units, default is 1*simtk.unit.femtosecond)
n_steps : int, optional
The number of dynamics steps to take before Metropolis acceptance/rejection
(default is 1000).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
Attributes
----------
timestep : simtk.unit.Quantity
The timestep to use for HMC dynamics, which uses velocity Verlet following
velocity randomization (time units).
n_steps : int
The number of dynamics steps to take before Metropolis acceptance/rejection.
context_cache : openmmtools.cache.ContextCache
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used.
Examples
--------
First we need to create the thermodynamic state and the sampler
state to propagate. Here we create an alanine dipeptide system
in vacuum.
>>> from simtk import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin)
Create a GHMC move with default parameters.
>>> move = HMCMove()
or create a GHMC move with specified parameters.
>>> move = HMCMove(timestep=0.5*unit.femtoseconds, n_steps=10)
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state, sampler_state)
>>> np.allclose(sampler_state.positions, test.positions)
False
The same move can be applied to a different state, here an ideal gas.
>>> test = testsystems.IdealGas()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> move.apply(thermodynamic_state, sampler_state)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs] def __init__(self, timestep=1.0*unit.femtosecond, n_steps=1000, **kwargs):
super(HMCMove, self).__init__(n_steps=n_steps, **kwargs)
self.timestep = timestep
[docs] def apply(self, thermodynamic_state, sampler_state):
"""Apply the MCMC move.
This modifies the given sampler_state.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use when applying the MCMC move.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
"""
# Explicitly implemented just to have more specific docstring.
super(HMCMove, self).apply(thermodynamic_state, sampler_state)
def __getstate__(self):
serialization = super(HMCMove, self).__getstate__()
serialization['timestep'] = self.timestep
return serialization
def __setstate__(self, serialization):
super(HMCMove, self).__setstate__(serialization)
self.timestep = serialization['timestep']
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
return integrators.HMCIntegrator(temperature=thermodynamic_state.temperature,
timestep=self.timestep, nsteps=self.n_steps)
# =============================================================================
# MONTE CARLO BAROSTAT MOVE
# =============================================================================
[docs]class MonteCarloBarostatMove(BaseIntegratorMove):
"""Monte Carlo barostat move.
This move makes one or more attempts to update the box volume using
Monte Carlo updates.
Parameters
----------
n_attempts : int, optional
The number of Monte Carlo attempts to make to adjust the box
volume (default is 5).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global
cache openmmtools.cache.global_context_cache is used (default is
None).
Attributes
----------
n_attempts
context_cache : openmmtools.cache.ContextCache
The ContextCache to use for Context creation. If None, the global
cache openmmtools.cache.global_context_cache is used.
Examples
--------
The thermodynamic state must be barostated by a MonteCarloBarostat
force. The class ThermodynamicState takes care of adding one when
we specify the pressure in its constructor.
>>> from simtk import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.AlanineDipeptideExplicit()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin,
... pressure=1.0*unit.atmosphere)
Create a MonteCarloBarostatMove move with default parameters.
>>> move = MonteCarloBarostatMove()
or create a GHMC move with specified parameters.
>>> move = MonteCarloBarostatMove(n_attempts=2)
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state, sampler_state)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs] def __init__(self, n_attempts=5, **kwargs):
super(MonteCarloBarostatMove, self).__init__(n_steps=n_attempts, **kwargs)
@property
def n_attempts(self):
"""The number of MC attempts to make to adjust the box volume."""
return self.n_steps # The number of steps of the dummy integrator.
@n_attempts.setter
def n_attempts(self, value):
self.n_steps = value
[docs] def apply(self, thermodynamic_state, sampler_state):
"""Apply the MCMC move.
The thermodynamic state must be barostated by a MonteCarloBarostat
force. This modifies the given sampler_state.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use when applying the MCMC move.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
"""
# Make sure system contains a MonteCarlo barostat.
barostat = thermodynamic_state.barostat
if barostat is None:
raise RuntimeError('Requested a MonteCarloBarostat move'
' on a system at constant pressure')
if not isinstance(barostat, openmm.MonteCarloBarostat):
raise RuntimeError('Requested a MonteCarloBarostat move on a system '
'barostated with a {}'.format(barostat.__class__.__name__))
# Set temporarily the frequency if needed.
old_barostat_frequency = barostat.getFrequency()
if old_barostat_frequency != 1:
barostat.setFrequency(1)
thermodynamic_state.barostat = barostat
super(MonteCarloBarostatMove, self).apply(thermodynamic_state, sampler_state)
# Restore frequency of barostat.
if old_barostat_frequency != 1:
barostat.setFrequency(old_barostat_frequency)
thermodynamic_state.barostat = barostat
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
return integrators.DummyIntegrator()
# =============================================================================
# RANDOM DISPLACEMENT MOVE
# =============================================================================
[docs]class MCDisplacementMove(MetropolizedMove):
"""A metropolized move that randomly displace a subset of atoms.
Parameters
----------
displacement_sigma : simtk.unit.Quantity
The standard deviation of the normal distribution used to propose the
random displacement (units of length, default is 1.0*nanometer).
atom_subset : slice or list of int, optional
If specified, the move is applied only to those atoms specified by these
indices. If None, the move is applied to all atoms (default is None).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
Attributes
----------
n_accepted : int
The number of proposals accepted.
n_proposed : int
The total number of attempted moves.
displacement_sigma
atom_subset
context_cache
See Also
--------
MetropolizedMove
"""
[docs] def __init__(self, displacement_sigma=1.0*unit.nanometer, **kwargs):
super(MCDisplacementMove, self).__init__(**kwargs)
self.displacement_sigma = displacement_sigma
[docs] @staticmethod
def displace_positions(positions, displacement_sigma=1.0*unit.nanometer):
"""Return the positions after applying a random displacement to them.
Parameters
----------
positions : nx3 numpy.ndarray simtk.unit.Quantity
The positions to displace.
displacement_sigma : simtk.unit.Quantity
The standard deviation of the normal distribution used to propose
the random displacement (units of length, default is 1.0*nanometer).
Returns
-------
rotated_positions : nx3 numpy.ndarray simtk.unit.Quantity
The displaced positions.
"""
positions_unit = positions.unit
unitless_displacement_sigma = displacement_sigma / positions_unit
displacement_vector = unit.Quantity(np.random.randn(3) * unitless_displacement_sigma,
positions_unit)
return positions + displacement_vector
def __getstate__(self):
serialization = super(MCDisplacementMove, self).__getstate__()
serialization['displacement_sigma'] = self.displacement_sigma
return serialization
def __setstate__(self, serialization):
super(MCDisplacementMove, self).__setstate__(serialization)
self.displacement_sigma = serialization['displacement_sigma']
def _propose_positions(self, initial_positions):
"""Implement MetropolizedMove._propose_positions for apply()."""
return self.displace_positions(initial_positions, self.displacement_sigma)
# =============================================================================
# RANDOM ROTATION MOVE
# =============================================================================
[docs]class MCRotationMove(MetropolizedMove):
"""A metropolized move that randomly rotate a subset of atoms.
Parameters
----------
atom_subset : slice or list of int, optional
If specified, the move is applied only to those atoms specified by these
indices. If None, the move is applied to all atoms (default is None).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
Attributes
----------
n_accepted : int
The number of proposals accepted.
n_proposed : int
The total number of attempted moves.
atom_subset
context_cache
See Also
--------
MetropolizedMove
"""
[docs] def __init__(self, **kwargs):
super(MCRotationMove, self).__init__(**kwargs)
[docs] @classmethod
def rotate_positions(cls, positions):
"""Return the positions after applying a random rotation to them.
Parameters
----------
positions : nx3 numpy.ndarray simtk.unit.Quantity
The positions to rotate.
Returns
-------
rotated_positions : nx3 numpy.ndarray simtk.unit.Quantity
The rotated positions.
"""
positions_unit = positions.unit
x_initial = positions / positions_unit
# Compute center of geometry of atoms to rotate.
x_initial_mean = x_initial.mean(0)
# Generate a random rotation matrix.
rotation_matrix = cls.generate_random_rotation_matrix()
# Apply rotation.
x_proposed = (rotation_matrix * np.matrix(x_initial - x_initial_mean).T).T + x_initial_mean
return unit.Quantity(x_proposed, positions_unit)
[docs] @classmethod
def generate_random_rotation_matrix(cls):
"""Return a random 3x3 rotation matrix.
Returns
-------
Rq : 3x3 numpy.ndarray
The random rotation matrix.
"""
q = cls._generate_uniform_quaternion()
return cls._rotation_matrix_from_quaternion(q)
@staticmethod
def _rotation_matrix_from_quaternion(q):
"""Compute a 3x3 rotation matrix from a given quaternion (4-vector).
Parameters
----------
q : 1x4 numpy.ndarray
Quaterion (need not be normalized, zero norm OK).
Returns
-------
Rq : 3x3 numpy.ndarray
Orthogonal rotation matrix corresponding to quaternion q.
Examples
--------
>>> q = np.array([0.1, 0.2, 0.3, -0.4])
>>> Rq = MCRotationMove._rotation_matrix_from_quaternion(q)
References
----------
[1] http://en.wikipedia.org/wiki/Rotation_matrix#Quaternion
"""
w, x, y, z = q
Nq = (q**2).sum() # Squared norm.
if Nq > 0.0:
s = 2.0 / Nq
else:
s = 0.0
X = x*s; Y = y*s; Z = z*s
wX = w*X; wY = w*Y; wZ = w*Z
xX = x*X; xY = x*Y; xZ = x*Z
yY = y*Y; yZ = y*Z; zZ = z*Z
Rq = np.matrix([[1.0-(yY+zZ), xY-wZ, xZ+wY],
[xY+wZ, 1.0-(xX+zZ), yZ-wX],
[xZ-wY, yZ+wX, 1.0-(xX+yY)]])
return Rq
@staticmethod
def _generate_uniform_quaternion():
"""Generate a uniform normalized quaternion 4-vector.
References
----------
[1] K. Shoemake. Uniform random rotations. In D. Kirk, editor,
Graphics Gems III, pages 124-132. Academic, New York, 1992.
[2] Described briefly here: http://planning.cs.uiuc.edu/node198.html
Examples
--------
>>> q = MCRotationMove._generate_uniform_quaternion()
"""
u = np.random.rand(3)
q = np.array([np.sqrt(1-u[0])*np.sin(2*np.pi*u[1]),
np.sqrt(1-u[0])*np.cos(2*np.pi*u[1]),
np.sqrt(u[0])*np.sin(2*np.pi*u[2]),
np.sqrt(u[0])*np.cos(2*np.pi*u[2])])
return q
def _propose_positions(self, initial_positions):
"""Implement MetropolizedMove._propose_positions for apply()"""
return self.rotate_positions(initial_positions)
# =============================================================================
# MAIN AND TESTS
# =============================================================================
if __name__ == "__main__":
import doctest
doctest.testmod()