# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Definition of numerical MD integrators.
These integrators are representations of engines for performing
molecular dynamics. Typically they will propagate Newtons
equations of motion in time numerically.
Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
MDEngine (:py:class:`.MDEngine`)
Base class for internal MDEngines.
RandomWalk (:py:class:`.RandomWalk`)
A Random Walk integrator.
Verlet (:py:class:`.Verlet`)
A Verlet MD integrator.
VelocityVerlet (:py:class:`.VelocityVerlet`)
A Velocity Verlet MD integrator.
Langevin (:py:class:`.Langevin`)
A Langevin MD integrator.
"""
import logging
import os
import numpy as np
from pyretis.core.forcefieldhelp import rescale_system_velocities
from pyretis.core.systemhelp import get_system_dim
from pyretis.core.random_gen import create_random_generator
from pyretis.core.particlefunctions import (calculate_kinetic_energy,
calculate_thermo,
calculate_thermo_path,
reset_momentum)
from pyretis.core.velocity import is_aimless_velocity_setting
from pyretis.engines.engine import EngineBase
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
__all__ = ['MDEngine', 'RandomWalk', 'Verlet', 'VelocityVerlet', 'Langevin']
[docs]class MDEngine(EngineBase):
"""Base class for internal MD integrators.
This class defines an internal MD integrator. This class of
integrators work with the positions and velocities of the system
object directly. Further, we make use of the system object in
order to update forces etc.
Attributes
----------
timestep : float
Time step for the integration.
description : string
Description of the MD integrator.
dynamics : str
A short string to represent the type of dynamics produced
by the integrator (NVE, NVT, stochastic, ...).
"""
engine_type = 'internal'
default_units = 'lj'
[docs] @classmethod
def get_default_units(cls, settings=None):
"""Return the default unit system for internal engines.
Parameters
----------
settings : dict, optional
Full simulation settings or engine settings. This argument
is ignored for internal engines.
Returns
-------
out : string
The default unit system for internal engines.
"""
_ = settings
return 'lj'
[docs] def __init__(self, timestep, description, dynamics=None):
"""Set up the integrator.
Parameters
----------
timestep : float
The time step for the integrator in internal units.
description : string
A short description of the integrator.
dynamics : string or None, optional
Description of the kind of dynamics the integrator does.
"""
super().__init__(description)
self.timestep = timestep
self.dynamics = dynamics
self._forcefield = None
# ``subcycles`` is the number of internal integration steps that
# make up one recorded path frame. The native in-memory loop
# records every step, i.e. ``subcycles = 1``; this default keeps
# the native behaviour byte-identical. ``steps`` is the running
# total of integration steps consumed across propagations, read
# by the infinite-swapping scheduler for its subcycle accounting
# (see ``pyretis.core._tis_inf.run_md``). Both are additive: the
# native loop never reads them.
self.subcycles = 1
self.steps = 0
# Streaming (file-backed) template state. Populated by
# :py:meth:`setup_streaming` only when the engine runs inside the
# coordinator's worker loop; ``None`` for the native in-memory
# loop, which never touches the streaming path.
self._stream_system = None
self._stream_names = None
[docs] def set_forcefield(self, forcefield):
"""Attach a force field for system-independent evaluation."""
self._forcefield = forcefield
[docs] def _evaluate_potential_and_force(self, system):
"""Evaluate potential/force using the engine's forcefield.
On first call, lazily acquires the forcefield from the system
if not yet set via :meth:`set_forcefield`.
"""
if self._forcefield is None:
ff = getattr(system, 'forcefield', None)
if ff is not None:
self._forcefield = ff
else:
raise RuntimeError(
'No forcefield available. Call engine.set_forcefield() '
'or pass a system with a forcefield attribute.'
)
pot, forces, virial = (
self._forcefield.evaluate_potential_and_force(system)
)
system.vpot = pot
system.force = forces
# ``virial`` is harness-only (no setter on snapshot System);
# writing through the bridge no-ops when there's no particles.
if hasattr(system, 'virial'):
system.virial = virial
return pot, forces, virial
[docs] @staticmethod
def _get_force(system):
"""Return the force array from either System type."""
return system.force
[docs] def integration_step(self, _):
"""Perform a single time step of the integration.
Parameters
----------
- : place holder
Returns
-------
out : None
Does not return anything, in derived classes it will
typically update the given `System`.
"""
raise NotImplementedError
[docs] def select_thermo_function(self, thermo='full'):
"""Select function for calculating thermodynamic properties.
Parameters
----------
thermo : string, or None, optional
String which selects the kind of thermodynamic output.
Returns
-------
thermo_func : callable or None
The function matching the requested thermodynamic output.
"""
thermo_func = None
if thermo is not None:
if thermo not in ('full', 'path'):
logger.debug(
'Unknown thermo "%s" requested: Using "path"!',
thermo,
)
thermo = 'path'
logger.debug('Thermo output for %s.integrate(): "%s"',
self.__class__.__name__, thermo)
if thermo == 'full':
thermo_func = calculate_thermo
elif thermo == 'path':
thermo_func = calculate_thermo_path
return thermo_func
[docs] def integrate(self, system, order_function, steps, thermo='full'):
"""Perform several integration steps.
This method will perform several integration steps, but it will
also calculate order parameter(s) if requested and energy
terms.
Parameters
----------
system : object like :py:class:`.System`
The system we are integrating.
order_function : object like :py:class:`.OrderParameter` or None
An order function can be specified if we want to
calculate the order parameter along with the simulation.
steps : integer
The number of steps we are going to perform. Note that we
do not integrate on the first step (e.g. step 0) but we do
obtain the other properties. This is to output the starting
configuration.
thermo : string, optional
Select the thermodynamic properties we are to calculate.
Yields
------
results : dict
The result of a MD step. This contains the state of the
system and also the order parameter(s) (if calculated) and
the thermodynamic quantities (if calculated).
"""
thermo_func = self.select_thermo_function(thermo=thermo)
self.order_function = order_function
self._evaluate_potential_and_force(system)
for i in range(steps):
if i == 0:
pass
else:
self.integration_step(system)
results = {'system': system}
if order_function is not None:
results['order'] = self.calculate_order(system)
if thermo_func:
results['thermo'] = thermo_func(system)
yield results
[docs] def invert_dt(self):
"""Invert the time step for the integration.
Returns
-------
out : boolean
True if the time step is positive, False otherwise.
"""
self.timestep *= -1.0
return self.timestep > 0.0
[docs] def propagate(self, path, ens_set, system, reverse=False):
"""Generate a path by integrating until a criterion is met.
This function will generate a path by calling the function
specifying the integration step repeatedly. The integration is
carried out until the order parameter has passed the specified
interfaces or if we have integrated for more than a specified
maximum number of steps.
Parameters
----------
path : object like :py:class:`.PathBase`
This is the path we use to fill in phase-space points.
We are here not returning a new path, this since we want
to delegate the creation of the path (type) to the method
that is running `propagate`.
ens_set : dict
Ensemble settings. It contains:
* `interfaces` : list of floats
These interfaces define the stopping criterion.
system : object like :py:class:`.System`
The system object gives the initial state for the
integration; it is mutated in place during propagation. The
order function is injected on the engine
(``self.order_function``).
reverse : boolean, optional
If True, the system will be propagated backward in time.
Returns
-------
success : boolean
This is True if we generated an acceptable path.
status : string
A text description of the current status of the propagation.
"""
if self._is_streaming(system):
return self._propagate_streaming(path, ens_set, system, reverse)
status = 'Propagate w/internal engine'
logger.debug(status)
success = False
self._evaluate_potential_and_force(system) # Make sure forces are set.
left, _, right = ens_set['interfaces']
for i in range(path.maxlen):
system.order = self.calculate_order(system)
ekin = calculate_kinetic_energy(system)[0]
system.ekin = ekin
status, success, stop, _ = self.add_to_path(path, system.copy(),
left, right)
if stop:
logger.debug('Stopping propagate at step: %i ', i)
break
if reverse:
system.vel *= -1.0
self.integration_step(system)
system.vel *= -1.0
else:
self.integration_step(system)
logger.debug('Propagate done: "%s" (success: %s)', status, success)
return success, status
[docs] def modify_velocities(self, system, vel_settings, rgen):
"""Modify the velocities of the current state.
This method will modify the velocities of a time slice.
And it is part of the integrator since it, conceptually,
fits here: we are acting on the system and modifying it.
Parameters
----------
system : object like :py:class:`.System`
This is the system that contains the particles we are
investigating.
vel_settings: dict.
It contains all the info for the velocity:
* `sigma_v` : numpy.array, optional
Negative values select aimless shooting. Zero or positive
values set the standard deviation, one for each particle,
for soft velocity perturbations.
* `momentum` : boolean, optional
If True, we reset the linear momentum to zero after
generating.
* `rescale` : float, optional
In some NVE simulations, we may wish to re-scale the
energy to a fixed value. If `rescale` is a float > 0,
we will re-scale the energy (after modification of
the velocities) to match the given float.
rgen : object like :py:class:`.RandomGenerator`
The velocity-generation random generator, injected per call.
It is kept distinct from the integration ``self.rgen`` that
stochastic integrators (Langevin, random walk) own, so a
shooting-move velocity kick never perturbs the dynamics
random stream.
Returns
-------
dek : float
The change in the kinetic energy.
kin_new : float
The new kinetic energy.
"""
if self._is_streaming(system):
return self._modify_velocities_streaming(
system, vel_settings, rgen
)
rescale = vel_settings.get('rescale')
if rescale is not None and rescale is not False:
if rescale > 0:
kin_old = rescale - system.vpot
do_rescale = True
else:
logger.warning('Ignored re-scale 6.2%f < 0.0.', rescale)
return 0.0, calculate_kinetic_energy(system)[0]
else:
kin_old = calculate_kinetic_energy(system)[0]
do_rescale = False
if is_aimless_velocity_setting(vel_settings):
vel, _ = rgen.draw_maxwellian_velocities(system)
system.vel = vel
else:
dvel, _ = rgen.draw_maxwellian_velocities(
system, sigma_v=vel_settings['sigma_v'])
system.vel = system.vel + dvel
if vel_settings.get('momentum', False):
reset_momentum(system)
if do_rescale:
rescale_system_velocities(system, rescale,
forcefield=system.forcefield)
kin_new = calculate_kinetic_energy(system)[0]
dek = kin_new - kin_old
return dek, kin_new
[docs] def __call__(self, system):
"""To allow calling `MDEngine(system)`.
Here, we are just calling `self.integration_step(system)`.
Parameters
----------
system : object like :py:class:`.System`
The system we are integrating.
Returns
-------
out : None
Does not return anything, but will update the particles.
"""
return self.integration_step(system)
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None):
"""Return the order parameter.
This method is just to help to calculate the order parameter
in cases where only the engine can do it.
Parameters
----------
system : object like :py:class:`.System`
This is the system that contains the particles we are
investigating. The order function is injected on the engine
(``self.order_function``).
xyz : numpy.array, optional
The positions to use. Typically for internal engines, this
is not needed. It is included here as it can be used for
testing and also to be compatible with the generic function
defined by the parent.
vel : numpy.array, optional
The velocities to use.
box : numpy.array, optional
The current box vectors.
Returns
-------
out : list of floats
The calculated order parameter(s).
"""
order_function = self.order_function
if self._is_streaming(system) and xyz is None:
return self._calculate_order_streaming(system)
if any((xyz is None, vel is None, box is None)):
return order_function.calculate(system)
system.pos = xyz
system.vel = vel
system.box.update_size(box)
return order_function.calculate(system)
[docs] def kick_across_middle(self, system, ens_set, middle, tis_settings,
rgen):
"""Force a phase point across the middle interface.
This is accomplished by repeatedly kicking the phase point so
that it crosses the middle interface.
Parameters
----------
system : object like :py:class:`.System`
The working state we repeatedly kick and integrate. The
order function is injected on the engine
(``self.order_function``).
ens_set : dict
Ensemble settings (kept for interface symmetry with the
other explicit-convention methods).
middle : float
This is the value for the middle interface.
tis_settings : dict
This dictionary contains settings for TIS. Explicitly used here:
* `zero_momentum`: boolean, determines if the momentum is zeroed.
* `rescale_energy`: boolean, determines if energy is re-scaled.
rgen : object like :py:class:`.RandomGenerator`
The velocity-generation random generator, injected per call
(distinct from the integration ``self.rgen``).
Returns
-------
out[0] : object like :py:class:`.System`
The phase-point just before the interface.
out[1] : object like :py:class:`.System`
The phase-point just after the interface.
Note
----
This function will update the system state.
"""
# We search for crossing with the middle interface and do this
# by sequentially kicking the initial phase point:
previous = system.copy()
self._evaluate_potential_and_force(system) # Make sure forces are set.
curr = self.calculate_order(system)[0]
logger.info('Kicking from: %9.6f', curr)
while True:
# Save current state:
previous = system.copy()
# Modify velocities:
self.modify_velocities(
system, {'sigma_v': -1,
'momentum': tis_settings['zero_momentum'],
'rescale': tis_settings['rescale_energy']},
rgen)
# Update order parameter in case it is velocity dependent:
order = self.calculate_order(system)
previous.order = order
curr = order[0]
# Store modified velocities:
previous.vel = system.vel.copy()
# Integrate forward one step:
self.integration_step(system)
# Compare previous order parameter and the new one:
prev = curr
order = self.calculate_order(system)
curr = order[0]
if curr == middle:
# By construction we want two points, one left and one
# right of the interface, and these two points should
# be connected by a MD step. If we hit exactly on the
# interface we just fall back:
system.particles = previous.particles.copy()
order = previous.order
curr = order[0]
else:
if (prev <= middle < curr) or (curr < middle <= prev):
# Middle interface was crossed, just stop the loop.
logger.info('Crossing found: %9.6f %9.6f ', prev, curr)
break
if (prev <= curr < middle) or (middle < curr <= prev):
# We are getting closer, keep the new point.
pass
else: # We did not get closer, fall back to previous point.
system.particles = previous.particles.copy()
order = previous.order
curr = order[0]
system.order = order
return previous, system
[docs] def dump_phasepoint(self, phasepoint, deffnm=None):
"""Dump a phase point to a file.
In the native in-memory loop the phase points carry their own
particles and there is nothing to dump, so this is a no-op (kept
for compatibility with the external integrators). In the
streaming (file-backed) loop the phase point is a snapshot whose
``config`` points into a trajectory file; we extract that single
frame to its own file and re-point the phase point at it, exactly
as the external base does.
"""
if phasepoint.particles is not None:
return
name = deffnm if deffnm is not None else 'conf'
pos_file = self._dump_config_streaming(phasepoint.config, name)
phasepoint.set_pos((pos_file, 0))
[docs] def clean_up(self):
"""Clean up after using the engine.
For the native in-memory loop there is nothing to clean (no files
are produced and no streaming template is attached), so this is a
no-op -- byte-identical to the historical behaviour. Only when the
engine runs in the streaming (file-backed) loop (a template was
attached by :py:meth:`setup_streaming`) and the scheduler has
handed it a private ``exe_dir`` do we remove the working files
there so a re-used engine starts from a clean directory.
"""
if self._stream_system is None:
return
exe_dir = self.exe_dir
if exe_dir is None or not os.path.isdir(exe_dir):
return
for item in os.scandir(exe_dir):
if item.is_file():
try:
os.remove(item.path)
except OSError:
logger.debug('Could not remove: %s', item.path)
# ------------------------------------------------------------------
# Streaming (file-backed) mode.
#
# These methods give the internal integrator the same coordinator
# interface the external engines (e.g. turtlemd) expose, so a native
# config can run inside the infinite-swap coordinator's worker loop.
# They are ADDITIVE: every method above checks ``_is_streaming`` and
# only enters this path when handed a file-backed snapshot System
# (``system.particles is None``). The native in-memory loop never
# attaches a streaming template, so ``_is_streaming`` is always
# False there and the original code runs byte-identically.
# ------------------------------------------------------------------
[docs] def setup_streaming(self, settings):
"""Build the file-backed (streaming) template for this engine.
Mirrors :py:class:`.TurtleMDEngine`, which builds its own box /
particles / potential in ``__init__``: here we reuse the native
:func:`pyretis.setup.createsystem.create_system` and
:func:`pyretis.setup.createforcefield.create_force_field` to
build a fully-attached in-memory ``System`` template (particles,
box, masses, force field). Each streaming propagation materialises
a fresh working copy of this template, fills in the positions and
velocities read from disk, runs the existing in-memory integrator,
and writes the produced frames back out as an xyz trajectory.
Parameters
----------
settings : dict
The full (coordinator) settings dictionary. It carries the
native ``[system]`` / ``[box]`` / ``[particles]`` /
``[potential]`` / ``[forcefield]`` / ``[engine]`` sections the
native flow uses to build the system and force field.
"""
# Local imports: the setup package imports the engines package, so
# importing it at module top would create an import cycle.
# pylint: disable=import-outside-toplevel
from pyretis.setup.createsystem import create_system
from pyretis.setup.createforcefield import create_force_field
from pyretis.core.units import units_from_settings
# Each worker is a fresh process: the unit-conversion tables the
# native flow registers via ``units_from_settings`` are not set
# there yet, so register them before reading the initial
# configuration (otherwise the file-vs-system unit conversion has
# no entry, e.g. ('A', 'reduced')).
units_from_settings(settings)
template = create_system(settings)
template.forcefield = create_force_field(settings)
self._forcefield = template.forcefield
self._stream_system = template
self._stream_names = list(
settings.get('particles', {}).get('name', ['X'])
)
# A streaming engine may carry its own ``subcycles`` from the
# engine settings; default to 1 (record every integration step)
# so the time-per-step matches the native loop.
self.subcycles = int(settings.get('engine', {}).get('subcycles', 1))
[docs] def _is_streaming(self, system):
"""Return True when ``system`` is a file-backed snapshot.
The native in-memory loop always hands the engine a ``System``
with particles attached; the coordinator hands it a snapshot
whose ``particles`` is ``None`` and whose ``config`` points at a
trajectory file. Streaming additionally requires the engine to
have a template (set by :py:meth:`setup_streaming`).
"""
return (
self._stream_system is not None
and getattr(system, 'particles', None) is None
)
[docs] def _ensure_dynamics_rgen(self):
"""Wrap a raw numpy dynamics rgen in the pyretis generator API.
Stochastic integrators (Langevin / random walk) draw their
dynamics noise from ``self.rgen`` using the pyretis generator
surface (notably ``multivariate_normal(..., cho=...)``). The
infinite-swap scheduler, however, injects a raw
:class:`numpy.random.Generator` onto ``engine.rgen`` per move. To
keep the dynamics draws working -- and deterministic -- wrap that
raw generator in :class:`.PCG64Generator`, which exposes the
pyretis surface while drawing from the very same (deterministic)
underlying stream. Engines without ``self.rgen`` (Verlet /
velocity-Verlet) and already-wrapped generators are left
untouched.
"""
# Local import avoids any import-time coupling to the generator
# module beyond what the engine already needs.
from numpy.random import Generator # pylint: disable=C0415
from pyretis.core.random_gen import ( # pylint: disable=C0415
PCG64Generator,
)
rgen = getattr(self, 'rgen', None)
if isinstance(rgen, Generator):
self.rgen = PCG64Generator(generator=rgen)
[docs] def _fresh_stream_system(self, pos, vel):
"""Return a working in-memory System seeded with pos/vel.
Copies the streaming template (so the box, masses, particle
names and force field are all in place) and writes the given
positions and velocities into it. The template's own state is
never mutated.
"""
work = self._stream_system.copy()
dim = work.pos.shape[1]
work.pos = np.array(pos[:, :dim], copy=True)
work.vel = np.array(vel[:, :dim], copy=True)
return work
[docs] def _materialize_system(self, system):
"""Read the frame ``system.config`` points at into a System.
Returns the working in-memory ``System`` plus the raw 3-column
``pos`` / ``vel`` / ``box`` arrays read from disk (kept so the
produced frames are written back at full xyz width).
"""
from pyretis.engines._parts import ( # pylint: disable=C0415
convert_snapshot,
read_xyz_file,
)
filename, idx = system.config
target = idx if idx is not None else 0
for i, snapshot in enumerate(read_xyz_file(filename)):
if i == target:
box, xyz, vel, _ = convert_snapshot(snapshot)
work = self._fresh_stream_system(xyz, vel)
if system.vel_rev:
work.vel = -1.0 * work.vel
return work, xyz, vel, box
raise ValueError(
f'Could not read frame {target} from "{filename}"'
)
[docs] def _calculate_order_streaming(self, system):
"""Order parameter for a file-backed snapshot System."""
work, _, _, _ = self._materialize_system(system)
return self.order_function.calculate(work)
[docs] def streaming_energies(self, systems):
"""Recompute ``(vpot, ekin)`` for file-backed snapshot systems.
The potential energy is a function of the positions and the
kinetic energy a function of the velocities, both stored in each
system's trajectory frame, so the energy terms are recovered
exactly by re-reading the frame and evaluating this engine's
force field. This restores the per-frame energies on paths
rebuilt from disk (initial paths, swaps, and the reload performed
by :py:meth:`pyretis.core.pathstorage.PathStorage.output`), whose
snapshot phase points carry positions and velocities but no
in-memory energy. Each distinct trajectory file is read once.
Parameters
----------
systems : iterable of objects like :py:class:`.System`
File-backed snapshot systems: each ``config`` is a
``(filename, frame_index)`` pair and ``particles`` is None.
Returns
-------
list of tuple of float
One ``(vpot, ekin)`` pair per input system, in input order.
"""
from pyretis.engines._parts import ( # pylint: disable=C0415
convert_snapshot,
read_xyz_file,
)
frames_cache = {}
energies = []
for system in systems:
filename, idx = system.config
target = idx if idx is not None else 0
if filename not in frames_cache:
frames_cache[filename] = list(read_xyz_file(filename))
_, xyz, vel, _ = convert_snapshot(frames_cache[filename][target])
work = self._fresh_stream_system(xyz, vel)
if system.vel_rev:
work.vel = -1.0 * work.vel
self._evaluate_potential_and_force(work)
ekin = calculate_kinetic_energy(work)[0]
energies.append((float(work.vpot), float(ekin)))
return energies
[docs] def _dump_config_streaming(self, config, deffnm):
"""Extract a single frame from a trajectory into its own file."""
from pyretis.engines._parts import ( # pylint: disable=C0415
convert_snapshot,
read_xyz_file,
write_xyz_trajectory,
)
filename, idx = config
target = idx if idx is not None else 0
out_file = os.path.join(self.exe_dir, f'{deffnm}.xyz')
if idx is None and filename == out_file:
return out_file
for i, snapshot in enumerate(read_xyz_file(filename)):
if i == target:
box, xyz, vel, names = convert_snapshot(snapshot)
write_xyz_trajectory(
out_file, xyz, vel, names, box, append=False
)
return out_file
raise ValueError(
f'Could not extract frame {target} from "{filename}"'
)
[docs] @staticmethod
def _draw_maxwellian(system, rgen, sigma_v=None):
"""Draw Maxwellian velocities for ``system`` using ``rgen``.
Replicates :py:meth:`.RandomGenerator.draw_maxwellian_velocities`
exactly (same ``sigma_v`` from ``beta``/``imass`` and the same
``normal`` draw) but accepts the raw :class:`numpy.random.Generator`
the infinite-swap scheduler injects as the velocity-generation
random generator (``pens['rgen-eng']``), which does not carry the
pyretis ``draw_maxwellian_velocities`` method.
"""
if sigma_v is None or np.all(np.asarray(sigma_v) < 0.0):
kbt = 1.0 / system.temperature['beta']
sigma_v = np.sqrt(kbt * system.imass)
npart, dim = system.vel.shape
vel = rgen.normal(loc=0.0, scale=sigma_v, size=(npart, dim))
return vel, sigma_v
[docs] def _modify_velocities_streaming(self, system, vel_settings, rgen):
"""Draw shooting velocities for a file-backed snapshot System.
Mirrors :py:meth:`.TurtleMDEngine.modify_velocities`: read the
current frame, draw new velocities (or perturb them) with the
injected random generator, write a ``genvel.xyz`` file and point
the system at it. The kinetic-energy change is returned.
"""
from pyretis.engines._parts import ( # pylint: disable=C0415
convert_snapshot,
read_xyz_file,
write_xyz_trajectory,
)
from pyretis.core.particlefunctions import ( # pylint: disable=C0415
calculate_kinetic_energy,
)
filename, idx = system.config
target = idx if idx is not None else 0
snapshot = None
for i, snap in enumerate(read_xyz_file(filename)):
if i == target:
snapshot = snap
break
if snapshot is None:
raise ValueError(
f'Could not read frame {target} from "{filename}"'
)
box, xyz, vel, names = convert_snapshot(snapshot)
work = self._fresh_stream_system(xyz, vel)
self._evaluate_potential_and_force(work)
rescale = vel_settings.get('rescale')
if rescale is not None and rescale is not False and rescale > 0:
kin_old = rescale - work.vpot
do_rescale = True
else:
kin_old = calculate_kinetic_energy(work)[0]
do_rescale = False
if is_aimless_velocity_setting(vel_settings):
new_vel, _ = self._draw_maxwellian(work, rgen)
work.vel = new_vel
else:
dvel, _ = self._draw_maxwellian(
work, rgen, sigma_v=vel_settings['sigma_v'])
work.vel = work.vel + dvel
if vel_settings.get('momentum', False):
reset_momentum(work)
if do_rescale:
rescale_system_velocities(work, rescale,
forcefield=work.forcefield)
kin_new = calculate_kinetic_energy(work)[0]
dek = kin_new - kin_old
# Write the kicked frame back at full xyz width.
dim = work.pos.shape[1]
out_pos = np.array(xyz, copy=True)
out_vel = np.array(vel, copy=True)
out_pos[:, :dim] = work.pos
out_vel[:, :dim] = work.vel
conf_out = os.path.join(self.exe_dir, 'genvel.xyz')
write_xyz_trajectory(
conf_out, out_pos, out_vel, names, box, append=False
)
# The freshly kicked configuration is stored unreversed, so clear
# any inherited velocity-reversal flag on the snapshot.
system.config = (conf_out, 0)
system.vel_rev = False
system.ekin = kin_new
return dek, kin_new
[docs] def _propagate_streaming(self, path, ens_set, system, reverse):
"""Propagate a file-backed snapshot System (coordinator loop).
Counterpart of :py:meth:`.TurtleMDEngine._propagate_from` for the
internal integrators: materialise the starting frame, run the
in-memory integrator forward (or backward) one recorded frame at a
time, write each frame to an xyz trajectory, and build snapshot
``System`` objects pointing at it. ``self.steps`` is accumulated
so the scheduler's subcycle accounting matches the external
engines.
"""
from pyretis.engines._parts import ( # pylint: disable=C0415
write_xyz_trajectory,
)
from pyretis.core.common import counter # pylint: disable=C0415
status = f'Propagate streaming w/internal engine (reverse={reverse})'
logger.debug(status)
success = False
self._ensure_dynamics_rgen()
left, _, right = ens_set['interfaces']
work, raw_pos, raw_vel, box = self._materialize_system(system)
self._evaluate_potential_and_force(work)
dim = work.pos.shape[1]
prefix = str(counter())
traj_file = os.path.join(self.exe_dir, f'{prefix}_internal.xyz')
for i in range(path.maxlen):
order = self.order_function.calculate(work)
ekin = calculate_kinetic_energy(work)[0]
work.ekin = ekin
# Write this frame to the trajectory (full xyz width).
out_pos = np.array(raw_pos, copy=True)
out_vel = np.array(raw_vel, copy=True)
out_pos[:, :dim] = work.pos
# Frames are stored unreversed; the snapshot records the
# velocity direction via ``vel_rev`` so a re-read flips them.
out_vel[:, :dim] = -work.vel if reverse else work.vel
write_xyz_trajectory(
traj_file, out_pos, out_vel, self._stream_names, box,
step=i, append=True
)
snapshot = {
'order': order,
'config': (traj_file, i),
'vel_rev': reverse,
'ekin': ekin,
'vpot': work.vpot,
}
phase_point = self.snapshot_to_system(system, snapshot)
phase_point.order = order
status, success, stop, _ = self.add_to_path(
path, phase_point, left, right
)
if stop:
logger.debug('Streaming propagate stop at step %i', i)
break
for _ in range(self.subcycles):
if reverse:
work.vel *= -1.0
self.integration_step(work)
work.vel *= -1.0
else:
self.integration_step(work)
self.steps += 1
logger.debug('Streaming propagate done: "%s" (success: %s)',
status, success)
return success, status
[docs]class Verlet(MDEngine):
"""The Verlet MD integrator.
This class defines the Verlet MD integrator.
Attributes
----------
half_idt : float
Half of the inverse time step: `0.5 / timestep`.
timestepsq : float
Squared time step: `timestep**2`.
previous_pos : numpy.array
Stores the previous positions of the particles.
"""
[docs] def __init__(self, timestep):
"""Set up the Verlet MD integrator.
Parameters
----------
timestep : float
The time step in internal units.
"""
super().__init__(timestep, 'Verlet MD integrator', dynamics='NVE')
self.half_idt = 0.5 / self.timestep
self.timestepsq = self.timestep**2
self.previous_pos = None
[docs] def set_initial_positions(self, particles):
"""Get initial positions for the Verlet integration.
Parameters
----------
particles : object like :py:class:`.Particles`
The initial configuration. Positions and velocities are
required.
"""
self.previous_pos = particles.pos - particles.vel * self.timestep
[docs] def integration_step(self, system):
"""Perform one Verlet integration step.
Parameters
----------
system : object like :py:class:`.System`
The system to integrate/act on. Assumed to have a particle
list in ``system.particles``.
Returns
-------
out : None
Does not return anything, but alters the state of the given
system.
"""
acc = self._get_force(system) * system.imass
pos = 2.0 * system.pos - self.previous_pos + acc * self.timestepsq
system.vel = (pos - self.previous_pos) * self.half_idt
self.previous_pos, system.pos = system.pos, pos
self._evaluate_potential_and_force(system)
[docs] def restart_info(self):
"""Return restart info.
The restart info for the engine Verlet.
Returns
-------
info : dict,
Contains all the updated simulation settings and counters.
"""
info = super().restart_info()
info['class'] = 'Verlet'
for attr, value in self.__dict__.items():
# ``order_function`` is transient injected state (restored
# separately from the ensemble settings), not durable engine
# config, so it is excluded from the restart info.
if attr in ('_forcefield', 'order_function'):
continue
info[attr] = value
return info
[docs] def load_restart_info(self, info):
"""Load restart information.
Parameters
----------
info : dict
The dictionary with the restart information, should be
similar to the dict produced by :py:func:`.restart_info`.
"""
super().load_restart_info(info)
self.timestep = info.get('timestep')
self.half_idt = info.get('half_idt')
self.timestepsq = info.get('timestepsq')
self.previous_pos = info.get('previous_pos')
[docs]class RandomWalk(MDEngine):
"""A Random Walker integrator.
This class defines a Random walker integrator.
Attributes
----------
timestep : float
The length of the step.
"""
[docs] def __init__(self, timestep, rgen=None, seed=0):
"""Set up the Random walker integrator.
Parameters
----------
timestep : float
The time step in internal units.
rgen : string, optional
This string can be used to pick a particular random
generator, which is useful for testing.
seed : integer, optional
A seed for the random generator.
"""
super().__init__(timestep, 'Random Walker integrator',
dynamics='NoNe')
rgen_settings = {'seed': seed, 'rgen': rgen}
self.rgen = create_random_generator(rgen_settings)
[docs] def integration_step(self, system):
"""Random Walker integration, one time step.
Parameters
----------
system : object like :py:class:`.System`
The system to integrate/act on. Assumed to have a particle
list in ``system.particles``.
Returns
-------
out : None
Does not return anything, but alters the state of the given
`system`.
"""
system.vel, _ = self.rgen.draw_maxwellian_velocities(system)
system.pos += self.timestep * system.vel
[docs] def restart_info(self):
"""Return restart info for the RandomWalk engine.
The integration random generator (``rgen``) is persisted so that
a restart -- including a re-run of an interrupted shot -- resumes
the exact same stochastic stream (bit-identical dynamics).
Returns
-------
info : dict
Contains all the updated simulation settings and counters.
"""
info = super().restart_info()
info['class'] = 'RandomWalk'
for attr, value in self.__dict__.items():
# ``order_function`` is transient injected state, excluded
# from the restart info (see Langevin.restart_info).
if attr in ('_forcefield', 'order_function'):
continue
if attr == 'rgen':
info[attr] = value.get_state()
else:
info[attr] = value
return info
[docs] def load_restart_info(self, info):
"""Load restart information for the RandomWalk engine.
Parameters
----------
info : dict
The dictionary with the restart information, should be
similar to the dict produced by :py:func:`.restart_info`.
"""
super().load_restart_info(info)
self.timestep = info.get('timestep')
self.dynamics = info.get('dynamics')
if 'rgen' in info:
self.rgen = create_random_generator(info['rgen'])
[docs]class VelocityVerlet(MDEngine):
"""The Velocity Verlet MD integrator.
This class defines the Velocity Verlet integrator.
Attributes
----------
half_timestep : float
Half of the timestep.
"""
[docs] def __init__(self, timestep):
"""Set up the Velocity Verlet integrator.
Parameters
----------
timestep : float
The time step in internal units.
"""
super().__init__(timestep, 'Velocity Verlet MD integrator',
dynamics='NVE')
self.half_timestep = self.timestep * 0.5
[docs] def integration_step(self, system):
"""Velocity Verlet integration, one time step.
Parameters
----------
system : object like :py:class:`.System`
The system to integrate/act on. Assumed to have a particle
list in ``system.particles``.
Returns
-------
out : None
Does not return anything, but alters the state of the given
`system`.
"""
imass = system.imass
system.vel += self.half_timestep * self._get_force(system) * imass
system.pos += self.timestep * system.vel
self._evaluate_potential_and_force(system)
system.vel += self.half_timestep * self._get_force(system) * imass
[docs] def restart_info(self):
"""Return some restart info.
The restart info for the engine VelocityVerlet.
Returns
-------
info : dict,
Contains all the updated simulation settings and counters.
"""
info = super().restart_info()
info['class'] = 'VelocityVerlet'
for attr, value in self.__dict__.items():
# ``order_function`` is transient injected state (restored
# separately from the ensemble settings), not durable engine
# config, so it is excluded from the restart info.
if attr in ('_forcefield', 'order_function'):
continue
info[attr] = value
return info
[docs] def load_restart_info(self, info):
"""Load restart information.
Parameters
----------
info : dict
The dictionary with the restart information, should be
similar to the dict produced by :py:func:`.restart_info`.
"""
super().load_restart_info(info)
self.timestep = info.get('timestep')
self.half_timestep = info.get('half_timestep')
self.dynamics = info.get('dynamics')
[docs]class Langevin(MDEngine):
"""The Langevin MD integrator.
This class defines a Langevin integrator.
Attributes
----------
rgen : object like :py:class:`.RandomGenerator`
This is the class that handles the generation of random numbers.
gamma : float
The friction parameter.
high_friction : boolean
Determines if we are in the high friction limit and should
do the over-damped version.
init_params : boolean
If true, we will initiate parameters for the Langevin
integrator when `integrate_step` is invoked.
param_high : dict
This contains the parameters for the high friction limit. Here
we integrate the equations of motion according to:
``r(t + dt) = r(t) + dt * f(t)/m*gamma + dr``.
The items in the dict are:
* `sigma` : float
standard deviation for the positions, used when drawing dr.
* `bddt` : numpy.array
Equal to ``dt*gamma/masses``, since the masses is a
numpy.array this will have the same shape.
param_iner : dict
This dict contains the parameters for the non-high friction
limit where we integrate the equations of motion according to:
``r(t + dt) = r(t) + c1 * dt * v(t) + c2*dt*dt*a(t) + dr``
and
``v(r + dt) = c0 * v(t) + (c1-c2)*dt*a(t) + c2*dt*a(t+dt) + dv``.
The dict contains:
* `c0` : float
Corresponds to ``c0`` in the equation above.
* `a1` : float
Corresponds to ``c1*dt`` in the equation above.
* `a2` : numpy.array
Corresponds to ``c2*dt*dt/mass`` in the equation above.
Here we divide by the masses in order to use the forces rather
than the acceleration. Since the masses might be different for
different particles, this will result in a numpy.array with
shape equal to the shape of the masses.
* `b1` : numpy.array
Corresponds to ``(c1-c2)*dt/mass`` in the equation above.
Here we also divide by the masses, resulting in a numpy.array.
* `b2` : numpy.array
Corresponds to ``c2*dt/mass`` in the equation above.
Here we also divide by the masses, resulting in a numpy.array.
* `mean` : numpy.array (2,)
The means for the bivariate Gaussian distribution.
* `cov` : numpy.array (2,2)
This array contains the covariance for the bivariate Gaussian
distribution. `param_iner['mean']` and `param_iner['cov']` are
used as parameters when drawing ``dr`` and ``dv`` from the
bivariate distribution.
Note
----
Currently, we are using a multi-normal distribution from numpy.
Consider replacing this one as it seems somewhat slow.
"""
[docs] def __init__(self, timestep, gamma, rgen=None, seed=0,
high_friction=False):
"""Set up the Langevin integrator.
Actually, it is very convenient to set some variables for the
different particles. However, to have a uniform initialisation
for the different integrators, we postpone this.
This initialisation can be done later by calling explicitly the
function `self._init_parameters(system)` or it will be called
the first time `self.integration_step` is invoked.
Parameters
----------
timestep : float
The time step in internal units.
gamma : float
The gamma parameter for the Langevin integrator.
rgen : string, optional
This string can be used to pick a particular random
generator, which is useful for testing.
seed : integer, optional
A seed for the random generator.
high_friction : boolean, optional
Determines if we are in the high_friction limit and should
do the over-damped version.
"""
super().__init__(timestep, 'Langevin MD integrator',
dynamics='stochastic')
self.gamma = gamma
self.high_friction = high_friction
rgen_settings = {'seed': seed, 'rgen': rgen}
self.rgen = create_random_generator(rgen_settings)
self.param_high = {'sigma': None, 'bddt': None}
self.param_iner = {'c0': None, 'a1': None, 'a2': None,
'b1': None, 'b2': None, 'mean': None, 'cov': None}
self.init_params = True
[docs] def _init_parameters(self, system):
"""Extra initialisation of the Langevin integrator.
Parameters
----------
system : object like :py:class:`.System`
The system to integrate/act on. Assumed to have a particle
list in ``system.particles``.
Returns
-------
out : None
Does not return anything, but updates ``self.param``.
"""
beta = system.temperature['beta']
imasses = system.imass
if self.high_friction:
self.param_high['sigma'] = np.sqrt(2.0 * self.timestep *
imasses/(beta * self.gamma))
self.param_high['bddt'] = self.timestep * imasses / self.gamma
else:
gammadt = self.gamma * self.timestep
exp_gdt = np.exp(-gammadt)
if self.gamma > 0.0:
c_0 = exp_gdt
c_1 = (1.0 - c_0) / gammadt
c_2 = (1.0 - c_1) / gammadt
else:
raise ValueError(
f'Langevin: Found gamma = {self.gamma:6.2f} < 0.'
)
self.param_iner['c0'] = c_0
self.param_iner['a1'] = c_1 * self.timestep
self.param_iner['a2'] = c_2 * self.timestep**2 * imasses
self.param_iner['b1'] = (c_1 - c_2) * self.timestep * imasses
self.param_iner['b2'] = c_2 * self.timestep * imasses
self.param_iner['mean'] = []
self.param_iner['cov'] = []
self.param_iner['cho'] = []
for imass in imasses:
sig_ri2 = ((self.timestep * imass[0] / (beta * self.gamma)) *
(2. - (3. - 4.*exp_gdt + exp_gdt**2) / gammadt))
sig_vi2 = (1.0 - exp_gdt**2) * imass[0] / beta
cov_rvi = (imass[0]/(beta * self.gamma)) * (1.0 - exp_gdt)**2
cov_matrix = np.array([[sig_ri2, cov_rvi],
[cov_rvi, sig_vi2]])
self.param_iner['cov'].append(cov_matrix)
self.param_iner['cho'].append(np.linalg.cholesky(cov_matrix))
self.param_iner['mean'].append(np.zeros(2))
[docs] def integration_step(self, system):
"""Langevin integration, one time step.
Parameters
----------
system : object like :py:class:`.System`
The system to integrate/act on. Assumed to have a particle
list in ``system.particles``.
Returns
-------
out : None
Does not return anything, but alters the state of the given
system.
"""
if self.init_params:
self._init_parameters(system)
self.init_params = False
if self.high_friction:
return self.integration_step_overdamped(system)
return self.integration_step_inertia(system)
[docs] def integration_step_overdamped(self, system):
"""Over damped Langevin integration, one time step.
Parameters
----------
system : object like :py:class:`.System`
The system to integrate/act on. Assumed to have a particle
list in ``system.particles``.
Returns
-------
out : None
Does not return anything, but alters the state of the given
`system`.
"""
self._evaluate_potential_and_force(system)
rands = self.rgen.normal(loc=0.0, scale=self.param_high['sigma'],
size=system.vel.shape)
system.pos += self.param_high['bddt'] * self._get_force(system) + rands
system.vel = rands
[docs] def integration_step_inertia(self, system):
"""Langevin integration, one time step.
Parameters
----------
system : object like :py:class:`.System`
The system to integrate/act on. Assumed to have a particle
list in ``system.particles``.
Returns
-------
out : None
Does not return anything, but alters the state of the given
`system`.
"""
pos_rand = np.zeros(system.pos.shape)
vel_rand = np.zeros(system.vel.shape)
if self.gamma > 0.0:
mean, cov = self.param_iner['mean'], self.param_iner['cov']
cho = self.param_iner['cho']
for i, (meani, covi, choi) in enumerate(
zip(mean, cov, cho, strict=True)):
randxv = self.rgen.multivariate_normal(
meani, covi, cho=choi, size=get_system_dim(system)
)
pos_rand[i] = randxv[:, 0]
vel_rand[i] = randxv[:, 1]
force = self._get_force(system)
system.pos += (self.param_iner['a1'] * system.vel +
self.param_iner['a2'] * force + pos_rand)
vel2 = (self.param_iner['c0'] * system.vel +
self.param_iner['b1'] * force + vel_rand)
self._evaluate_potential_and_force(system)
system.vel = vel2 + self.param_iner['b2'] * self._get_force(system)
[docs] def restart_info(self):
"""Return restart info.
The restart info for the engine Langevin.
Returns
-------
info : dict
Contains all the updated simulation settings and counters.
"""
info = super().restart_info()
info['class'] = 'Langevin'
for attr, value in self.__dict__.items():
# ``order_function`` is transient injected state, excluded
# from the restart info (see VelocityVerlet.restart_info).
if attr in ('_forcefield', 'order_function'):
continue
if attr == 'rgen':
info[attr] = value.get_state()
else:
info[attr] = value
return info
[docs] def load_restart_info(self, info):
"""Load restart information.
Parameters
----------
info : dict
The dictionary with the restart information, should be
similar to the dict produced by :py:func:`.restart_info`.
"""
super().load_restart_info(info)
self.timestep = info.get('timestep')
self.dynamics = info.get('dynamics')
self.gamma = info.get('gamma')
self.high_friction = info.get('high_friction')
self.param_high = info.get('param_high')
self.param_iner = info.get('param_iner')
self.init_params = info.get('init_params')
if 'rgen' in info:
self.rgen = create_random_generator(info['rgen'])