# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Definition of external engines.
This module defines the base class for external MD engines.
Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ExternalMDEngine (:py:class:`.ExternalMDEngine`)
The base class for external engines which defines the
interface to external programs.
"""
from abc import abstractmethod
import hashlib
import json
import logging
import os
import re
import subprocess # nosec B404
import shutil
import sys
import numpy as np
from pyretis.core.common import counter
from pyretis.core.systemhelp import update_system_box
from pyretis.inout.common import atomic_write
from pyretis.inout.fileio import FileIO
from pyretis.engines.engine import EngineBase
from pyretis.engines._engine_logs import (
cap_engine_log,
engine_output_files,
finalize_engine_output,
open_engine_log,
)
from pyretis.inout.formats.cp2k import read_cp2k_box
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
__all__ = ['ExternalMDEngine']
[docs]class ExternalMDEngine(EngineBase):
"""
Base class for interfacing external MD engines.
This class defines the interface to external programs. The
interface will define how we interact with the external programs,
how we write input files for them and read output files from them.
New engines should inherit from this class and implement the
following methods:
* :py:meth:`ExternalMDEngine.step`
A method for performing a MD step with the external
engine. Note that the MD step can consist of a number
of subcycles.
* :py:meth:`ExternalMDEngine._read_configuration`
For reading output (configurations) from the external engine.
This is used for calculating the order parameter(s).
* :py:meth:`ExternalMDEngine._reverse_velocities`
For reversing velocities in a snapshot. This method
will typically make use of the method
:py:meth:`ExternalMDEngine._read_configuration`.
* :py:meth:`ExternalMDEngine._extract_frame`
For extracting a single frame from a trajectory.
* :py:meth:`ExternalMDEngine._propagate_from`
The method for propagating the equations of motion using
the external engine.
* :py:meth:`ExternalMDEngine.modify_velocities`
The method used for generating random velocities for
shooting points. Note that this method is defined in
:py:meth:`EngineBase.modify_velocities`.
Attributes
----------
description : string
A string with a description of the external engine.
This can, for instance, be what program we are interfacing with.
This is used for outputting information to the user.
timestep : float
The time step used for the external engine.
subcycles : integer
The number of steps the external step is composed of. That is:
each external step is really composed of ``subcycles`` number
of iterations.
"""
engine_type = 'external'
#: Optional cap (in bytes) on the retained ``engine.log`` stdout
#: file. The log is appended to on every external step, so over a
#: long run it can grow without bound. When this is set to a
#: positive integer, after each successful command the log is
#: trimmed to its most-recent ``engine_log_maxsize`` bytes (the
#: oldest output is dropped and a marker line is written at the
#: top), reclaiming disk immediately. ``None`` (the default) keeps
#: the historical unbounded behaviour, so existing runs are
#: unaffected. Set it on the class or an instance, e.g.
#: ``engine.engine_log_maxsize = 100 * 1024 * 1024`` for a 100 MB
#: cap. A failing command's log is never trimmed.
engine_log_maxsize = None
#: Opt-in resume of an interrupted propagation. When True, an
#: abruptly interrupted propagation (e.g. a cluster wall-time kill)
#: is *continued* on restart instead of being re-run from the
#: shooting point: the frames that survived on disk are reused and
#: the trajectory is propagated onward from the last good frame.
#: The continued segment is a new, valid path (the integrator state
#: is restarted from a configuration, so it is not bit-identical to
#: an uninterrupted run); such paths are flagged via
#: ``path.resumed``. ``False`` (the default) keeps the historical
#: behaviour exactly, so existing runs are byte-for-byte unaffected.
continue_interrupted = False
[docs] def __init__(self, description, timestep, subcycles):
"""
Set up the external engine.
Here we just set up some common properties which are useful
for the execution.
Parameters
----------
description : string
A string with a description of the external engine.
This can, for instance, be what program we are interfacing
with. This is used for outputting information to the user.
timestep : float
The time step used in the simulation.
subcycles : integer
The number of sub-cycles each external integration step is
composed of.
"""
super().__init__(description)
self.timestep = timestep
self.subcycles = subcycles
self.ext = 'xyz'
self.input_files = {}
[docs] def integration_step(self, system):
"""
Perform a single time step of the integration.
For external engines, it does not make much sense to run single
steps unless we absolutely have to. We therefore just fail here.
I.e. the external engines are not intended for performing pure
MD simulations.
If it's absolutely needed, there is a :py:meth:`self.step()`
method which can be used, for instance in the initialisation.
Parameters
----------
system : object like :py:class:`.System`
A system to run the integration step on.
"""
msg = 'External engine does **NOT** support "integration_step()"!'
logger.error(msg)
raise NotImplementedError(msg)
[docs] @abstractmethod
def step(self, system, name):
"""Perform a single step with the external engine.
Parameters
----------
system : object like :py:class:`.System`
The system we are integrating.
name : string
To name the output files from the external engine.
Returns
-------
out : string
The name of the output configuration, obtained after
completing the step.
"""
return
[docs] @abstractmethod
def _read_configuration(self, filename):
"""Read output configuration from external software.
Parameters
----------
filename : string
The file to open and read a configuration from.
Returns
-------
out[0] : numpy.array
The dimensions of the simulation box.
out[1] : numpy.array
The positions found in the given filename.
out[2] : numpy.array
The velocities found in the given filename.
"""
return
[docs] @abstractmethod
def _reverse_velocities(self, filename, outfile):
"""Reverse velocities in a given snapshot.
Parameters
----------
filename : string
Input file with velocities.
outfile : string
File to write with reversed velocities.
"""
return
[docs] def execute_command(self, cmd, cwd=None, inputs=None):
"""
Execute an external command for the engine.
We are here executing a command and then waiting until it
finishes. The standard out and standard error are piped to
files during the execution and can be inspected if the
command fails. This method returns the return code of the
issued command.
Parameters
----------
cmd : list of strings
The command to execute.
cwd : string or None, optional
The current working directory to set for the command.
inputs : bytes or None, optional
Additional inputs to give to the command. These are not
arguments but more akin to keystrokes etc. that the external
command may take.
Returns
-------
out : int
The return code of the issued command.
"""
cmd2 = ' '.join(cmd)
logger.debug('Executing: %s', cmd2)
if inputs is not None:
logger.debug('With input: %s', inputs)
out_name, err_name = self._engine_output_files(cwd or self.exe_dir)
return_code = None
with self._open_engine_log(out_name) as fout, \
self._open_engine_log(err_name) as ferr:
# Check if we are running a python script:
if cmd[0].endswith('.py'):
cmd = [sys.executable] + cmd
exe = subprocess.Popen( # nosec B603 B607
cmd,
stdin=subprocess.PIPE,
stdout=fout,
stderr=ferr,
shell=False,
cwd=cwd
)
exe.communicate(input=inputs)
# Note: communicate will wait until process terminates.
return_code = exe.returncode
if return_code != 0:
logger.error('Execution of external program (%s) failed!',
self.description)
logger.error('Attempted command: %s', cmd2)
logger.error('Execution directory: %s', cwd)
if inputs is not None:
logger.error('Input to external program was: %s', inputs)
logger.error('Return code from external program: %i',
return_code)
logger.error('Engine stdout/stderr: %s, %s', out_name,
err_name)
msg = (f'Execution of external program ({self.description}) '
f'failed with command:\n {cmd2}.\n'
f'Return code: {return_code}')
raise RuntimeError(msg)
self._finalize_engine_output(return_code, err_name)
# The command succeeded (a failure raises above and keeps the
# full log); cap the retained stdout log if a size limit is set.
self._cap_engine_log(out_name, self.engine_log_maxsize)
return return_code
[docs] @staticmethod
def _cap_engine_log(path, maxsize):
"""Trim an engine log to its most-recent ``maxsize`` bytes.
Thin wrapper around
:func:`pyretis.engines._engine_logs.cap_engine_log`.
"""
cap_engine_log(path, maxsize)
[docs] @staticmethod
def _engine_output_files(log_dir):
"""Return the engine stdout/stderr log paths for a run directory.
Thin wrapper around
:func:`pyretis.engines._engine_logs.engine_output_files`.
"""
return engine_output_files(log_dir)
[docs] @staticmethod
def _open_engine_log(path):
"""Open an engine stdout/stderr log for appending, resiliently.
Thin wrapper around
:func:`pyretis.engines._engine_logs.open_engine_log`.
"""
return open_engine_log(path)
[docs] @staticmethod
def _finalize_engine_output(return_code, err_name):
"""Keep stdout logs and discard stderr logs from successful runs.
Thin wrapper around
:func:`pyretis.engines._engine_logs.finalize_engine_output`.
"""
finalize_engine_output(return_code, err_name)
[docs] @staticmethod
def _movefile(source, dest):
"""Move file from source to destination."""
logger.debug('Moving: %s -> %s', source, dest)
shutil.move(source, dest)
[docs] @staticmethod
def _copyfile(source, dest):
"""Copy file from source to destination."""
logger.debug('Copy: %s -> %s', source, dest)
shutil.copyfile(source, dest)
[docs] @staticmethod
def _removefile(filename):
"""Remove a given file if it exist."""
try:
os.remove(filename)
logger.debug('Removing: %s', filename)
except OSError:
logger.debug('Could not remove: %s', filename)
[docs] def _remove_files(self, dirname, files):
"""Remove files from a directory.
Parameters
----------
dirname : string
Where we are removing.
files : list of strings
A list with files to remove.
"""
for i in files:
self._removefile(os.path.join(dirname, i))
[docs] def clean_up(self):
"""Will remove all files from the current directory."""
dirname = self.exe_dir
logger.debug('Running engine clean-up in "%s"', dirname)
files = [item.name for item in os.scandir(dirname) if item.is_file()]
self._remove_files(dirname, files)
[docs] @staticmethod
def draw_maxwellian_velocities(vel, mass, beta, rgen, sigma_v=None):
"""Draw velocities from a Maxwell-Boltzmann distribution.
This is the engine-independent (Python) velocity draw used when an
external engine is configured to generate shooting velocities via
``velocity_generation = "maxwell"`` instead of delegating to the
engine's own generator. The velocities are drawn from the injected
random generator so the whole shooting move stays on one
reproducible PCG64 stream.
Parameters
----------
vel : numpy.ndarray
The current velocities; only the shape ``(npart, dim)`` is
used (the values are overwritten).
mass : numpy.ndarray
The particle masses, shape ``(npart, 1)``.
beta : float
The inverse temperature ``1 / (k_B T)`` in the engine's unit
system.
rgen : object like :py:class:`.RandomGenerator`
The velocity random generator, injected explicitly. Its
``normal`` draw matches ``numpy.random.Generator.normal`` under
the canonical PCG64 generator.
sigma_v : numpy.ndarray, optional
Per-particle standard deviation. When ``None`` (or any
negative entry) it is computed from ``beta`` and ``mass``.
Returns
-------
vel : numpy.ndarray
The newly drawn velocities, shape ``(npart, dim)``.
sigma_v : numpy.ndarray
The standard deviation used for the draw.
"""
if sigma_v is None or np.any(sigma_v < 0.0):
kbt = 1.0 / beta
sigma_v = np.sqrt(kbt * (1.0 / mass))
npart, dim = vel.shape
vel = rgen.normal(loc=0.0, scale=sigma_v, size=(npart, dim))
return vel, sigma_v
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None):
"""
Calculate order parameter from configuration in a file.
Note, if ``xyz``, ``vel`` or ``box`` are given, we will
**NOT** read positions, velocity and box information from the
current configuration file.
Parameters
----------
system : object like :py:class:`.System`
This is the system that contains the particles we are
investigating. The order function is injected on the engine
as ``self.order_function``.
xyz : numpy.array, optional
The positions to use, in case we have already read them
somewhere else. We will then not attempt to read them again.
vel : numpy.array, optional
The velocities to use, in case we already have read them.
box : numpy.array, optional
The current box vectors, in case we already have read them.
Returns
-------
out : list of floats
The calculated order parameter(s).
"""
order_function = self.order_function
if any((xyz is None, vel is None, box is None)):
out = self._read_configuration(system.config[0])
box = out[0]
xyz = out[1]
vel = out[2]
system.pos = xyz
if system.vel_rev:
if vel is not None:
system.vel = -1.0 * vel
else:
system.vel = vel
if box is None and self.input_files.get('template', False):
# CP2K specific box initiation:
box, _ = read_cp2k_box(self.input_files['template'])
update_system_box(system, box)
return order_function.calculate(system)
[docs] def kick_across_middle(self, system, ens_set, middle, tis_settings,
rgen):
"""
Force a phase point across the middle interface.
This is accomplished by repeatedly kicking the phase point so
that it crosses the middle interface.
Parameters
----------
system : object like :py:class:`.System`
The working state we repeatedly kick and integrate. The
order function is injected on the engine
(``self.order_function``).
ens_set : dict
Ensemble settings (unused here directly, kept for interface
symmetry with the other explicit-convention methods).
middle : float
This is the value for the middle interface.
tis_settings : dict
This dictionary contains settings for TIS. Explicitly used here:
* `zero_momentum`: boolean, determines if the momentum is zeroed.
* `rescale_energy`: boolean, determines if energy is re-scaled.
rgen : object like :py:class:`.RandomGenerator`
The velocity-generation random generator, injected per call
(distinct from any integration ``self.rgen``).
Returns
-------
out[0] : object like :py:class:`.System`
The phase-point just before the crossing the interface.
out[1] : object like :py:class:`.System`
The phase-point just after crossing the interface.
Note
----
This function will update the input system state.
"""
# ``system`` is the working state we kick and integrate;
# ``right_point`` is the copy we keep as the point just past the
# crossing and return.
right_point = system.copy()
logger.info('Kicking with external integrator: %s', self.description)
# We search for crossing with the middle interface and do this
# by sequentially kicking the initial phase point
# Let's get the starting point:
initial_file = self.dump_frame(right_point)
# Create a "previous file" for storing the state before a new kick:
prev_file = os.path.join(
self.exe_dir, f'p_{os.path.basename(initial_file)}'
)
msg_file_name = os.path.join(self.exe_dir, 'msg-kick.txt')
msg_file = FileIO(msg_file_name, 'w', None, backup=False)
msg_file.open()
msg_file.write(f'Kick initiation for {self.description}')
self._copyfile(initial_file, prev_file)
# Update so that we use the prev_file:
system.config = (prev_file, None)
logger.info('Searching for crossing with: %9.6g', middle)
logger.info('Writing progress to: %s', msg_file_name)
while True:
msg_file.write('New kick:')
# Do kick from current state:
msg_file.write('\tModify velocities...')
vel_settings = {'sigma_v': -1,
'momentum': False, 'rescale': None}
self.modify_velocities(system, vel_settings, rgen)
# Update order parameter in case it's velocity dependent:
order = self.calculate_order(system)
curr = order[0]
msg_file.write(f'\tAfter kick: {curr}')
previous = system.copy()
previous.order = order
# Update system by integrating forward:
msg_file.write('\tIntegrate forward...')
conf = self.step(system, 'kicked')
curr_file = os.path.join(self.exe_dir, conf)
# Compare previous order parameter and the new one:
prev = curr
curr = self.calculate_order(system)[0]
txt = f'{prev} -> {curr} | {middle}'
msg_file.write(f'\t{txt}')
if (prev <= middle < curr) or (curr < middle <= prev):
logger.info('Crossed middle interface: %s', txt)
msg_file.write('\tCrossed middle interface!')
# Middle interface was crossed, just stop the loop.
self._copyfile(curr_file, prev_file)
# Update file name after moving:
right_point.config = (prev_file, None)
break
if (prev <= curr < middle) or (middle < curr <= prev):
# Getting closer, keep the new point:
logger.debug('Getting closer to middle: %s', txt)
msg_file.write(
'\tGetting closer to middle, keeping new point.'
)
self._movefile(curr_file, prev_file)
# Update file name after moving:
system.config = (prev_file, None)
else: # We did not get closer, fall back to previous point:
logger.debug('Did not get closer to middle: %s', txt)
msg_file.write(
'\tDid not get closer, fall back to previous point.'
)
system = previous.copy()
curr = previous.order[0]
self._removefile(curr_file)
msg_file.flush()
msg_file.close()
self._removefile(msg_file_name)
return previous, right_point
[docs] def _frame_config_index(self, frame):
"""Map a trajectory-frame number to its configuration index.
The configuration index is what is stored in a snapshot's
``config`` tuple and later passed to :py:meth:`._extract_frame`.
For most engines it equals the trajectory-frame number; engines
that index configurations by MD time step (e.g. the streaming
LAMMPS engine, which stores ``frame * subcycles``) override this.
Parameters
----------
frame : integer
The trajectory-frame number (0-based).
Returns
-------
out : integer
The configuration index for that frame.
"""
return frame
[docs] def _trajectory_filename(self, name):
"""Return the basename of the trajectory file for a propagation.
This is the file the engine writes frames to during
:py:meth:`._propagate_from`. Engines whose trajectory format
differs from their configuration format (e.g. GROMACS writes a
``.trr`` while configurations are ``.gro``/``.g96``) override
this.
Parameters
----------
name : string
The trajectory label built in :py:meth:`.propagate`.
Returns
-------
out : string
The trajectory file basename.
"""
return f'{name}.{self.ext}'
[docs] def _resume_marker_path(self, reverse):
"""Return the path of the per-direction resume marker file."""
direction = 'trajB' if reverse else 'trajF'
return os.path.join(self.exe_dir, f'_resume_{direction}.json')
[docs] def _config_signature(self, filename):
"""Return a hash identifying a configuration's physical content.
Used to confirm that an interrupted propagation we are about to
resume really belongs to the shot we are now re-attempting: the
shooting point is regenerated deterministically on restart, so
its positions/velocities are identical. We hash the coordinates
(not the raw file bytes) so format-specific metadata such as a
title line does not affect the comparison.
"""
box, xyz, vel = self._read_configuration(filename)[:3]
hasher = hashlib.sha1(usedforsecurity=False) # nosec B324
for array in (xyz, vel, box):
if array is not None:
hasher.update(np.ascontiguousarray(array).tobytes())
return hasher.hexdigest()
[docs] def _write_resume_marker(self, reverse, traj_file, init_sig, complete):
"""Write (atomically and durably) the resume marker."""
marker = {
'traj_file': os.path.basename(traj_file),
'init_sig': init_sig,
'reverse': bool(reverse),
'complete': bool(complete),
}
atomic_write(
self._resume_marker_path(reverse),
lambda fileh: fileh.write(json.dumps(marker)),
binary=False,
)
[docs] def _remove_resume_marker(self, reverse):
"""Remove the resume marker once a propagation has completed."""
marker_path = self._resume_marker_path(reverse)
if os.path.isfile(marker_path):
os.remove(marker_path)
[docs] def _read_resume_marker(self, reverse, init_sig):
"""Return a resumable partial trajectory, or None.
A marker is resumable only when it records an *incomplete*
propagation whose initial configuration matches ``init_sig``
(same shot) and whose partial trajectory still exists on disk.
Parameters
----------
reverse : boolean
The propagation direction we are about to (re)run.
init_sig : string
Content signature of the current shot's initial config.
Returns
-------
out : string or None
The path to the partial trajectory to resume from, or None.
"""
marker_path = self._resume_marker_path(reverse)
if not os.path.isfile(marker_path):
return None
with open(marker_path, 'r', encoding='utf-8') as fileh:
marker = json.load(fileh)
if marker.get('complete', True):
return None
if marker.get('init_sig') != init_sig:
# A different shot left this marker behind; ignore it.
return None
partial = os.path.join(self.exe_dir, marker['traj_file'])
if not os.path.isfile(partial):
return None
return partial
[docs] def _recompute_partial_orders(self, partial_traj, reverse):
"""Order parameters for every readable frame, in file order.
Reads the surviving frames of an interrupted trajectory and
recomputes their order parameters (the deterministic source of
truth). A frame left broken by the interruption is tolerated:
the underlying readers stop at the last readable frame.
"""
# Local import is required: pyretis.tools.recalculate_order pulls
# in the engines package, so a module-level import here would be
# circular.
# pylint: disable=import-outside-toplevel
from pyretis.tools.recalculate_order import recalculate_order
options = {'minidx': None, 'maxidx': None, 'reverse': reverse}
if partial_traj[-4:] == '.xyz' and \
self.input_files.get('template', False):
options['box'], _ = read_cp2k_box(self.input_files['template'])
orders = list(
recalculate_order(self.order_function, partial_traj, options)
)
if reverse:
# recalculate_order reverses the list for reverse runs; undo
# that here so the orders are in trajectory-file order, which
# is the order in which _propagate_from stored the frames.
orders = list(reversed(orders))
return orders
[docs] def _prefill_partial(self, system, partial_traj, name):
"""Rebuild the surviving frames of an interrupted path.
Returns a tuple ``(phase_points, cont_conf)`` where ``phase_points``
are the surviving frames *except* the last good one (rebuilt from
the partial trajectory, with order parameters recomputed) and
``cont_conf`` is a configuration file holding the last good frame,
from which the propagation continues. The last good frame is
excluded because :py:meth:`._propagate_from` re-adds it as the
first frame of the continued segment.
The surviving frames are deliberately *not* added to the caller's
path here: the caller prepends them *after* :py:meth:`._propagate_from`
so the engine's own ``update_energies`` aligns with the continued
frames only. The surviving frames' energies are not recoverable
from the trajectory and are therefore left unset (``None``).
Returns ``(None, None)`` when there are fewer than two readable
frames (nothing worth resuming).
"""
orders = self._recompute_partial_orders(partial_traj,
reverse=system.vel_rev)
if len(orders) < 2:
return None, None
cont_idx = len(orders) - 1
phase_points = []
for i in range(cont_idx):
snapshot = {'order': orders[i],
'config': (partial_traj, self._frame_config_index(i)),
'vel_rev': system.vel_rev}
phase_points.append(self.snapshot_to_system(system, snapshot))
cont_conf = os.path.join(self.exe_dir, f'resume_{name}.{self.ext}')
self._extract_frame(partial_traj,
self._frame_config_index(cont_idx), cont_conf)
return phase_points, cont_conf
[docs] def propagate(self, path, ens_set, system, reverse=False):
"""
Propagate the equations of motion with the external code.
This method will explicitly do the common set-up, before
calling more specialised code for doing the actual propagation.
Parameters
----------
path : object like :py:class:`.PathBase`
This is the path we use to fill in phase-space points.
We are here not returning a new path - this since we want
to delegate the creation of the path to the method
that is running `propagate`.
ens_set : dict
Ensemble settings. It contains:
* `interfaces` : list of floats
These interfaces define the stopping criterion.
* `path_ensemble` : optional, used to build the working-file
name prefix when present.
system : object like :py:class:`.System`
The system object gives the initial state for the
integration. It is mutated in place (configuration file and
velocity-reversal flag) during propagation; the caller owns
the lifecycle and passes a fresh copy per call. The order
function and random generator are injected on the engine
(``self.order_function`` / ``self.rgen``).
reverse : boolean, optional
If True, the system will be propagated backward in time.
Returns
-------
success : boolean
This is True if we generated an acceptable path.
status : string
A text description of the current status of the propagation.
"""
logger.debug('Running propagate with: "%s"', self.description)
prefix = str(counter())
if ens_set.get('path_ensemble', False):
prefix = ens_set['path_ensemble'].ensemble_name_simple + '_' \
+ prefix
if reverse:
logger.debug('Running backward in time.')
name = prefix + '_trajB'
else:
logger.debug('Running forward in time.')
name = prefix + '_trajF'
logger.debug('Trajectory name: "%s"', name)
# Also create a message file for inspecting progress:
msg_file_name = os.path.join(self.exe_dir, f'msg-{name}.txt')
logger.debug('Writing propagation progress to: %s', msg_file_name)
msg_file = FileIO(msg_file_name, 'w', None, backup=False)
msg_file.open()
msg_file.write(f'# Preparing propagation with {self.description}')
msg_file.write(f'# Trajectory label: {name}')
initial_file = self.dump_frame(system, deffnm=prefix + '_conf')
msg_file.write(f'# Initial file: {initial_file}')
logger.debug('Initial state: %s', system)
if reverse != system.vel_rev:
logger.debug('Reversing velocities in initial config.')
msg_file.write('# Reversing velocities')
basepath = os.path.dirname(initial_file)
localfile = os.path.basename(initial_file)
initial_conf = os.path.join(basepath, f'r_{localfile}')
self._reverse_velocities(initial_file, initial_conf)
else:
initial_conf = initial_file
msg_file.write(f'# Initial config: {initial_conf}')
# Update system to point to the configuration file:
system.config = (initial_conf, None)
system.vel_rev = reverse
# Optionally continue an interrupted propagation instead of
# re-running it from scratch (see ``continue_interrupted``):
prefill_points = []
saved_maxlen = path.maxlen
if self.continue_interrupted:
init_sig = self._config_signature(initial_conf)
partial = self._read_resume_marker(reverse, init_sig)
if partial is not None:
prefill_points, cont_conf = self._prefill_partial(
system, partial, name)
if cont_conf is not None:
logger.warning(
'Resuming interrupted propagation "%s" from %s '
'(%d surviving frame(s)). The continued path is a '
'valid sample but not bit-identical to an '
'uninterrupted run.', name, partial,
len(prefill_points))
msg_file.write(f'# Resuming from partial: {partial}')
initial_conf = cont_conf
system.config = (initial_conf, None)
path.resumed = True
# Continue for the remaining length only; the surviving
# frames are prepended after propagation so the engine's
# update_energies aligns with the continued frames.
if path.maxlen is not None:
path.maxlen = max(path.maxlen - len(prefill_points), 1)
# Record (durably) that a propagation for this shot is now in
# progress; the marker points at the trajectory that will hold
# the surviving frames if we are interrupted again.
partial_for_marker = (os.path.basename(partial)
if partial is not None
else self._trajectory_filename(name))
self._write_resume_marker(reverse, partial_for_marker, init_sig,
complete=False)
# Propagate from this point:
msg_file.write(f'# Interfaces: {ens_set["interfaces"]}')
success, status = self._propagate_from(
name,
path,
system,
ens_set,
msg_file,
reverse=reverse
)
if self.continue_interrupted:
if prefill_points:
# Prepend the surviving frames (their energies are unset)
# ahead of the freshly continued frames, and restore the
# full path length cap.
path.phasepoints[:0] = prefill_points
path.maxlen = saved_maxlen
# Completed normally: drop the marker so this shot is not
# mistaken for an interrupted one later.
self._remove_resume_marker(reverse)
msg_file.close()
return success, status
[docs] @abstractmethod
def _propagate_from(self, name, path, system, ens_set, msg_file,
reverse=False):
"""
Run the actual propagation using the specific engine.
This method is called after :py:meth:`.propagate`. And we
assume that the necessary preparations before the actual
propagation (e.g. dumping of the configuration etc.) is
handled in that method.
Parameters
----------
name : string
A name to use for the trajectory we are generating.
path : object like :py:class:`.PathBase`
This is the path we use to fill in phase-space points.
system : object like :py:class:`.System`
The system object gives the initial state for the
integration. The order function is injected on the engine
(``self.order_function``).
ens_set : dict
Ensemble settings. It contains:
* `interfaces` : list of floats
These interfaces define the stopping criterion.
msg_file : object like :py:class:`.FileIO`
An object we use for writing out messages that are useful
for inspecting the status of the current propagation.
reverse : boolean, optional
If True, the system will be propagated backward in time.
Returns
-------
success : boolean
This is True if we generated an acceptable path.
status : string
A text description of the current status of the propagation.
"""
return
[docs] def _name_output(self, basename):
"""
Create a file name for the output file.
This method is used when we dump a configuration to add
the correct extension for GROMACS (either gro or g96).
Parameters
----------
basename : string
The base name to give to the file.
Returns
-------
out : string
A file name with an extension.
"""
out_file = f'{basename}.{self.ext}'
return os.path.join(self.exe_dir, out_file)
[docs] def dump_config(self, config, deffnm='conf'):
"""Extract configuration frame from a system if needed.
Parameters
----------
config : tuple
The configuration given as (filename, index).
deffnm : string, optional
The base name for the file we dump to.
Returns
-------
out : string
The file name we dumped to. If we did not in fact dump, this is
because the system contains a single frame and we can use it
directly. Then we return simply this file name.
Note
----
If the velocities should be reversed, this is handled elsewhere.
"""
pos_file, idx = config
out_file = os.path.join(self.exe_dir, self._name_output(deffnm))
if idx is None:
if pos_file != out_file:
self._copyfile(pos_file, out_file)
else:
logger.debug('Config: %s', (config, ))
self._extract_frame(pos_file, idx, out_file)
return out_file
[docs] def dump_frame(self, system, deffnm='conf'):
"""Just dump the frame from a system object."""
return self.dump_config(system.config, deffnm=deffnm)
[docs] def dump_phasepoint(self, phasepoint, deffnm='conf'):
"""Just dump the frame from a system object."""
pos_file = self.dump_config(phasepoint.config,
deffnm=deffnm)
phasepoint.config = (pos_file, None)