Source code for pyretis.engines.internal

# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Definition of numerical MD integrators.

These integrators are representations of engines for performing
molecular dynamics. Typically they will propagate Newtons
equations of motion in time numerically.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

MDEngine (:py:class:`.MDEngine`)
    Base class for internal MDEngines.

RandomWalk (:py:class:`.RandomWalk`)
    A Random Walk integrator.

Verlet (:py:class:`.Verlet`)
    A Verlet MD integrator.

VelocityVerlet (:py:class:`.VelocityVerlet`)
    A Velocity Verlet MD integrator.

Langevin (:py:class:`.Langevin`)
    A Langevin MD integrator.
"""
import logging
import os
import numpy as np
from pyretis.core.forcefieldhelp import rescale_system_velocities
from pyretis.core.systemhelp import get_system_dim
from pyretis.core.random_gen import create_random_generator
from pyretis.core.particlefunctions import (calculate_kinetic_energy,
                                            calculate_thermo,
                                            calculate_thermo_path,
                                            reset_momentum)
from pyretis.core.velocity import is_aimless_velocity_setting
from pyretis.engines.engine import EngineBase

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


__all__ = ['MDEngine', 'RandomWalk', 'Verlet', 'VelocityVerlet', 'Langevin']


[docs]class MDEngine(EngineBase): """Base class for internal MD integrators. This class defines an internal MD integrator. This class of integrators work with the positions and velocities of the system object directly. Further, we make use of the system object in order to update forces etc. Attributes ---------- timestep : float Time step for the integration. description : string Description of the MD integrator. dynamics : str A short string to represent the type of dynamics produced by the integrator (NVE, NVT, stochastic, ...). """ engine_type = 'internal' default_units = 'lj'
[docs] @classmethod def get_default_units(cls, settings=None): """Return the default unit system for internal engines. Parameters ---------- settings : dict, optional Full simulation settings or engine settings. This argument is ignored for internal engines. Returns ------- out : string The default unit system for internal engines. """ _ = settings return 'lj'
[docs] def __init__(self, timestep, description, dynamics=None): """Set up the integrator. Parameters ---------- timestep : float The time step for the integrator in internal units. description : string A short description of the integrator. dynamics : string or None, optional Description of the kind of dynamics the integrator does. """ super().__init__(description) self.timestep = timestep self.dynamics = dynamics self._forcefield = None # ``subcycles`` is the number of internal integration steps that # make up one recorded path frame. The native in-memory loop # records every step, i.e. ``subcycles = 1``; this default keeps # the native behaviour byte-identical. ``steps`` is the running # total of integration steps consumed across propagations, read # by the infinite-swapping scheduler for its subcycle accounting # (see ``pyretis.core._tis_inf.run_md``). Both are additive: the # native loop never reads them. self.subcycles = 1 self.steps = 0 # Streaming (file-backed) template state. Populated by # :py:meth:`setup_streaming` only when the engine runs inside the # coordinator's worker loop; ``None`` for the native in-memory # loop, which never touches the streaming path. self._stream_system = None self._stream_names = None
[docs] def set_forcefield(self, forcefield): """Attach a force field for system-independent evaluation.""" self._forcefield = forcefield
[docs] def _evaluate_potential_and_force(self, system): """Evaluate potential/force using the engine's forcefield. On first call, lazily acquires the forcefield from the system if not yet set via :meth:`set_forcefield`. """ if self._forcefield is None: ff = getattr(system, 'forcefield', None) if ff is not None: self._forcefield = ff else: raise RuntimeError( 'No forcefield available. Call engine.set_forcefield() ' 'or pass a system with a forcefield attribute.' ) pot, forces, virial = ( self._forcefield.evaluate_potential_and_force(system) ) system.vpot = pot system.force = forces # ``virial`` is harness-only (no setter on snapshot System); # writing through the bridge no-ops when there's no particles. if hasattr(system, 'virial'): system.virial = virial return pot, forces, virial
[docs] @staticmethod def _get_force(system): """Return the force array from either System type.""" return system.force
[docs] def integration_step(self, _): """Perform a single time step of the integration. Parameters ---------- - : place holder Returns ------- out : None Does not return anything, in derived classes it will typically update the given `System`. """ raise NotImplementedError
[docs] def select_thermo_function(self, thermo='full'): """Select function for calculating thermodynamic properties. Parameters ---------- thermo : string, or None, optional String which selects the kind of thermodynamic output. Returns ------- thermo_func : callable or None The function matching the requested thermodynamic output. """ thermo_func = None if thermo is not None: if thermo not in ('full', 'path'): logger.debug( 'Unknown thermo "%s" requested: Using "path"!', thermo, ) thermo = 'path' logger.debug('Thermo output for %s.integrate(): "%s"', self.__class__.__name__, thermo) if thermo == 'full': thermo_func = calculate_thermo elif thermo == 'path': thermo_func = calculate_thermo_path return thermo_func
[docs] def integrate(self, system, order_function, steps, thermo='full'): """Perform several integration steps. This method will perform several integration steps, but it will also calculate order parameter(s) if requested and energy terms. Parameters ---------- system : object like :py:class:`.System` The system we are integrating. order_function : object like :py:class:`.OrderParameter` or None An order function can be specified if we want to calculate the order parameter along with the simulation. steps : integer The number of steps we are going to perform. Note that we do not integrate on the first step (e.g. step 0) but we do obtain the other properties. This is to output the starting configuration. thermo : string, optional Select the thermodynamic properties we are to calculate. Yields ------ results : dict The result of a MD step. This contains the state of the system and also the order parameter(s) (if calculated) and the thermodynamic quantities (if calculated). """ thermo_func = self.select_thermo_function(thermo=thermo) self.order_function = order_function self._evaluate_potential_and_force(system) for i in range(steps): if i == 0: pass else: self.integration_step(system) results = {'system': system} if order_function is not None: results['order'] = self.calculate_order(system) if thermo_func: results['thermo'] = thermo_func(system) yield results
[docs] def invert_dt(self): """Invert the time step for the integration. Returns ------- out : boolean True if the time step is positive, False otherwise. """ self.timestep *= -1.0 return self.timestep > 0.0
[docs] def propagate(self, path, ens_set, system, reverse=False): """Generate a path by integrating until a criterion is met. This function will generate a path by calling the function specifying the integration step repeatedly. The integration is carried out until the order parameter has passed the specified interfaces or if we have integrated for more than a specified maximum number of steps. Parameters ---------- path : object like :py:class:`.PathBase` This is the path we use to fill in phase-space points. We are here not returning a new path, this since we want to delegate the creation of the path (type) to the method that is running `propagate`. ens_set : dict Ensemble settings. It contains: * `interfaces` : list of floats These interfaces define the stopping criterion. system : object like :py:class:`.System` The system object gives the initial state for the integration; it is mutated in place during propagation. The order function is injected on the engine (``self.order_function``). reverse : boolean, optional If True, the system will be propagated backward in time. Returns ------- success : boolean This is True if we generated an acceptable path. status : string A text description of the current status of the propagation. """ if self._is_streaming(system): return self._propagate_streaming(path, ens_set, system, reverse) status = 'Propagate w/internal engine' logger.debug(status) success = False self._evaluate_potential_and_force(system) # Make sure forces are set. left, _, right = ens_set['interfaces'] for i in range(path.maxlen): system.order = self.calculate_order(system) ekin = calculate_kinetic_energy(system)[0] system.ekin = ekin status, success, stop, _ = self.add_to_path(path, system.copy(), left, right) if stop: logger.debug('Stopping propagate at step: %i ', i) break if reverse: system.vel *= -1.0 self.integration_step(system) system.vel *= -1.0 else: self.integration_step(system) logger.debug('Propagate done: "%s" (success: %s)', status, success) return success, status
[docs] def modify_velocities(self, system, vel_settings, rgen): """Modify the velocities of the current state. This method will modify the velocities of a time slice. And it is part of the integrator since it, conceptually, fits here: we are acting on the system and modifying it. Parameters ---------- system : object like :py:class:`.System` This is the system that contains the particles we are investigating. vel_settings: dict. It contains all the info for the velocity: * `sigma_v` : numpy.array, optional Negative values select aimless shooting. Zero or positive values set the standard deviation, one for each particle, for soft velocity perturbations. * `momentum` : boolean, optional If True, we reset the linear momentum to zero after generating. * `rescale` : float, optional In some NVE simulations, we may wish to re-scale the energy to a fixed value. If `rescale` is a float > 0, we will re-scale the energy (after modification of the velocities) to match the given float. rgen : object like :py:class:`.RandomGenerator` The velocity-generation random generator, injected per call. It is kept distinct from the integration ``self.rgen`` that stochastic integrators (Langevin, random walk) own, so a shooting-move velocity kick never perturbs the dynamics random stream. Returns ------- dek : float The change in the kinetic energy. kin_new : float The new kinetic energy. """ if self._is_streaming(system): return self._modify_velocities_streaming( system, vel_settings, rgen ) rescale = vel_settings.get('rescale') if rescale is not None and rescale is not False: if rescale > 0: kin_old = rescale - system.vpot do_rescale = True else: logger.warning('Ignored re-scale 6.2%f < 0.0.', rescale) return 0.0, calculate_kinetic_energy(system)[0] else: kin_old = calculate_kinetic_energy(system)[0] do_rescale = False if is_aimless_velocity_setting(vel_settings): vel, _ = rgen.draw_maxwellian_velocities(system) system.vel = vel else: dvel, _ = rgen.draw_maxwellian_velocities( system, sigma_v=vel_settings['sigma_v']) system.vel = system.vel + dvel if vel_settings.get('momentum', False): reset_momentum(system) if do_rescale: rescale_system_velocities(system, rescale, forcefield=system.forcefield) kin_new = calculate_kinetic_energy(system)[0] dek = kin_new - kin_old return dek, kin_new
[docs] def __call__(self, system): """To allow calling `MDEngine(system)`. Here, we are just calling `self.integration_step(system)`. Parameters ---------- system : object like :py:class:`.System` The system we are integrating. Returns ------- out : None Does not return anything, but will update the particles. """ return self.integration_step(system)
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None): """Return the order parameter. This method is just to help to calculate the order parameter in cases where only the engine can do it. Parameters ---------- system : object like :py:class:`.System` This is the system that contains the particles we are investigating. The order function is injected on the engine (``self.order_function``). xyz : numpy.array, optional The positions to use. Typically for internal engines, this is not needed. It is included here as it can be used for testing and also to be compatible with the generic function defined by the parent. vel : numpy.array, optional The velocities to use. box : numpy.array, optional The current box vectors. Returns ------- out : list of floats The calculated order parameter(s). """ order_function = self.order_function if self._is_streaming(system) and xyz is None: return self._calculate_order_streaming(system) if any((xyz is None, vel is None, box is None)): return order_function.calculate(system) system.pos = xyz system.vel = vel system.box.update_size(box) return order_function.calculate(system)
[docs] def kick_across_middle(self, system, ens_set, middle, tis_settings, rgen): """Force a phase point across the middle interface. This is accomplished by repeatedly kicking the phase point so that it crosses the middle interface. Parameters ---------- system : object like :py:class:`.System` The working state we repeatedly kick and integrate. The order function is injected on the engine (``self.order_function``). ens_set : dict Ensemble settings (kept for interface symmetry with the other explicit-convention methods). middle : float This is the value for the middle interface. tis_settings : dict This dictionary contains settings for TIS. Explicitly used here: * `zero_momentum`: boolean, determines if the momentum is zeroed. * `rescale_energy`: boolean, determines if energy is re-scaled. rgen : object like :py:class:`.RandomGenerator` The velocity-generation random generator, injected per call (distinct from the integration ``self.rgen``). Returns ------- out[0] : object like :py:class:`.System` The phase-point just before the interface. out[1] : object like :py:class:`.System` The phase-point just after the interface. Note ---- This function will update the system state. """ # We search for crossing with the middle interface and do this # by sequentially kicking the initial phase point: previous = system.copy() self._evaluate_potential_and_force(system) # Make sure forces are set. curr = self.calculate_order(system)[0] logger.info('Kicking from: %9.6f', curr) while True: # Save current state: previous = system.copy() # Modify velocities: self.modify_velocities( system, {'sigma_v': -1, 'momentum': tis_settings['zero_momentum'], 'rescale': tis_settings['rescale_energy']}, rgen) # Update order parameter in case it is velocity dependent: order = self.calculate_order(system) previous.order = order curr = order[0] # Store modified velocities: previous.vel = system.vel.copy() # Integrate forward one step: self.integration_step(system) # Compare previous order parameter and the new one: prev = curr order = self.calculate_order(system) curr = order[0] if curr == middle: # By construction we want two points, one left and one # right of the interface, and these two points should # be connected by a MD step. If we hit exactly on the # interface we just fall back: system.particles = previous.particles.copy() order = previous.order curr = order[0] else: if (prev <= middle < curr) or (curr < middle <= prev): # Middle interface was crossed, just stop the loop. logger.info('Crossing found: %9.6f %9.6f ', prev, curr) break if (prev <= curr < middle) or (middle < curr <= prev): # We are getting closer, keep the new point. pass else: # We did not get closer, fall back to previous point. system.particles = previous.particles.copy() order = previous.order curr = order[0] system.order = order return previous, system
[docs] def dump_phasepoint(self, phasepoint, deffnm=None): """Dump a phase point to a file. In the native in-memory loop the phase points carry their own particles and there is nothing to dump, so this is a no-op (kept for compatibility with the external integrators). In the streaming (file-backed) loop the phase point is a snapshot whose ``config`` points into a trajectory file; we extract that single frame to its own file and re-point the phase point at it, exactly as the external base does. """ if phasepoint.particles is not None: return name = deffnm if deffnm is not None else 'conf' pos_file = self._dump_config_streaming(phasepoint.config, name) phasepoint.set_pos((pos_file, 0))
[docs] def clean_up(self): """Clean up after using the engine. For the native in-memory loop there is nothing to clean (no files are produced and no streaming template is attached), so this is a no-op -- byte-identical to the historical behaviour. Only when the engine runs in the streaming (file-backed) loop (a template was attached by :py:meth:`setup_streaming`) and the scheduler has handed it a private ``exe_dir`` do we remove the working files there so a re-used engine starts from a clean directory. """ if self._stream_system is None: return exe_dir = self.exe_dir if exe_dir is None or not os.path.isdir(exe_dir): return for item in os.scandir(exe_dir): if item.is_file(): try: os.remove(item.path) except OSError: logger.debug('Could not remove: %s', item.path)
# ------------------------------------------------------------------ # Streaming (file-backed) mode. # # These methods give the internal integrator the same coordinator # interface the external engines (e.g. turtlemd) expose, so a native # config can run inside the infinite-swap coordinator's worker loop. # They are ADDITIVE: every method above checks ``_is_streaming`` and # only enters this path when handed a file-backed snapshot System # (``system.particles is None``). The native in-memory loop never # attaches a streaming template, so ``_is_streaming`` is always # False there and the original code runs byte-identically. # ------------------------------------------------------------------
[docs] def setup_streaming(self, settings): """Build the file-backed (streaming) template for this engine. Mirrors :py:class:`.TurtleMDEngine`, which builds its own box / particles / potential in ``__init__``: here we reuse the native :func:`pyretis.setup.createsystem.create_system` and :func:`pyretis.setup.createforcefield.create_force_field` to build a fully-attached in-memory ``System`` template (particles, box, masses, force field). Each streaming propagation materialises a fresh working copy of this template, fills in the positions and velocities read from disk, runs the existing in-memory integrator, and writes the produced frames back out as an xyz trajectory. Parameters ---------- settings : dict The full (coordinator) settings dictionary. It carries the native ``[system]`` / ``[box]`` / ``[particles]`` / ``[potential]`` / ``[forcefield]`` / ``[engine]`` sections the native flow uses to build the system and force field. """ # Local imports: the setup package imports the engines package, so # importing it at module top would create an import cycle. # pylint: disable=import-outside-toplevel from pyretis.setup.createsystem import create_system from pyretis.setup.createforcefield import create_force_field from pyretis.core.units import units_from_settings # Each worker is a fresh process: the unit-conversion tables the # native flow registers via ``units_from_settings`` are not set # there yet, so register them before reading the initial # configuration (otherwise the file-vs-system unit conversion has # no entry, e.g. ('A', 'reduced')). units_from_settings(settings) template = create_system(settings) template.forcefield = create_force_field(settings) self._forcefield = template.forcefield self._stream_system = template self._stream_names = list( settings.get('particles', {}).get('name', ['X']) ) # A streaming engine may carry its own ``subcycles`` from the # engine settings; default to 1 (record every integration step) # so the time-per-step matches the native loop. self.subcycles = int(settings.get('engine', {}).get('subcycles', 1))
[docs] def _is_streaming(self, system): """Return True when ``system`` is a file-backed snapshot. The native in-memory loop always hands the engine a ``System`` with particles attached; the coordinator hands it a snapshot whose ``particles`` is ``None`` and whose ``config`` points at a trajectory file. Streaming additionally requires the engine to have a template (set by :py:meth:`setup_streaming`). """ return ( self._stream_system is not None and getattr(system, 'particles', None) is None )
[docs] def _ensure_dynamics_rgen(self): """Wrap a raw numpy dynamics rgen in the pyretis generator API. Stochastic integrators (Langevin / random walk) draw their dynamics noise from ``self.rgen`` using the pyretis generator surface (notably ``multivariate_normal(..., cho=...)``). The infinite-swap scheduler, however, injects a raw :class:`numpy.random.Generator` onto ``engine.rgen`` per move. To keep the dynamics draws working -- and deterministic -- wrap that raw generator in :class:`.PCG64Generator`, which exposes the pyretis surface while drawing from the very same (deterministic) underlying stream. Engines without ``self.rgen`` (Verlet / velocity-Verlet) and already-wrapped generators are left untouched. """ # Local import avoids any import-time coupling to the generator # module beyond what the engine already needs. from numpy.random import Generator # pylint: disable=C0415 from pyretis.core.random_gen import ( # pylint: disable=C0415 PCG64Generator, ) rgen = getattr(self, 'rgen', None) if isinstance(rgen, Generator): self.rgen = PCG64Generator(generator=rgen)
[docs] def _fresh_stream_system(self, pos, vel): """Return a working in-memory System seeded with pos/vel. Copies the streaming template (so the box, masses, particle names and force field are all in place) and writes the given positions and velocities into it. The template's own state is never mutated. """ work = self._stream_system.copy() dim = work.pos.shape[1] work.pos = np.array(pos[:, :dim], copy=True) work.vel = np.array(vel[:, :dim], copy=True) return work
[docs] def _materialize_system(self, system): """Read the frame ``system.config`` points at into a System. Returns the working in-memory ``System`` plus the raw 3-column ``pos`` / ``vel`` / ``box`` arrays read from disk (kept so the produced frames are written back at full xyz width). """ from pyretis.engines._parts import ( # pylint: disable=C0415 convert_snapshot, read_xyz_file, ) filename, idx = system.config target = idx if idx is not None else 0 for i, snapshot in enumerate(read_xyz_file(filename)): if i == target: box, xyz, vel, _ = convert_snapshot(snapshot) work = self._fresh_stream_system(xyz, vel) if system.vel_rev: work.vel = -1.0 * work.vel return work, xyz, vel, box raise ValueError( f'Could not read frame {target} from "{filename}"' )
[docs] def _calculate_order_streaming(self, system): """Order parameter for a file-backed snapshot System.""" work, _, _, _ = self._materialize_system(system) return self.order_function.calculate(work)
[docs] def streaming_energies(self, systems): """Recompute ``(vpot, ekin)`` for file-backed snapshot systems. The potential energy is a function of the positions and the kinetic energy a function of the velocities, both stored in each system's trajectory frame, so the energy terms are recovered exactly by re-reading the frame and evaluating this engine's force field. This restores the per-frame energies on paths rebuilt from disk (initial paths, swaps, and the reload performed by :py:meth:`pyretis.core.pathstorage.PathStorage.output`), whose snapshot phase points carry positions and velocities but no in-memory energy. Each distinct trajectory file is read once. Parameters ---------- systems : iterable of objects like :py:class:`.System` File-backed snapshot systems: each ``config`` is a ``(filename, frame_index)`` pair and ``particles`` is None. Returns ------- list of tuple of float One ``(vpot, ekin)`` pair per input system, in input order. """ from pyretis.engines._parts import ( # pylint: disable=C0415 convert_snapshot, read_xyz_file, ) frames_cache = {} energies = [] for system in systems: filename, idx = system.config target = idx if idx is not None else 0 if filename not in frames_cache: frames_cache[filename] = list(read_xyz_file(filename)) _, xyz, vel, _ = convert_snapshot(frames_cache[filename][target]) work = self._fresh_stream_system(xyz, vel) if system.vel_rev: work.vel = -1.0 * work.vel self._evaluate_potential_and_force(work) ekin = calculate_kinetic_energy(work)[0] energies.append((float(work.vpot), float(ekin))) return energies
[docs] def _dump_config_streaming(self, config, deffnm): """Extract a single frame from a trajectory into its own file.""" from pyretis.engines._parts import ( # pylint: disable=C0415 convert_snapshot, read_xyz_file, write_xyz_trajectory, ) filename, idx = config target = idx if idx is not None else 0 out_file = os.path.join(self.exe_dir, f'{deffnm}.xyz') if idx is None and filename == out_file: return out_file for i, snapshot in enumerate(read_xyz_file(filename)): if i == target: box, xyz, vel, names = convert_snapshot(snapshot) write_xyz_trajectory( out_file, xyz, vel, names, box, append=False ) return out_file raise ValueError( f'Could not extract frame {target} from "{filename}"' )
[docs] @staticmethod def _draw_maxwellian(system, rgen, sigma_v=None): """Draw Maxwellian velocities for ``system`` using ``rgen``. Replicates :py:meth:`.RandomGenerator.draw_maxwellian_velocities` exactly (same ``sigma_v`` from ``beta``/``imass`` and the same ``normal`` draw) but accepts the raw :class:`numpy.random.Generator` the infinite-swap scheduler injects as the velocity-generation random generator (``pens['rgen-eng']``), which does not carry the pyretis ``draw_maxwellian_velocities`` method. """ if sigma_v is None or np.all(np.asarray(sigma_v) < 0.0): kbt = 1.0 / system.temperature['beta'] sigma_v = np.sqrt(kbt * system.imass) npart, dim = system.vel.shape vel = rgen.normal(loc=0.0, scale=sigma_v, size=(npart, dim)) return vel, sigma_v
[docs] def _modify_velocities_streaming(self, system, vel_settings, rgen): """Draw shooting velocities for a file-backed snapshot System. Mirrors :py:meth:`.TurtleMDEngine.modify_velocities`: read the current frame, draw new velocities (or perturb them) with the injected random generator, write a ``genvel.xyz`` file and point the system at it. The kinetic-energy change is returned. """ from pyretis.engines._parts import ( # pylint: disable=C0415 convert_snapshot, read_xyz_file, write_xyz_trajectory, ) from pyretis.core.particlefunctions import ( # pylint: disable=C0415 calculate_kinetic_energy, ) filename, idx = system.config target = idx if idx is not None else 0 snapshot = None for i, snap in enumerate(read_xyz_file(filename)): if i == target: snapshot = snap break if snapshot is None: raise ValueError( f'Could not read frame {target} from "{filename}"' ) box, xyz, vel, names = convert_snapshot(snapshot) work = self._fresh_stream_system(xyz, vel) self._evaluate_potential_and_force(work) rescale = vel_settings.get('rescale') if rescale is not None and rescale is not False and rescale > 0: kin_old = rescale - work.vpot do_rescale = True else: kin_old = calculate_kinetic_energy(work)[0] do_rescale = False if is_aimless_velocity_setting(vel_settings): new_vel, _ = self._draw_maxwellian(work, rgen) work.vel = new_vel else: dvel, _ = self._draw_maxwellian( work, rgen, sigma_v=vel_settings['sigma_v']) work.vel = work.vel + dvel if vel_settings.get('momentum', False): reset_momentum(work) if do_rescale: rescale_system_velocities(work, rescale, forcefield=work.forcefield) kin_new = calculate_kinetic_energy(work)[0] dek = kin_new - kin_old # Write the kicked frame back at full xyz width. dim = work.pos.shape[1] out_pos = np.array(xyz, copy=True) out_vel = np.array(vel, copy=True) out_pos[:, :dim] = work.pos out_vel[:, :dim] = work.vel conf_out = os.path.join(self.exe_dir, 'genvel.xyz') write_xyz_trajectory( conf_out, out_pos, out_vel, names, box, append=False ) # The freshly kicked configuration is stored unreversed, so clear # any inherited velocity-reversal flag on the snapshot. system.config = (conf_out, 0) system.vel_rev = False system.ekin = kin_new return dek, kin_new
[docs] def _propagate_streaming(self, path, ens_set, system, reverse): """Propagate a file-backed snapshot System (coordinator loop). Counterpart of :py:meth:`.TurtleMDEngine._propagate_from` for the internal integrators: materialise the starting frame, run the in-memory integrator forward (or backward) one recorded frame at a time, write each frame to an xyz trajectory, and build snapshot ``System`` objects pointing at it. ``self.steps`` is accumulated so the scheduler's subcycle accounting matches the external engines. """ from pyretis.engines._parts import ( # pylint: disable=C0415 write_xyz_trajectory, ) from pyretis.core.common import counter # pylint: disable=C0415 status = f'Propagate streaming w/internal engine (reverse={reverse})' logger.debug(status) success = False self._ensure_dynamics_rgen() left, _, right = ens_set['interfaces'] work, raw_pos, raw_vel, box = self._materialize_system(system) self._evaluate_potential_and_force(work) dim = work.pos.shape[1] prefix = str(counter()) traj_file = os.path.join(self.exe_dir, f'{prefix}_internal.xyz') for i in range(path.maxlen): order = self.order_function.calculate(work) ekin = calculate_kinetic_energy(work)[0] work.ekin = ekin # Write this frame to the trajectory (full xyz width). out_pos = np.array(raw_pos, copy=True) out_vel = np.array(raw_vel, copy=True) out_pos[:, :dim] = work.pos # Frames are stored unreversed; the snapshot records the # velocity direction via ``vel_rev`` so a re-read flips them. out_vel[:, :dim] = -work.vel if reverse else work.vel write_xyz_trajectory( traj_file, out_pos, out_vel, self._stream_names, box, step=i, append=True ) snapshot = { 'order': order, 'config': (traj_file, i), 'vel_rev': reverse, 'ekin': ekin, 'vpot': work.vpot, } phase_point = self.snapshot_to_system(system, snapshot) phase_point.order = order status, success, stop, _ = self.add_to_path( path, phase_point, left, right ) if stop: logger.debug('Streaming propagate stop at step %i', i) break for _ in range(self.subcycles): if reverse: work.vel *= -1.0 self.integration_step(work) work.vel *= -1.0 else: self.integration_step(work) self.steps += 1 logger.debug('Streaming propagate done: "%s" (success: %s)', status, success) return success, status
[docs]class Verlet(MDEngine): """The Verlet MD integrator. This class defines the Verlet MD integrator. Attributes ---------- half_idt : float Half of the inverse time step: `0.5 / timestep`. timestepsq : float Squared time step: `timestep**2`. previous_pos : numpy.array Stores the previous positions of the particles. """
[docs] def __init__(self, timestep): """Set up the Verlet MD integrator. Parameters ---------- timestep : float The time step in internal units. """ super().__init__(timestep, 'Verlet MD integrator', dynamics='NVE') self.half_idt = 0.5 / self.timestep self.timestepsq = self.timestep**2 self.previous_pos = None
[docs] def set_initial_positions(self, particles): """Get initial positions for the Verlet integration. Parameters ---------- particles : object like :py:class:`.Particles` The initial configuration. Positions and velocities are required. """ self.previous_pos = particles.pos - particles.vel * self.timestep
[docs] def integration_step(self, system): """Perform one Verlet integration step. Parameters ---------- system : object like :py:class:`.System` The system to integrate/act on. Assumed to have a particle list in ``system.particles``. Returns ------- out : None Does not return anything, but alters the state of the given system. """ acc = self._get_force(system) * system.imass pos = 2.0 * system.pos - self.previous_pos + acc * self.timestepsq system.vel = (pos - self.previous_pos) * self.half_idt self.previous_pos, system.pos = system.pos, pos self._evaluate_potential_and_force(system)
[docs] def restart_info(self): """Return restart info. The restart info for the engine Verlet. Returns ------- info : dict, Contains all the updated simulation settings and counters. """ info = super().restart_info() info['class'] = 'Verlet' for attr, value in self.__dict__.items(): # ``order_function`` is transient injected state (restored # separately from the ensemble settings), not durable engine # config, so it is excluded from the restart info. if attr in ('_forcefield', 'order_function'): continue info[attr] = value return info
[docs] def load_restart_info(self, info): """Load restart information. Parameters ---------- info : dict The dictionary with the restart information, should be similar to the dict produced by :py:func:`.restart_info`. """ super().load_restart_info(info) self.timestep = info.get('timestep') self.half_idt = info.get('half_idt') self.timestepsq = info.get('timestepsq') self.previous_pos = info.get('previous_pos')
[docs]class RandomWalk(MDEngine): """A Random Walker integrator. This class defines a Random walker integrator. Attributes ---------- timestep : float The length of the step. """
[docs] def __init__(self, timestep, rgen=None, seed=0): """Set up the Random walker integrator. Parameters ---------- timestep : float The time step in internal units. rgen : string, optional This string can be used to pick a particular random generator, which is useful for testing. seed : integer, optional A seed for the random generator. """ super().__init__(timestep, 'Random Walker integrator', dynamics='NoNe') rgen_settings = {'seed': seed, 'rgen': rgen} self.rgen = create_random_generator(rgen_settings)
[docs] def integration_step(self, system): """Random Walker integration, one time step. Parameters ---------- system : object like :py:class:`.System` The system to integrate/act on. Assumed to have a particle list in ``system.particles``. Returns ------- out : None Does not return anything, but alters the state of the given `system`. """ system.vel, _ = self.rgen.draw_maxwellian_velocities(system) system.pos += self.timestep * system.vel
[docs] def restart_info(self): """Return restart info for the RandomWalk engine. The integration random generator (``rgen``) is persisted so that a restart -- including a re-run of an interrupted shot -- resumes the exact same stochastic stream (bit-identical dynamics). Returns ------- info : dict Contains all the updated simulation settings and counters. """ info = super().restart_info() info['class'] = 'RandomWalk' for attr, value in self.__dict__.items(): # ``order_function`` is transient injected state, excluded # from the restart info (see Langevin.restart_info). if attr in ('_forcefield', 'order_function'): continue if attr == 'rgen': info[attr] = value.get_state() else: info[attr] = value return info
[docs] def load_restart_info(self, info): """Load restart information for the RandomWalk engine. Parameters ---------- info : dict The dictionary with the restart information, should be similar to the dict produced by :py:func:`.restart_info`. """ super().load_restart_info(info) self.timestep = info.get('timestep') self.dynamics = info.get('dynamics') if 'rgen' in info: self.rgen = create_random_generator(info['rgen'])
[docs]class VelocityVerlet(MDEngine): """The Velocity Verlet MD integrator. This class defines the Velocity Verlet integrator. Attributes ---------- half_timestep : float Half of the timestep. """
[docs] def __init__(self, timestep): """Set up the Velocity Verlet integrator. Parameters ---------- timestep : float The time step in internal units. """ super().__init__(timestep, 'Velocity Verlet MD integrator', dynamics='NVE') self.half_timestep = self.timestep * 0.5
[docs] def integration_step(self, system): """Velocity Verlet integration, one time step. Parameters ---------- system : object like :py:class:`.System` The system to integrate/act on. Assumed to have a particle list in ``system.particles``. Returns ------- out : None Does not return anything, but alters the state of the given `system`. """ imass = system.imass system.vel += self.half_timestep * self._get_force(system) * imass system.pos += self.timestep * system.vel self._evaluate_potential_and_force(system) system.vel += self.half_timestep * self._get_force(system) * imass
[docs] def restart_info(self): """Return some restart info. The restart info for the engine VelocityVerlet. Returns ------- info : dict, Contains all the updated simulation settings and counters. """ info = super().restart_info() info['class'] = 'VelocityVerlet' for attr, value in self.__dict__.items(): # ``order_function`` is transient injected state (restored # separately from the ensemble settings), not durable engine # config, so it is excluded from the restart info. if attr in ('_forcefield', 'order_function'): continue info[attr] = value return info
[docs] def load_restart_info(self, info): """Load restart information. Parameters ---------- info : dict The dictionary with the restart information, should be similar to the dict produced by :py:func:`.restart_info`. """ super().load_restart_info(info) self.timestep = info.get('timestep') self.half_timestep = info.get('half_timestep') self.dynamics = info.get('dynamics')
[docs]class Langevin(MDEngine): """The Langevin MD integrator. This class defines a Langevin integrator. Attributes ---------- rgen : object like :py:class:`.RandomGenerator` This is the class that handles the generation of random numbers. gamma : float The friction parameter. high_friction : boolean Determines if we are in the high friction limit and should do the over-damped version. init_params : boolean If true, we will initiate parameters for the Langevin integrator when `integrate_step` is invoked. param_high : dict This contains the parameters for the high friction limit. Here we integrate the equations of motion according to: ``r(t + dt) = r(t) + dt * f(t)/m*gamma + dr``. The items in the dict are: * `sigma` : float standard deviation for the positions, used when drawing dr. * `bddt` : numpy.array Equal to ``dt*gamma/masses``, since the masses is a numpy.array this will have the same shape. param_iner : dict This dict contains the parameters for the non-high friction limit where we integrate the equations of motion according to: ``r(t + dt) = r(t) + c1 * dt * v(t) + c2*dt*dt*a(t) + dr`` and ``v(r + dt) = c0 * v(t) + (c1-c2)*dt*a(t) + c2*dt*a(t+dt) + dv``. The dict contains: * `c0` : float Corresponds to ``c0`` in the equation above. * `a1` : float Corresponds to ``c1*dt`` in the equation above. * `a2` : numpy.array Corresponds to ``c2*dt*dt/mass`` in the equation above. Here we divide by the masses in order to use the forces rather than the acceleration. Since the masses might be different for different particles, this will result in a numpy.array with shape equal to the shape of the masses. * `b1` : numpy.array Corresponds to ``(c1-c2)*dt/mass`` in the equation above. Here we also divide by the masses, resulting in a numpy.array. * `b2` : numpy.array Corresponds to ``c2*dt/mass`` in the equation above. Here we also divide by the masses, resulting in a numpy.array. * `mean` : numpy.array (2,) The means for the bivariate Gaussian distribution. * `cov` : numpy.array (2,2) This array contains the covariance for the bivariate Gaussian distribution. `param_iner['mean']` and `param_iner['cov']` are used as parameters when drawing ``dr`` and ``dv`` from the bivariate distribution. Note ---- Currently, we are using a multi-normal distribution from numpy. Consider replacing this one as it seems somewhat slow. """
[docs] def __init__(self, timestep, gamma, rgen=None, seed=0, high_friction=False): """Set up the Langevin integrator. Actually, it is very convenient to set some variables for the different particles. However, to have a uniform initialisation for the different integrators, we postpone this. This initialisation can be done later by calling explicitly the function `self._init_parameters(system)` or it will be called the first time `self.integration_step` is invoked. Parameters ---------- timestep : float The time step in internal units. gamma : float The gamma parameter for the Langevin integrator. rgen : string, optional This string can be used to pick a particular random generator, which is useful for testing. seed : integer, optional A seed for the random generator. high_friction : boolean, optional Determines if we are in the high_friction limit and should do the over-damped version. """ super().__init__(timestep, 'Langevin MD integrator', dynamics='stochastic') self.gamma = gamma self.high_friction = high_friction rgen_settings = {'seed': seed, 'rgen': rgen} self.rgen = create_random_generator(rgen_settings) self.param_high = {'sigma': None, 'bddt': None} self.param_iner = {'c0': None, 'a1': None, 'a2': None, 'b1': None, 'b2': None, 'mean': None, 'cov': None} self.init_params = True
[docs] def _init_parameters(self, system): """Extra initialisation of the Langevin integrator. Parameters ---------- system : object like :py:class:`.System` The system to integrate/act on. Assumed to have a particle list in ``system.particles``. Returns ------- out : None Does not return anything, but updates ``self.param``. """ beta = system.temperature['beta'] imasses = system.imass if self.high_friction: self.param_high['sigma'] = np.sqrt(2.0 * self.timestep * imasses/(beta * self.gamma)) self.param_high['bddt'] = self.timestep * imasses / self.gamma else: gammadt = self.gamma * self.timestep exp_gdt = np.exp(-gammadt) if self.gamma > 0.0: c_0 = exp_gdt c_1 = (1.0 - c_0) / gammadt c_2 = (1.0 - c_1) / gammadt else: raise ValueError( f'Langevin: Found gamma = {self.gamma:6.2f} < 0.' ) self.param_iner['c0'] = c_0 self.param_iner['a1'] = c_1 * self.timestep self.param_iner['a2'] = c_2 * self.timestep**2 * imasses self.param_iner['b1'] = (c_1 - c_2) * self.timestep * imasses self.param_iner['b2'] = c_2 * self.timestep * imasses self.param_iner['mean'] = [] self.param_iner['cov'] = [] self.param_iner['cho'] = [] for imass in imasses: sig_ri2 = ((self.timestep * imass[0] / (beta * self.gamma)) * (2. - (3. - 4.*exp_gdt + exp_gdt**2) / gammadt)) sig_vi2 = (1.0 - exp_gdt**2) * imass[0] / beta cov_rvi = (imass[0]/(beta * self.gamma)) * (1.0 - exp_gdt)**2 cov_matrix = np.array([[sig_ri2, cov_rvi], [cov_rvi, sig_vi2]]) self.param_iner['cov'].append(cov_matrix) self.param_iner['cho'].append(np.linalg.cholesky(cov_matrix)) self.param_iner['mean'].append(np.zeros(2))
[docs] def integration_step(self, system): """Langevin integration, one time step. Parameters ---------- system : object like :py:class:`.System` The system to integrate/act on. Assumed to have a particle list in ``system.particles``. Returns ------- out : None Does not return anything, but alters the state of the given system. """ if self.init_params: self._init_parameters(system) self.init_params = False if self.high_friction: return self.integration_step_overdamped(system) return self.integration_step_inertia(system)
[docs] def integration_step_overdamped(self, system): """Over damped Langevin integration, one time step. Parameters ---------- system : object like :py:class:`.System` The system to integrate/act on. Assumed to have a particle list in ``system.particles``. Returns ------- out : None Does not return anything, but alters the state of the given `system`. """ self._evaluate_potential_and_force(system) rands = self.rgen.normal(loc=0.0, scale=self.param_high['sigma'], size=system.vel.shape) system.pos += self.param_high['bddt'] * self._get_force(system) + rands system.vel = rands
[docs] def integration_step_inertia(self, system): """Langevin integration, one time step. Parameters ---------- system : object like :py:class:`.System` The system to integrate/act on. Assumed to have a particle list in ``system.particles``. Returns ------- out : None Does not return anything, but alters the state of the given `system`. """ pos_rand = np.zeros(system.pos.shape) vel_rand = np.zeros(system.vel.shape) if self.gamma > 0.0: mean, cov = self.param_iner['mean'], self.param_iner['cov'] cho = self.param_iner['cho'] for i, (meani, covi, choi) in enumerate( zip(mean, cov, cho, strict=True)): randxv = self.rgen.multivariate_normal( meani, covi, cho=choi, size=get_system_dim(system) ) pos_rand[i] = randxv[:, 0] vel_rand[i] = randxv[:, 1] force = self._get_force(system) system.pos += (self.param_iner['a1'] * system.vel + self.param_iner['a2'] * force + pos_rand) vel2 = (self.param_iner['c0'] * system.vel + self.param_iner['b1'] * force + vel_rand) self._evaluate_potential_and_force(system) system.vel = vel2 + self.param_iner['b2'] * self._get_force(system)
[docs] def restart_info(self): """Return restart info. The restart info for the engine Langevin. Returns ------- info : dict Contains all the updated simulation settings and counters. """ info = super().restart_info() info['class'] = 'Langevin' for attr, value in self.__dict__.items(): # ``order_function`` is transient injected state, excluded # from the restart info (see VelocityVerlet.restart_info). if attr in ('_forcefield', 'order_function'): continue if attr == 'rgen': info[attr] = value.get_state() else: info[attr] = value return info
[docs] def load_restart_info(self, info): """Load restart information. Parameters ---------- info : dict The dictionary with the restart information, should be similar to the dict produced by :py:func:`.restart_info`. """ super().load_restart_info(info) self.timestep = info.get('timestep') self.dynamics = info.get('dynamics') self.gamma = info.get('gamma') self.high_friction = info.get('high_friction') self.param_high = info.get('param_high') self.param_iner = info.get('param_iner') self.init_params = info.get('init_params') if 'rgen' in info: self.rgen = create_random_generator(info['rgen'])