# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Definition of the OpenMM engine.
This module defines the class for the OpenMM MD engine.
Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
OpenMMEngine (:py:class:`.OpenMMEngine`)
The class for running the OpenMM engine.
"""
import logging
from pyretis.core import System, Particles
from pyretis.core.forcefieldhelp import rescale_system_velocities
from pyretis.core.particlefunctions import (calculate_kinetic_energy,
reset_momentum)
from pyretis.core.velocity import is_aimless_velocity_setting
from pyretis.core.engine_time import validate_engine_timestep
from pyretis.engines.engine import EngineBase
from pyretis.core.box import create_box, box_matrix_to_list
from pyretis.core.common import import_from
try:
import openmm.unit as u
HAS_OPENMM = True
except ImportError:
try:
import simtk.unit as u # pragma: no cover
HAS_OPENMM = True # pragma: no cover
except ImportError: # pragma: no cover
HAS_OPENMM = False # pragma: no cover
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
__all__ = ['OpenMMEngine']
def make_pyretis_system(settings):
"""Make a PyRETIS system from OpenMM Simulation.
First this looks for setting->engine->openmm_module and openmm_simulation.
It then imports that openmm_simulation object and creates a internal
pyretis system representation
"""
simulation = import_from(settings['engine']['openmm_module'],
settings['engine']['openmm_simulation'])
state = simulation.context.getState(getPositions=True, getVelocities=True)
box = state.getPeriodicBoxVectors(asNumpy=True).T
box = create_box(cell=box_matrix_to_list(box),
periodic=[True, True, True])
system = System(units='openmm', box=box,
temperature=simulation.integrator.getTemperature())
system.particles = Particles(system.get_dim())
pos = state.getPositions(asNumpy=True)
vel = state.getVelocities(asNumpy=True)
for i, atom in enumerate(simulation.topology.atoms()):
mass = simulation.system.getParticleMass(i)
mass = mass.value_in_unit(u.grams/u.mole)
name = atom.name
system.particles.add_particle(pos=pos[i], vel=vel[i],
force=[None, None, None],
mass=mass, name=name)
return system
[docs]class OpenMMEngine(EngineBase):
"""
A class for interfacing with OpenMM.
This class defines the interface to OpenMM.
Attributes
----------
simulation: OpenMM.simulation
OpenMM simulation object.
subcyles: int
Number of OpenMM steps per PyRETIS step.
"""
engine_type = 'openmm'
default_units = 'openmm'
[docs] @classmethod
def get_default_units(cls, settings=None):
"""Return the default unit system for the OpenMM engine.
Parameters
----------
settings : dict, optional
Full simulation settings or engine settings. This argument
is ignored for the OpenMM engine.
Returns
-------
out : string
The default unit system for the OpenMM engine.
"""
_ = settings
return 'openmm'
[docs] def __init__(self, openmm_simulation, subcycles=1, openmm_module=None,
timestep=None):
"""Set up the OpenMM Engine.
Parameters
----------
openmm_simulation : OpenMM.simulation, or string
The OpenMM simulation object or the variable name if `module` is
not None.
subcycles: int
Number of OpenMM integration steps per PyRETIS step.
openmm_module: string, optional
If defined and simulation is a string, try loading `simulation`
from `module`.
timestep : float, optional
The OpenMM time step is defined by the integrator; this optional
``[engine]`` value is only used as a cross-check (in ps) and must
equal the integrator step size (otherwise an error is raised).
"""
# Check if openmm is installed.
if HAS_OPENMM is False:
raise RuntimeError("OpenMM is not installed")
super().__init__(description='OpenMM')
if isinstance(openmm_simulation, str) and openmm_module is not None:
openmm_simulation = import_from(openmm_module, openmm_simulation)
if isinstance(openmm_simulation, str):
msg = "simulation is a string but could not be loaded from module"
msg += f" '{openmm_module}'"
raise RuntimeError(msg)
self.simulation = openmm_simulation
self.subcycles = subcycles
# The OpenMM integrator owns the time step; expose it (in ps) so
# this engine satisfies the same timestep contract as the others
# and the value is available to the flux/rate analysis. Guard the
# lookup so a bare stand-in simulation (used in some tests) does
# not break construction; a real run always has an integrator.
integrator = getattr(self.simulation, 'integrator', None)
if integrator is not None:
self.timestep = integrator.getStepSize().value_in_unit(
u.picosecond
)
# The integrator owns the time step; if one was also given in
# the [engine] settings, it must agree with it.
validate_engine_timestep(timestep, self.timestep, 'OpenMM')
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None):
"""Return the order parameter.
This method is just to help to calculate the order parameter
in cases where only the engine can do it.
Parameters
----------
system : object like :py:class:`.System`
This is the system that contains the particles we are
investigating.
xyz : numpy.array, optional
The positions to use. Typically for internal engines, this
is not needed. It is included here as it can be used for
testing and also to be compatible with the generic function
defined by the parent.
vel : numpy.array, optional
The velocities to use.
box : numpy.array, optional
The current box vectors.
Returns
-------
out : list of floats
The calculated order parameter(s).
"""
order_function = self.order_function
if any((xyz is None, vel is None, box is None)):
return order_function.calculate(system)
system.pos = xyz
system.vel = vel
system.box.update_size(box)
return order_function.calculate(system)
[docs] def clean_up(self):
"""Clean up after using the engine.
Currently, this is only included for compatibility with external
integrators.
"""
return
[docs] def dump_phasepoint(self, phasepoint, deffnm=None):
"""For compatibility with external integrators."""
return
[docs] def integration_step(self, system): # pylint: disable=arguments-renamed
"""Perform one integration step of n subcycles.
Parameters
----------
system : object like :py:class:`.System`
The system to integrate/act on. Assumed to have a particle
list in ``system.particles``.
Returns
-------
out : None
Does not return anything, but alters the state of the given
system.
"""
pos = system.pos
vel = system.vel
box = system.box
context = self.simulation.context
# Update the simulation to the correct system:
context.setPositions(pos)
context.setVelocities(vel)
# PyRETIS matrix.T is openmm boxvectors:
if box is not None:
vectors = list(box.box_matrix.T)
context.setPeriodicBoxVectors(*vectors)
# Run the simulation for n subcycles:
self.simulation.step(self.subcycles)
# Get the current state:
state = context.getState(getPositions=True, getVelocities=True)
# Update system:
system.pos = state.getPositions(asNumpy=True)
system.vel = state.getVelocities(asNumpy=True)
if box is not None:
box.update_size(
box_matrix_to_list(state.getPeriodicBoxVectors(asNumpy=True).T)
)
[docs] def kick_across_middle(self, system, ens_set, middle, tis_settings,
rgen):
"""Force a phase point across the middle interface.
This is accomplished by repeatedly kicking the phase point so
that it crosses the middle interface.
Parameters
----------
system : object like :py:class:`.System`
This is the system that contains the particles we are
investigating.
ens_set : dict
It contains the ensemble settings. This method does not use
it directly, but it is part of the common interface.
middle : float
This is the value for the middle interface.
tis_settings : dict
This dictionary contains settings for TIS. Explicitly used here:
* `zero_momentum`: boolean, determines if the momentum is zeroed.
* `rescale_energy`: boolean, determines if energy is re-scaled.
rgen : object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
Returns
-------
out[0] : object like :py:class:`.System`
The phase-point just before the interface.
out[1] : object like :py:class:`.System`
The phase-point just after the interface.
Note
----
This function will update the system state.
"""
# Taken from MDEngine in internal.py
# We search for crossing with the middle interface and do this
# by sequentially kicking the initial phase point:
_ = ens_set
previous = None
curr = self.calculate_order(system)[0]
while True: # pragma: no cover
# Save current state:
previous = system.copy()
# Save previous box matrix:
prev_box = system.box.box_matrix
# Modify velocities:
vel_settings = {'sigma_v': -1,
'momentum': tis_settings['zero_momentum'],
'rescale': tis_settings['rescale_energy']}
self.modify_velocities(system, vel_settings, rgen)
# Update order parameter in case it is velocity dependent:
order = self.calculate_order(system)
previous.order = order
curr = order[0]
# Store modified velocities:
previous.vel = system.vel.copy()
# Integrate forward one step:
self.integration_step(system)
# Compare previous order parameter and the new one:
prev = curr
curr = self.calculate_order(system)[0]
if (prev < middle < curr) or (curr < middle < prev):
# Middle interface was crossed, just stop the loop.
logger.info('Crossing found: %9.6f %9.6f ', prev, curr)
break
if (prev <= curr <= middle) or (middle <= curr <= prev):
# We are getting closer, keep the new point.
pass
elif prev == middle:
# Unlucky case, we want more than 1 point, so search more.
pass
else: # We did not get closer, fall back to previous point.
system.particles = previous.particles.copy()
system.box.update_size(box_matrix_to_list(prev_box))
curr = previous.order[0]
return previous, system
[docs] def modify_velocities(self, system, vel_settings, rgen):
"""Modify the velocities of the current state.
This method will modify the velocities of a time slice.
And it is part of the integrator since it, conceptually,
fits here: we are acting on the system and modifying it.
Parameters
----------
system : object like :py:class:`.System`
This is the system that contains the particles we are
investigating.
vel_settings: dict.
It contains info about velocity settings:
* `sigma_v` : numpy.array, optional
Negative values select aimless shooting. Zero or positive
values set the standard deviation, one for each particle,
for soft velocity perturbations.
* `momentum` : boolean, optional
If True, we reset the linear momentum to zero after
generating.
* `rescale` : float, optional
In some NVE simulations, we may wish to re-scale the
energy to a fixed value. If `rescale` is a float > 0,
we will re-scale the energy (after modification of
the velocities) to match the given float.
rgen : object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
Returns
-------
dek : float
The change in the kinetic energy.
kin_new : float
The new kinetic energy.
"""
rescale = vel_settings.get('rescale')
if rescale is not None and rescale is not False:
if rescale > 0:
kin_old = rescale - system.vpot
do_rescale = True
else:
logger.warning('Ignored re-scale 6.2%f < 0.0.', rescale)
return 0.0, calculate_kinetic_energy(system)[0]
else:
kin_old = calculate_kinetic_energy(system)[0]
do_rescale = False
if is_aimless_velocity_setting(vel_settings):
vel, _ = rgen.draw_maxwellian_velocities(system=system)
system.vel = vel
else: # Soft velocity change, from a Gaussian distribution:
dvel, _ = rgen.draw_maxwellian_velocities(
system=system,
sigma_v=vel_settings['sigma_v'])
system.vel = system.vel + dvel
if vel_settings.get('momentum', False):
reset_momentum(system)
if do_rescale:
rescale_system_velocities(system, rescale,
forcefield=system.forcefield)
kin_new = calculate_kinetic_energy(system)[0]
dek = kin_new - kin_old
return dek, kin_new
[docs] def propagate(self, path, ens_set, system, reverse=False):
"""Generate a path by integrating until a criterion is met.
This function will generate a path by calling the function
specifying the integration step repeatedly. The integration is
carried out until the order parameter has passed the specified
interfaces or if we have integrated for more than a specified
maximum number of steps. The given system defines the initial
state and the system is reset to its initial state when this
method is done.
Parameters
----------
path : object like :py:class:`.PathBase`
This is the path we use to fill in phase-space points.
We are here not returning a new path - this since we want
to delegate the creation of the path (type) to the method
that is running `propagate`.
ens_set : dict
It contains the ensemble settings. Explicitly used here:
* `interfaces` : list of floats
These interfaces define the stopping criterion.
system : object like :py:class:`.System`
The system object gives the initial state for the
integration. The initial state is stored and the system is
reset to the initial state when the integration is done.
reverse : boolean, optional
If True, the system will be propagated backward in time.
Returns
-------
success : boolean
This is True if we generated an acceptable path.
status : string
A text description of the current status of the propagation.
"""
status = 'Propagate w/OpenMM engine'
logger.debug(status)
success = False
left, _, right = ens_set['interfaces']
for i in range(path.maxlen):
order = self.calculate_order(system)
kin = calculate_kinetic_energy(system)[0]
snapshot = {'order': order,
'pos': system.pos,
'vel': system.vel,
'box': system.box,
'ekin': kin,
'vpot': None}
phasepoint = self.snapshot_to_system(system, snapshot)
status, success, stop, _ = self.add_to_path(path, phasepoint,
left, right)
if stop:
logger.debug('Stopping propagate at step: %i ', i)
break
if reverse:
system.vel *= -1.0
self.integration_step(system)
system.vel *= -1.0
else:
self.integration_step(system)
logger.debug('Propagate done: "%s" (success: %s)', status, success)
return success, status