# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Definition of PyRETIS engines.
This module defines the base class for the engines.
Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
EngineBase (:py:class:`.EngineBase`)
The base class for engines.
"""
from abc import ABCMeta, abstractmethod
import logging
import os
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
__all__ = ['EngineBase']
[docs]class EngineBase(metaclass=ABCMeta):
"""
Abstract base class for engines.
The engines perform molecular dynamics (or Monte Carlo) and they
are assumed to act on a system. Typically they will integrate
Newtons equation of motion in time for that system.
Attributes
----------
description : string
Short string description of the engine. Used for printing
information about the integrator.
exe_dir : string
A directory where the engine is going to be executed.
engine_type : string or None
Describe the type of engine as an "internal" or "external"
engine. If this is undefined, this variable is set to None.
needs_order : boolean
Determines if the engine needs an internal order parameter
or not. If not, it is assumed that the order parameter is
calculated by the engine.
"""
engine_type = None
needs_order = True
default_units = None
#: The order function is injected on the engine per use (by the
#: simulation caller) under the explicit/injected convention; this
#: class-level default documents the attribute and gives a sane value
#: before it is set. It is transient state -- excluded from restart
#: info -- not durable engine config.
order_function = None
[docs] def __init__(self, description):
"""Just add the description."""
self.description = description
self._exe_dir = None
@property
def time_per_step(self):
"""Return the simulation time between two recorded path points.
This is ``timestep * subcycles`` -- the physical time per PyRETIS
step, expressed in the engine's own time unit -- and is the
factor that converts a path length (in steps) into a time when
computing fluxes and rates. The time step belongs to the engine:
each subclass sets ``self.timestep`` from its own authoritative
source (the ``[engine]`` setting for engines PyRETIS drives
directly, or the external program's input for engines that own
their own time step). ``subcycles`` defaults to 1 when an engine
does not define it.
Returns
-------
float or None
``timestep * subcycles``, or ``None`` if the engine has no
``timestep``.
"""
timestep = getattr(self, 'timestep', None)
if timestep is None:
return None
return timestep * getattr(self, 'subcycles', 1)
@property
def exe_dir(self):
"""Return the directory we are currently using."""
return self._exe_dir
@exe_dir.setter
def exe_dir(self, exe_dir):
"""Set the directory for executing."""
self._exe_dir = exe_dir
if exe_dir is not None:
logger.debug('Setting exe_dir to "%s"', exe_dir)
if self.engine_type == 'external' and not os.path.isdir(exe_dir):
logger.warning(('"Exe dir" for "%s" is set to "%s" which does'
' not exist!'), self.description, exe_dir)
[docs] @abstractmethod
def integration_step(self, system):
"""Perform one time step of the integration."""
return
[docs] def set_mdrun(self, md_items):
"""Configure per-worker, durable run state on the engine.
The unified (explicit/injected) convention keeps only durable,
per-worker configuration on the engine; the transient
``System``/``rgen``/order function are injected per call. The
infinite-swapping scheduler calls this once per worker to point
the engine at its private execution directory (and, for engines
that launch an external command, its worker-unique launch
command). The default handles the common case -- set ``exe_dir``
from ``md_items`` when provided; subclasses that need more
(e.g. a per-worker ``mdrun`` command) override and extend it.
Parameters
----------
md_items : dict
Per-worker run items. The key ``exe_dir`` (when present)
sets the engine execution directory.
"""
exe_dir = md_items.get('exe_dir')
if exe_dir is not None:
self.exe_dir = exe_dir
[docs] @staticmethod
def add_to_path(path, phase_point, left, right):
"""
Add a phase point and perform some checks.
This method is intended to be used by the propagate methods.
Parameters
----------
path : object like :py:class:`.PathBase`
The path to add to.
phase_point : object like py:class:`.System`
The phase point to add to the path.
left : float
The left interface.
right : float
The right interface.
"""
status = 'Running propagate...'
success = False
stop = False
add = path.append(phase_point)
if not add:
if path.length >= path.maxlen:
status = 'Max. path length exceeded'
else: # pragma: no cover
status = 'Could not add for unknown reason'
success = False
stop = True
if path.phasepoints[-1].order[0] < left:
status = 'Crossed left interface!'
success = True
stop = True
elif path.phasepoints[-1].order[0] > right:
status = 'Crossed right interface!'
success = True
stop = True
if path.length == path.maxlen:
status = 'Max. path length exceeded!'
success = False
stop = True
return status, success, stop, add
[docs] @abstractmethod
def propagate(self, path, ens_set, system, reverse=False):
"""Propagate equations of motion."""
return
[docs] @abstractmethod
def modify_velocities(self, system, vel_settings, rgen):
"""Modify the velocities of the current state.
Parameters
----------
system : object like :py:class:`.System`
The state whose velocities we modify.
vel_settings: dict
It contains all the info for the velocity:
* `sigma_v` : numpy.array, optional
Negative values select aimless shooting. Zero or positive
values set the standard deviation, one for each particle,
for soft velocity perturbations.
* `momentum` : boolean, optional
If True, we reset the linear momentum to zero after
generating.
* `rescale` : float, optional
In some NVE simulations, we may wish to re-scale the
energy to a fixed value. If `rescale` is a float > 0,
we will re-scale the energy (after modification of
the velocities) to match the given float.
rgen : object like :py:class:`.RandomGenerator`
The random generator used to draw the new velocities,
injected explicitly per call. It is deliberately kept
distinct from any integration random generator an engine may
own as ``self.rgen`` (e.g. Langevin/random-walk dynamics), so
modifying velocities never perturbs the dynamics stream.
Returns
-------
dek : float
The change in the kinetic energy.
kin_new : float
The new kinetic energy.
"""
return
[docs] @abstractmethod
def calculate_order(self, system, xyz=None, vel=None, box=None):
"""Obtain the order parameter.
The order function is injected on the engine as
``self.order_function``; ``system`` is the state to evaluate.
"""
return
[docs] @abstractmethod
def dump_phasepoint(self, phasepoint, deffnm=None):
"""Dump phase point to a file."""
return
[docs] @abstractmethod
def kick_across_middle(self, system, ens_set, middle,
tis_settings, rgen):
"""Force a phase point across the middle interface.
``rgen`` is the velocity-generation random generator, injected
explicitly (kept distinct from any integration ``self.rgen``).
"""
return
[docs] @abstractmethod
def clean_up(self):
"""Perform clean up after using the engine."""
return
[docs] @classmethod
def get_default_units(cls, settings=None):
"""Return the default unit system for this engine.
Parameters
----------
settings : dict, optional
Full simulation settings or engine settings. Engines may
override this method and use the settings to resolve their
default units dynamically.
Returns
-------
out : string or None
The default unit system for the engine, or ``None`` if the
engine does not define one.
"""
_ = settings
return cls.default_units
[docs] @staticmethod
def snapshot_to_system(system, snapshot):
"""Convert a snapshot to a system object.
This uses the unified ``System`` property API
(``pos``/``vel``/``vpot``/``ekin``), which routes to
``system.particles`` when a harness-style engine has attached one
and otherwise stores the values directly on the snapshot-based
system. It therefore works for both particle-based engines (the
internal integrators) and snapshot-based engines (the external/
streaming engines, whose ``particles`` is ``None``) -- the latter
previously hit ``None.pos`` via the removed ``system.particles``
attribute access.
"""
system_copy = system.copy()
system_copy.order = snapshot.get('order', None)
system_copy.pos = snapshot.get('pos', None)
system_copy.vel = snapshot.get('vel', None)
system_copy.vpot = snapshot.get('vpot', None)
system_copy.ekin = snapshot.get('ekin', None)
for external in ('config', 'vel_rev', 'top'):
if hasattr(system_copy, external) and external in snapshot:
setattr(system_copy, external, snapshot[external])
return system_copy
[docs] def __eq__(self, other):
"""Check if two engines are equal."""
if self.__class__ != other.__class__:
logger.debug('%s and %s.__class__ differ', self, other)
return False
# ``order_function`` is transient injected state (dropped from
# restart pickles), so a restored engine legitimately lacks it;
# exclude it from the attribute-set comparison.
if (set(self.__dict__) - {'order_function'}
!= set(other.__dict__) - {'order_function'}):
logger.debug('%s and %s.__dict__ differ', self, other)
return False
for i in ['engine_type', 'needs_order',
'description', '_exe_dir', 'timestep']:
if hasattr(self, i):
if getattr(self, i) != getattr(other, i):
logger.debug('%s for %s and %s, attributes are %s and %s',
i, self, other,
getattr(self, i), getattr(other, i))
return False
if hasattr(self, 'rgen'):
# pylint: disable=no-member
if (self.rgen.__class__ != other.rgen.__class__
or set(self.rgen.__dict__) != set(other.rgen.__dict__)):
logger.debug('rgen class differs')
return False
# pylint: disable=no-member
for att1, att2 in zip(self.rgen.__dict__, other.rgen.__dict__):
# pylint: disable=no-member
if self.rgen.__dict__[att1] != other.rgen.__dict__[att2]:
logger.debug('rgen class attribute %s and %s differs',
att1, att2)
return False
return True
[docs] def __ne__(self, other):
"""Check if two engines are not equal."""
return not self == other
[docs] def __getstate__(self):
"""Return the engine state for pickling, minus transient fields.
The order function is injected on the engine per use (the
explicit/injected convention); it is not durable engine state
and must not travel in restart pickles -- it is restored
separately from the ensemble settings and may itself be
unpicklable (e.g. an order parameter whose class was imported
from a standalone file). It falls back to the class-level
``order_function`` default after unpickling and is re-injected
before the engine is next used.
"""
state = dict(self.__dict__)
state.pop('order_function', None)
return state
[docs] @classmethod
def can_use_order_function(cls, order_function):
"""Fail if the engine can't be used with an empty order parameter."""
if order_function is None and cls.needs_order:
raise ValueError(
'No order parameter was defined, but the '
'engine *does* require it.'
)
[docs] def restart_info(self):
"""General method.
Returns the info to allow an engine exact restart.
Returns
-------
info : dict
Contains all the updated simulation settings and counters.
"""
info = {'description': self.description}
return info
[docs] def load_restart_info(self, info=None):
"""Load restart information.
Parameters
----------
info : dict
The dictionary with the restart information, should be
similar to the dict produced by :py:func:`.restart_info`.
"""
self.description = info.get('description')
[docs] def __str__(self):
"""Return the string description of the integrator."""
return self.description