Source code for pyretis.engines.engine

# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Definition of PyRETIS engines.

This module defines the base class for the engines.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

EngineBase (:py:class:`.EngineBase`)
    The base class for engines.
"""
from abc import ABCMeta, abstractmethod
import logging
import os
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


__all__ = ['EngineBase']


[docs]class EngineBase(metaclass=ABCMeta): """ Abstract base class for engines. The engines perform molecular dynamics (or Monte Carlo) and they are assumed to act on a system. Typically they will integrate Newtons equation of motion in time for that system. Attributes ---------- description : string Short string description of the engine. Used for printing information about the integrator. exe_dir : string A directory where the engine is going to be executed. engine_type : string or None Describe the type of engine as an "internal" or "external" engine. If this is undefined, this variable is set to None. needs_order : boolean Determines if the engine needs an internal order parameter or not. If not, it is assumed that the order parameter is calculated by the engine. """ engine_type = None needs_order = True default_units = None #: The order function is injected on the engine per use (by the #: simulation caller) under the explicit/injected convention; this #: class-level default documents the attribute and gives a sane value #: before it is set. It is transient state -- excluded from restart #: info -- not durable engine config. order_function = None
[docs] def __init__(self, description): """Just add the description.""" self.description = description self._exe_dir = None
@property def time_per_step(self): """Return the simulation time between two recorded path points. This is ``timestep * subcycles`` -- the physical time per PyRETIS step, expressed in the engine's own time unit -- and is the factor that converts a path length (in steps) into a time when computing fluxes and rates. The time step belongs to the engine: each subclass sets ``self.timestep`` from its own authoritative source (the ``[engine]`` setting for engines PyRETIS drives directly, or the external program's input for engines that own their own time step). ``subcycles`` defaults to 1 when an engine does not define it. Returns ------- float or None ``timestep * subcycles``, or ``None`` if the engine has no ``timestep``. """ timestep = getattr(self, 'timestep', None) if timestep is None: return None return timestep * getattr(self, 'subcycles', 1) @property def exe_dir(self): """Return the directory we are currently using.""" return self._exe_dir @exe_dir.setter def exe_dir(self, exe_dir): """Set the directory for executing.""" self._exe_dir = exe_dir if exe_dir is not None: logger.debug('Setting exe_dir to "%s"', exe_dir) if self.engine_type == 'external' and not os.path.isdir(exe_dir): logger.warning(('"Exe dir" for "%s" is set to "%s" which does' ' not exist!'), self.description, exe_dir)
[docs] @abstractmethod def integration_step(self, system): """Perform one time step of the integration.""" return
[docs] def set_mdrun(self, md_items): """Configure per-worker, durable run state on the engine. The unified (explicit/injected) convention keeps only durable, per-worker configuration on the engine; the transient ``System``/``rgen``/order function are injected per call. The infinite-swapping scheduler calls this once per worker to point the engine at its private execution directory (and, for engines that launch an external command, its worker-unique launch command). The default handles the common case -- set ``exe_dir`` from ``md_items`` when provided; subclasses that need more (e.g. a per-worker ``mdrun`` command) override and extend it. Parameters ---------- md_items : dict Per-worker run items. The key ``exe_dir`` (when present) sets the engine execution directory. """ exe_dir = md_items.get('exe_dir') if exe_dir is not None: self.exe_dir = exe_dir
[docs] @staticmethod def add_to_path(path, phase_point, left, right): """ Add a phase point and perform some checks. This method is intended to be used by the propagate methods. Parameters ---------- path : object like :py:class:`.PathBase` The path to add to. phase_point : object like py:class:`.System` The phase point to add to the path. left : float The left interface. right : float The right interface. """ status = 'Running propagate...' success = False stop = False add = path.append(phase_point) if not add: if path.length >= path.maxlen: status = 'Max. path length exceeded' else: # pragma: no cover status = 'Could not add for unknown reason' success = False stop = True if path.phasepoints[-1].order[0] < left: status = 'Crossed left interface!' success = True stop = True elif path.phasepoints[-1].order[0] > right: status = 'Crossed right interface!' success = True stop = True if path.length == path.maxlen: status = 'Max. path length exceeded!' success = False stop = True return status, success, stop, add
[docs] @abstractmethod def propagate(self, path, ens_set, system, reverse=False): """Propagate equations of motion.""" return
[docs] @abstractmethod def modify_velocities(self, system, vel_settings, rgen): """Modify the velocities of the current state. Parameters ---------- system : object like :py:class:`.System` The state whose velocities we modify. vel_settings: dict It contains all the info for the velocity: * `sigma_v` : numpy.array, optional Negative values select aimless shooting. Zero or positive values set the standard deviation, one for each particle, for soft velocity perturbations. * `momentum` : boolean, optional If True, we reset the linear momentum to zero after generating. * `rescale` : float, optional In some NVE simulations, we may wish to re-scale the energy to a fixed value. If `rescale` is a float > 0, we will re-scale the energy (after modification of the velocities) to match the given float. rgen : object like :py:class:`.RandomGenerator` The random generator used to draw the new velocities, injected explicitly per call. It is deliberately kept distinct from any integration random generator an engine may own as ``self.rgen`` (e.g. Langevin/random-walk dynamics), so modifying velocities never perturbs the dynamics stream. Returns ------- dek : float The change in the kinetic energy. kin_new : float The new kinetic energy. """ return
[docs] @abstractmethod def calculate_order(self, system, xyz=None, vel=None, box=None): """Obtain the order parameter. The order function is injected on the engine as ``self.order_function``; ``system`` is the state to evaluate. """ return
[docs] @abstractmethod def dump_phasepoint(self, phasepoint, deffnm=None): """Dump phase point to a file.""" return
[docs] @abstractmethod def kick_across_middle(self, system, ens_set, middle, tis_settings, rgen): """Force a phase point across the middle interface. ``rgen`` is the velocity-generation random generator, injected explicitly (kept distinct from any integration ``self.rgen``). """ return
[docs] @abstractmethod def clean_up(self): """Perform clean up after using the engine.""" return
[docs] @classmethod def get_default_units(cls, settings=None): """Return the default unit system for this engine. Parameters ---------- settings : dict, optional Full simulation settings or engine settings. Engines may override this method and use the settings to resolve their default units dynamically. Returns ------- out : string or None The default unit system for the engine, or ``None`` if the engine does not define one. """ _ = settings return cls.default_units
[docs] @staticmethod def snapshot_to_system(system, snapshot): """Convert a snapshot to a system object. This uses the unified ``System`` property API (``pos``/``vel``/``vpot``/``ekin``), which routes to ``system.particles`` when a harness-style engine has attached one and otherwise stores the values directly on the snapshot-based system. It therefore works for both particle-based engines (the internal integrators) and snapshot-based engines (the external/ streaming engines, whose ``particles`` is ``None``) -- the latter previously hit ``None.pos`` via the removed ``system.particles`` attribute access. """ system_copy = system.copy() system_copy.order = snapshot.get('order', None) system_copy.pos = snapshot.get('pos', None) system_copy.vel = snapshot.get('vel', None) system_copy.vpot = snapshot.get('vpot', None) system_copy.ekin = snapshot.get('ekin', None) for external in ('config', 'vel_rev', 'top'): if hasattr(system_copy, external) and external in snapshot: setattr(system_copy, external, snapshot[external]) return system_copy
[docs] def __eq__(self, other): """Check if two engines are equal.""" if self.__class__ != other.__class__: logger.debug('%s and %s.__class__ differ', self, other) return False # ``order_function`` is transient injected state (dropped from # restart pickles), so a restored engine legitimately lacks it; # exclude it from the attribute-set comparison. if (set(self.__dict__) - {'order_function'} != set(other.__dict__) - {'order_function'}): logger.debug('%s and %s.__dict__ differ', self, other) return False for i in ['engine_type', 'needs_order', 'description', '_exe_dir', 'timestep']: if hasattr(self, i): if getattr(self, i) != getattr(other, i): logger.debug('%s for %s and %s, attributes are %s and %s', i, self, other, getattr(self, i), getattr(other, i)) return False if hasattr(self, 'rgen'): # pylint: disable=no-member if (self.rgen.__class__ != other.rgen.__class__ or set(self.rgen.__dict__) != set(other.rgen.__dict__)): logger.debug('rgen class differs') return False # pylint: disable=no-member for att1, att2 in zip(self.rgen.__dict__, other.rgen.__dict__): # pylint: disable=no-member if self.rgen.__dict__[att1] != other.rgen.__dict__[att2]: logger.debug('rgen class attribute %s and %s differs', att1, att2) return False return True
[docs] def __ne__(self, other): """Check if two engines are not equal.""" return not self == other
[docs] def __getstate__(self): """Return the engine state for pickling, minus transient fields. The order function is injected on the engine per use (the explicit/injected convention); it is not durable engine state and must not travel in restart pickles -- it is restored separately from the ensemble settings and may itself be unpicklable (e.g. an order parameter whose class was imported from a standalone file). It falls back to the class-level ``order_function`` default after unpickling and is re-injected before the engine is next used. """ state = dict(self.__dict__) state.pop('order_function', None) return state
[docs] @classmethod def can_use_order_function(cls, order_function): """Fail if the engine can't be used with an empty order parameter.""" if order_function is None and cls.needs_order: raise ValueError( 'No order parameter was defined, but the ' 'engine *does* require it.' )
[docs] def restart_info(self): """General method. Returns the info to allow an engine exact restart. Returns ------- info : dict Contains all the updated simulation settings and counters. """ info = {'description': self.description} return info
[docs] def load_restart_info(self, info=None): """Load restart information. Parameters ---------- info : dict The dictionary with the restart information, should be similar to the dict produced by :py:func:`.restart_info`. """ self.description = info.get('description')
[docs] def __str__(self): """Return the string description of the integrator.""" return self.description