Source code for pyretis.engines.external

# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Definition of external engines.

This module defines the base class for external MD engines.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

ExternalMDEngine (:py:class:`.ExternalMDEngine`)
    The base class for external engines which defines the
    interface to external programs.

"""
from abc import abstractmethod
import hashlib
import json
import logging
import os
import re
import subprocess  # nosec B404
import shutil
import sys
import numpy as np
from pyretis.core.common import counter
from pyretis.core.systemhelp import update_system_box
from pyretis.inout.common import atomic_write
from pyretis.inout.fileio import FileIO
from pyretis.engines.engine import EngineBase
from pyretis.engines._engine_logs import (
    cap_engine_log,
    engine_output_files,
    finalize_engine_output,
    open_engine_log,
)
from pyretis.inout.formats.cp2k import read_cp2k_box
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


__all__ = ['ExternalMDEngine']


[docs]class ExternalMDEngine(EngineBase): """ Base class for interfacing external MD engines. This class defines the interface to external programs. The interface will define how we interact with the external programs, how we write input files for them and read output files from them. New engines should inherit from this class and implement the following methods: * :py:meth:`ExternalMDEngine.step` A method for performing a MD step with the external engine. Note that the MD step can consist of a number of subcycles. * :py:meth:`ExternalMDEngine._read_configuration` For reading output (configurations) from the external engine. This is used for calculating the order parameter(s). * :py:meth:`ExternalMDEngine._reverse_velocities` For reversing velocities in a snapshot. This method will typically make use of the method :py:meth:`ExternalMDEngine._read_configuration`. * :py:meth:`ExternalMDEngine._extract_frame` For extracting a single frame from a trajectory. * :py:meth:`ExternalMDEngine._propagate_from` The method for propagating the equations of motion using the external engine. * :py:meth:`ExternalMDEngine.modify_velocities` The method used for generating random velocities for shooting points. Note that this method is defined in :py:meth:`EngineBase.modify_velocities`. Attributes ---------- description : string A string with a description of the external engine. This can, for instance, be what program we are interfacing with. This is used for outputting information to the user. timestep : float The time step used for the external engine. subcycles : integer The number of steps the external step is composed of. That is: each external step is really composed of ``subcycles`` number of iterations. """ engine_type = 'external' #: Optional cap (in bytes) on the retained ``engine.log`` stdout #: file. The log is appended to on every external step, so over a #: long run it can grow without bound. When this is set to a #: positive integer, after each successful command the log is #: trimmed to its most-recent ``engine_log_maxsize`` bytes (the #: oldest output is dropped and a marker line is written at the #: top), reclaiming disk immediately. ``None`` (the default) keeps #: the historical unbounded behaviour, so existing runs are #: unaffected. Set it on the class or an instance, e.g. #: ``engine.engine_log_maxsize = 100 * 1024 * 1024`` for a 100 MB #: cap. A failing command's log is never trimmed. engine_log_maxsize = None #: Opt-in resume of an interrupted propagation. When True, an #: abruptly interrupted propagation (e.g. a cluster wall-time kill) #: is *continued* on restart instead of being re-run from the #: shooting point: the frames that survived on disk are reused and #: the trajectory is propagated onward from the last good frame. #: The continued segment is a new, valid path (the integrator state #: is restarted from a configuration, so it is not bit-identical to #: an uninterrupted run); such paths are flagged via #: ``path.resumed``. ``False`` (the default) keeps the historical #: behaviour exactly, so existing runs are byte-for-byte unaffected. continue_interrupted = False
[docs] def __init__(self, description, timestep, subcycles): """ Set up the external engine. Here we just set up some common properties which are useful for the execution. Parameters ---------- description : string A string with a description of the external engine. This can, for instance, be what program we are interfacing with. This is used for outputting information to the user. timestep : float The time step used in the simulation. subcycles : integer The number of sub-cycles each external integration step is composed of. """ super().__init__(description) self.timestep = timestep self.subcycles = subcycles self.ext = 'xyz' self.input_files = {}
[docs] def integration_step(self, system): """ Perform a single time step of the integration. For external engines, it does not make much sense to run single steps unless we absolutely have to. We therefore just fail here. I.e. the external engines are not intended for performing pure MD simulations. If it's absolutely needed, there is a :py:meth:`self.step()` method which can be used, for instance in the initialisation. Parameters ---------- system : object like :py:class:`.System` A system to run the integration step on. """ msg = 'External engine does **NOT** support "integration_step()"!' logger.error(msg) raise NotImplementedError(msg)
[docs] @abstractmethod def step(self, system, name): """Perform a single step with the external engine. Parameters ---------- system : object like :py:class:`.System` The system we are integrating. name : string To name the output files from the external engine. Returns ------- out : string The name of the output configuration, obtained after completing the step. """ return
[docs] @abstractmethod def _read_configuration(self, filename): """Read output configuration from external software. Parameters ---------- filename : string The file to open and read a configuration from. Returns ------- out[0] : numpy.array The dimensions of the simulation box. out[1] : numpy.array The positions found in the given filename. out[2] : numpy.array The velocities found in the given filename. """ return
[docs] @abstractmethod def _reverse_velocities(self, filename, outfile): """Reverse velocities in a given snapshot. Parameters ---------- filename : string Input file with velocities. outfile : string File to write with reversed velocities. """ return
[docs] @staticmethod def _modify_input(sourcefile, outputfile, settings, delim='='): """ Modify input file for external software. Here we assume that the input file has a syntax consisting of ``keyword = setting``. We will only replace settings for the keywords we find in the file that is also inside the ``settings`` dictionary. Parameters ---------- sourcefile : string The path of the file to use for creating the output. outputfile : string The path of the file to write. settings : dict A dictionary with settings to write. delim : string, optional The delimiter used for separation keywords from settings. """ reg = re.compile(fr'(.*?){delim}') written = set() with open(sourcefile, 'r', encoding='utf-8') as infile, \ open(outputfile, 'w', encoding='utf-8') as outfile: for line in infile: to_write = line key = reg.match(line) if key: keyword = ''.join([key.group(1), delim]) keyword_strip = key.group(1).strip() if keyword_strip in settings: to_write = f'{keyword} {settings[keyword_strip]}\n' written.add(keyword_strip) outfile.write(to_write) # Add settings not yet written: for key, value in settings.items(): if key not in written: outfile.write(f'{key} {delim} {value}\n')
[docs] @staticmethod def _read_input_settings(sourcefile, delim='='): """ Read input settings for simulation input files. Here we assume that the input file has a syntax consisting of ``keyword = setting``, where ``=`` can be any string given in the input parameter ``delim``. Parameters ---------- sourcefile : string The path of the file to use for creating the output. delim : string, optional The delimiter used for separation keywords from settings. Returns ------- settings : dict of strings The settings found in the file. Note ---- Important: We are here assuming that there will *ONLY* be one keyword per line. """ reg = re.compile(fr'(.*?){delim}') settings = {} with open(sourcefile, 'r', encoding='utf-8') as infile: for line in infile: key = reg.match(line) if key: keyword_strip = key.group(1).strip() settings[keyword_strip] = line.split(delim)[1].strip() return settings
[docs] def execute_command(self, cmd, cwd=None, inputs=None): """ Execute an external command for the engine. We are here executing a command and then waiting until it finishes. The standard out and standard error are piped to files during the execution and can be inspected if the command fails. This method returns the return code of the issued command. Parameters ---------- cmd : list of strings The command to execute. cwd : string or None, optional The current working directory to set for the command. inputs : bytes or None, optional Additional inputs to give to the command. These are not arguments but more akin to keystrokes etc. that the external command may take. Returns ------- out : int The return code of the issued command. """ cmd2 = ' '.join(cmd) logger.debug('Executing: %s', cmd2) if inputs is not None: logger.debug('With input: %s', inputs) out_name, err_name = self._engine_output_files(cwd or self.exe_dir) return_code = None with self._open_engine_log(out_name) as fout, \ self._open_engine_log(err_name) as ferr: # Check if we are running a python script: if cmd[0].endswith('.py'): cmd = [sys.executable] + cmd exe = subprocess.Popen( # nosec B603 B607 cmd, stdin=subprocess.PIPE, stdout=fout, stderr=ferr, shell=False, cwd=cwd ) exe.communicate(input=inputs) # Note: communicate will wait until process terminates. return_code = exe.returncode if return_code != 0: logger.error('Execution of external program (%s) failed!', self.description) logger.error('Attempted command: %s', cmd2) logger.error('Execution directory: %s', cwd) if inputs is not None: logger.error('Input to external program was: %s', inputs) logger.error('Return code from external program: %i', return_code) logger.error('Engine stdout/stderr: %s, %s', out_name, err_name) msg = (f'Execution of external program ({self.description}) ' f'failed with command:\n {cmd2}.\n' f'Return code: {return_code}') raise RuntimeError(msg) self._finalize_engine_output(return_code, err_name) # The command succeeded (a failure raises above and keeps the # full log); cap the retained stdout log if a size limit is set. self._cap_engine_log(out_name, self.engine_log_maxsize) return return_code
[docs] @staticmethod def _cap_engine_log(path, maxsize): """Trim an engine log to its most-recent ``maxsize`` bytes. Thin wrapper around :func:`pyretis.engines._engine_logs.cap_engine_log`. """ cap_engine_log(path, maxsize)
[docs] @staticmethod def _engine_output_files(log_dir): """Return the engine stdout/stderr log paths for a run directory. Thin wrapper around :func:`pyretis.engines._engine_logs.engine_output_files`. """ return engine_output_files(log_dir)
[docs] @staticmethod def _open_engine_log(path): """Open an engine stdout/stderr log for appending, resiliently. Thin wrapper around :func:`pyretis.engines._engine_logs.open_engine_log`. """ return open_engine_log(path)
[docs] @staticmethod def _finalize_engine_output(return_code, err_name): """Keep stdout logs and discard stderr logs from successful runs. Thin wrapper around :func:`pyretis.engines._engine_logs.finalize_engine_output`. """ finalize_engine_output(return_code, err_name)
[docs] @staticmethod def _movefile(source, dest): """Move file from source to destination.""" logger.debug('Moving: %s -> %s', source, dest) shutil.move(source, dest)
[docs] @staticmethod def _copyfile(source, dest): """Copy file from source to destination.""" logger.debug('Copy: %s -> %s', source, dest) shutil.copyfile(source, dest)
[docs] @staticmethod def _removefile(filename): """Remove a given file if it exist.""" try: os.remove(filename) logger.debug('Removing: %s', filename) except OSError: logger.debug('Could not remove: %s', filename)
[docs] def _remove_files(self, dirname, files): """Remove files from a directory. Parameters ---------- dirname : string Where we are removing. files : list of strings A list with files to remove. """ for i in files: self._removefile(os.path.join(dirname, i))
[docs] def clean_up(self): """Will remove all files from the current directory.""" dirname = self.exe_dir logger.debug('Running engine clean-up in "%s"', dirname) files = [item.name for item in os.scandir(dirname) if item.is_file()] self._remove_files(dirname, files)
[docs] @staticmethod def draw_maxwellian_velocities(vel, mass, beta, rgen, sigma_v=None): """Draw velocities from a Maxwell-Boltzmann distribution. This is the engine-independent (Python) velocity draw used when an external engine is configured to generate shooting velocities via ``velocity_generation = "maxwell"`` instead of delegating to the engine's own generator. The velocities are drawn from the injected random generator so the whole shooting move stays on one reproducible PCG64 stream. Parameters ---------- vel : numpy.ndarray The current velocities; only the shape ``(npart, dim)`` is used (the values are overwritten). mass : numpy.ndarray The particle masses, shape ``(npart, 1)``. beta : float The inverse temperature ``1 / (k_B T)`` in the engine's unit system. rgen : object like :py:class:`.RandomGenerator` The velocity random generator, injected explicitly. Its ``normal`` draw matches ``numpy.random.Generator.normal`` under the canonical PCG64 generator. sigma_v : numpy.ndarray, optional Per-particle standard deviation. When ``None`` (or any negative entry) it is computed from ``beta`` and ``mass``. Returns ------- vel : numpy.ndarray The newly drawn velocities, shape ``(npart, dim)``. sigma_v : numpy.ndarray The standard deviation used for the draw. """ if sigma_v is None or np.any(sigma_v < 0.0): kbt = 1.0 / beta sigma_v = np.sqrt(kbt * (1.0 / mass)) npart, dim = vel.shape vel = rgen.normal(loc=0.0, scale=sigma_v, size=(npart, dim)) return vel, sigma_v
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None): """ Calculate order parameter from configuration in a file. Note, if ``xyz``, ``vel`` or ``box`` are given, we will **NOT** read positions, velocity and box information from the current configuration file. Parameters ---------- system : object like :py:class:`.System` This is the system that contains the particles we are investigating. The order function is injected on the engine as ``self.order_function``. xyz : numpy.array, optional The positions to use, in case we have already read them somewhere else. We will then not attempt to read them again. vel : numpy.array, optional The velocities to use, in case we already have read them. box : numpy.array, optional The current box vectors, in case we already have read them. Returns ------- out : list of floats The calculated order parameter(s). """ order_function = self.order_function if any((xyz is None, vel is None, box is None)): out = self._read_configuration(system.config[0]) box = out[0] xyz = out[1] vel = out[2] system.pos = xyz if system.vel_rev: if vel is not None: system.vel = -1.0 * vel else: system.vel = vel if box is None and self.input_files.get('template', False): # CP2K specific box initiation: box, _ = read_cp2k_box(self.input_files['template']) update_system_box(system, box) return order_function.calculate(system)
[docs] def kick_across_middle(self, system, ens_set, middle, tis_settings, rgen): """ Force a phase point across the middle interface. This is accomplished by repeatedly kicking the phase point so that it crosses the middle interface. Parameters ---------- system : object like :py:class:`.System` The working state we repeatedly kick and integrate. The order function is injected on the engine (``self.order_function``). ens_set : dict Ensemble settings (unused here directly, kept for interface symmetry with the other explicit-convention methods). middle : float This is the value for the middle interface. tis_settings : dict This dictionary contains settings for TIS. Explicitly used here: * `zero_momentum`: boolean, determines if the momentum is zeroed. * `rescale_energy`: boolean, determines if energy is re-scaled. rgen : object like :py:class:`.RandomGenerator` The velocity-generation random generator, injected per call (distinct from any integration ``self.rgen``). Returns ------- out[0] : object like :py:class:`.System` The phase-point just before the crossing the interface. out[1] : object like :py:class:`.System` The phase-point just after crossing the interface. Note ---- This function will update the input system state. """ # ``system`` is the working state we kick and integrate; # ``right_point`` is the copy we keep as the point just past the # crossing and return. right_point = system.copy() logger.info('Kicking with external integrator: %s', self.description) # We search for crossing with the middle interface and do this # by sequentially kicking the initial phase point # Let's get the starting point: initial_file = self.dump_frame(right_point) # Create a "previous file" for storing the state before a new kick: prev_file = os.path.join( self.exe_dir, f'p_{os.path.basename(initial_file)}' ) msg_file_name = os.path.join(self.exe_dir, 'msg-kick.txt') msg_file = FileIO(msg_file_name, 'w', None, backup=False) msg_file.open() msg_file.write(f'Kick initiation for {self.description}') self._copyfile(initial_file, prev_file) # Update so that we use the prev_file: system.config = (prev_file, None) logger.info('Searching for crossing with: %9.6g', middle) logger.info('Writing progress to: %s', msg_file_name) while True: msg_file.write('New kick:') # Do kick from current state: msg_file.write('\tModify velocities...') vel_settings = {'sigma_v': -1, 'momentum': False, 'rescale': None} self.modify_velocities(system, vel_settings, rgen) # Update order parameter in case it's velocity dependent: order = self.calculate_order(system) curr = order[0] msg_file.write(f'\tAfter kick: {curr}') previous = system.copy() previous.order = order # Update system by integrating forward: msg_file.write('\tIntegrate forward...') conf = self.step(system, 'kicked') curr_file = os.path.join(self.exe_dir, conf) # Compare previous order parameter and the new one: prev = curr curr = self.calculate_order(system)[0] txt = f'{prev} -> {curr} | {middle}' msg_file.write(f'\t{txt}') if (prev <= middle < curr) or (curr < middle <= prev): logger.info('Crossed middle interface: %s', txt) msg_file.write('\tCrossed middle interface!') # Middle interface was crossed, just stop the loop. self._copyfile(curr_file, prev_file) # Update file name after moving: right_point.config = (prev_file, None) break if (prev <= curr < middle) or (middle < curr <= prev): # Getting closer, keep the new point: logger.debug('Getting closer to middle: %s', txt) msg_file.write( '\tGetting closer to middle, keeping new point.' ) self._movefile(curr_file, prev_file) # Update file name after moving: system.config = (prev_file, None) else: # We did not get closer, fall back to previous point: logger.debug('Did not get closer to middle: %s', txt) msg_file.write( '\tDid not get closer, fall back to previous point.' ) system = previous.copy() curr = previous.order[0] self._removefile(curr_file) msg_file.flush() msg_file.close() self._removefile(msg_file_name) return previous, right_point
[docs] @abstractmethod def _extract_frame(self, traj_file, idx, out_file): """Extract a frame from a trajectory file. Parameters ---------- traj_file : string The trajectory file to open. idx : integer The frame number we look for. out_file : string The file to dump to. """ return
[docs] def _frame_config_index(self, frame): """Map a trajectory-frame number to its configuration index. The configuration index is what is stored in a snapshot's ``config`` tuple and later passed to :py:meth:`._extract_frame`. For most engines it equals the trajectory-frame number; engines that index configurations by MD time step (e.g. the streaming LAMMPS engine, which stores ``frame * subcycles``) override this. Parameters ---------- frame : integer The trajectory-frame number (0-based). Returns ------- out : integer The configuration index for that frame. """ return frame
[docs] def _trajectory_filename(self, name): """Return the basename of the trajectory file for a propagation. This is the file the engine writes frames to during :py:meth:`._propagate_from`. Engines whose trajectory format differs from their configuration format (e.g. GROMACS writes a ``.trr`` while configurations are ``.gro``/``.g96``) override this. Parameters ---------- name : string The trajectory label built in :py:meth:`.propagate`. Returns ------- out : string The trajectory file basename. """ return f'{name}.{self.ext}'
[docs] def _resume_marker_path(self, reverse): """Return the path of the per-direction resume marker file.""" direction = 'trajB' if reverse else 'trajF' return os.path.join(self.exe_dir, f'_resume_{direction}.json')
[docs] def _config_signature(self, filename): """Return a hash identifying a configuration's physical content. Used to confirm that an interrupted propagation we are about to resume really belongs to the shot we are now re-attempting: the shooting point is regenerated deterministically on restart, so its positions/velocities are identical. We hash the coordinates (not the raw file bytes) so format-specific metadata such as a title line does not affect the comparison. """ box, xyz, vel = self._read_configuration(filename)[:3] hasher = hashlib.sha1(usedforsecurity=False) # nosec B324 for array in (xyz, vel, box): if array is not None: hasher.update(np.ascontiguousarray(array).tobytes()) return hasher.hexdigest()
[docs] def _write_resume_marker(self, reverse, traj_file, init_sig, complete): """Write (atomically and durably) the resume marker.""" marker = { 'traj_file': os.path.basename(traj_file), 'init_sig': init_sig, 'reverse': bool(reverse), 'complete': bool(complete), } atomic_write( self._resume_marker_path(reverse), lambda fileh: fileh.write(json.dumps(marker)), binary=False, )
[docs] def _remove_resume_marker(self, reverse): """Remove the resume marker once a propagation has completed.""" marker_path = self._resume_marker_path(reverse) if os.path.isfile(marker_path): os.remove(marker_path)
[docs] def _read_resume_marker(self, reverse, init_sig): """Return a resumable partial trajectory, or None. A marker is resumable only when it records an *incomplete* propagation whose initial configuration matches ``init_sig`` (same shot) and whose partial trajectory still exists on disk. Parameters ---------- reverse : boolean The propagation direction we are about to (re)run. init_sig : string Content signature of the current shot's initial config. Returns ------- out : string or None The path to the partial trajectory to resume from, or None. """ marker_path = self._resume_marker_path(reverse) if not os.path.isfile(marker_path): return None with open(marker_path, 'r', encoding='utf-8') as fileh: marker = json.load(fileh) if marker.get('complete', True): return None if marker.get('init_sig') != init_sig: # A different shot left this marker behind; ignore it. return None partial = os.path.join(self.exe_dir, marker['traj_file']) if not os.path.isfile(partial): return None return partial
[docs] def _recompute_partial_orders(self, partial_traj, reverse): """Order parameters for every readable frame, in file order. Reads the surviving frames of an interrupted trajectory and recomputes their order parameters (the deterministic source of truth). A frame left broken by the interruption is tolerated: the underlying readers stop at the last readable frame. """ # Local import is required: pyretis.tools.recalculate_order pulls # in the engines package, so a module-level import here would be # circular. # pylint: disable=import-outside-toplevel from pyretis.tools.recalculate_order import recalculate_order options = {'minidx': None, 'maxidx': None, 'reverse': reverse} if partial_traj[-4:] == '.xyz' and \ self.input_files.get('template', False): options['box'], _ = read_cp2k_box(self.input_files['template']) orders = list( recalculate_order(self.order_function, partial_traj, options) ) if reverse: # recalculate_order reverses the list for reverse runs; undo # that here so the orders are in trajectory-file order, which # is the order in which _propagate_from stored the frames. orders = list(reversed(orders)) return orders
[docs] def _prefill_partial(self, system, partial_traj, name): """Rebuild the surviving frames of an interrupted path. Returns a tuple ``(phase_points, cont_conf)`` where ``phase_points`` are the surviving frames *except* the last good one (rebuilt from the partial trajectory, with order parameters recomputed) and ``cont_conf`` is a configuration file holding the last good frame, from which the propagation continues. The last good frame is excluded because :py:meth:`._propagate_from` re-adds it as the first frame of the continued segment. The surviving frames are deliberately *not* added to the caller's path here: the caller prepends them *after* :py:meth:`._propagate_from` so the engine's own ``update_energies`` aligns with the continued frames only. The surviving frames' energies are not recoverable from the trajectory and are therefore left unset (``None``). Returns ``(None, None)`` when there are fewer than two readable frames (nothing worth resuming). """ orders = self._recompute_partial_orders(partial_traj, reverse=system.vel_rev) if len(orders) < 2: return None, None cont_idx = len(orders) - 1 phase_points = [] for i in range(cont_idx): snapshot = {'order': orders[i], 'config': (partial_traj, self._frame_config_index(i)), 'vel_rev': system.vel_rev} phase_points.append(self.snapshot_to_system(system, snapshot)) cont_conf = os.path.join(self.exe_dir, f'resume_{name}.{self.ext}') self._extract_frame(partial_traj, self._frame_config_index(cont_idx), cont_conf) return phase_points, cont_conf
[docs] def propagate(self, path, ens_set, system, reverse=False): """ Propagate the equations of motion with the external code. This method will explicitly do the common set-up, before calling more specialised code for doing the actual propagation. Parameters ---------- path : object like :py:class:`.PathBase` This is the path we use to fill in phase-space points. We are here not returning a new path - this since we want to delegate the creation of the path to the method that is running `propagate`. ens_set : dict Ensemble settings. It contains: * `interfaces` : list of floats These interfaces define the stopping criterion. * `path_ensemble` : optional, used to build the working-file name prefix when present. system : object like :py:class:`.System` The system object gives the initial state for the integration. It is mutated in place (configuration file and velocity-reversal flag) during propagation; the caller owns the lifecycle and passes a fresh copy per call. The order function and random generator are injected on the engine (``self.order_function`` / ``self.rgen``). reverse : boolean, optional If True, the system will be propagated backward in time. Returns ------- success : boolean This is True if we generated an acceptable path. status : string A text description of the current status of the propagation. """ logger.debug('Running propagate with: "%s"', self.description) prefix = str(counter()) if ens_set.get('path_ensemble', False): prefix = ens_set['path_ensemble'].ensemble_name_simple + '_' \ + prefix if reverse: logger.debug('Running backward in time.') name = prefix + '_trajB' else: logger.debug('Running forward in time.') name = prefix + '_trajF' logger.debug('Trajectory name: "%s"', name) # Also create a message file for inspecting progress: msg_file_name = os.path.join(self.exe_dir, f'msg-{name}.txt') logger.debug('Writing propagation progress to: %s', msg_file_name) msg_file = FileIO(msg_file_name, 'w', None, backup=False) msg_file.open() msg_file.write(f'# Preparing propagation with {self.description}') msg_file.write(f'# Trajectory label: {name}') initial_file = self.dump_frame(system, deffnm=prefix + '_conf') msg_file.write(f'# Initial file: {initial_file}') logger.debug('Initial state: %s', system) if reverse != system.vel_rev: logger.debug('Reversing velocities in initial config.') msg_file.write('# Reversing velocities') basepath = os.path.dirname(initial_file) localfile = os.path.basename(initial_file) initial_conf = os.path.join(basepath, f'r_{localfile}') self._reverse_velocities(initial_file, initial_conf) else: initial_conf = initial_file msg_file.write(f'# Initial config: {initial_conf}') # Update system to point to the configuration file: system.config = (initial_conf, None) system.vel_rev = reverse # Optionally continue an interrupted propagation instead of # re-running it from scratch (see ``continue_interrupted``): prefill_points = [] saved_maxlen = path.maxlen if self.continue_interrupted: init_sig = self._config_signature(initial_conf) partial = self._read_resume_marker(reverse, init_sig) if partial is not None: prefill_points, cont_conf = self._prefill_partial( system, partial, name) if cont_conf is not None: logger.warning( 'Resuming interrupted propagation "%s" from %s ' '(%d surviving frame(s)). The continued path is a ' 'valid sample but not bit-identical to an ' 'uninterrupted run.', name, partial, len(prefill_points)) msg_file.write(f'# Resuming from partial: {partial}') initial_conf = cont_conf system.config = (initial_conf, None) path.resumed = True # Continue for the remaining length only; the surviving # frames are prepended after propagation so the engine's # update_energies aligns with the continued frames. if path.maxlen is not None: path.maxlen = max(path.maxlen - len(prefill_points), 1) # Record (durably) that a propagation for this shot is now in # progress; the marker points at the trajectory that will hold # the surviving frames if we are interrupted again. partial_for_marker = (os.path.basename(partial) if partial is not None else self._trajectory_filename(name)) self._write_resume_marker(reverse, partial_for_marker, init_sig, complete=False) # Propagate from this point: msg_file.write(f'# Interfaces: {ens_set["interfaces"]}') success, status = self._propagate_from( name, path, system, ens_set, msg_file, reverse=reverse ) if self.continue_interrupted: if prefill_points: # Prepend the surviving frames (their energies are unset) # ahead of the freshly continued frames, and restore the # full path length cap. path.phasepoints[:0] = prefill_points path.maxlen = saved_maxlen # Completed normally: drop the marker so this shot is not # mistaken for an interrupted one later. self._remove_resume_marker(reverse) msg_file.close() return success, status
[docs] @abstractmethod def _propagate_from(self, name, path, system, ens_set, msg_file, reverse=False): """ Run the actual propagation using the specific engine. This method is called after :py:meth:`.propagate`. And we assume that the necessary preparations before the actual propagation (e.g. dumping of the configuration etc.) is handled in that method. Parameters ---------- name : string A name to use for the trajectory we are generating. path : object like :py:class:`.PathBase` This is the path we use to fill in phase-space points. system : object like :py:class:`.System` The system object gives the initial state for the integration. The order function is injected on the engine (``self.order_function``). ens_set : dict Ensemble settings. It contains: * `interfaces` : list of floats These interfaces define the stopping criterion. msg_file : object like :py:class:`.FileIO` An object we use for writing out messages that are useful for inspecting the status of the current propagation. reverse : boolean, optional If True, the system will be propagated backward in time. Returns ------- success : boolean This is True if we generated an acceptable path. status : string A text description of the current status of the propagation. """ return
[docs] def _name_output(self, basename): """ Create a file name for the output file. This method is used when we dump a configuration to add the correct extension for GROMACS (either gro or g96). Parameters ---------- basename : string The base name to give to the file. Returns ------- out : string A file name with an extension. """ out_file = f'{basename}.{self.ext}' return os.path.join(self.exe_dir, out_file)
[docs] def dump_config(self, config, deffnm='conf'): """Extract configuration frame from a system if needed. Parameters ---------- config : tuple The configuration given as (filename, index). deffnm : string, optional The base name for the file we dump to. Returns ------- out : string The file name we dumped to. If we did not in fact dump, this is because the system contains a single frame and we can use it directly. Then we return simply this file name. Note ---- If the velocities should be reversed, this is handled elsewhere. """ pos_file, idx = config out_file = os.path.join(self.exe_dir, self._name_output(deffnm)) if idx is None: if pos_file != out_file: self._copyfile(pos_file, out_file) else: logger.debug('Config: %s', (config, )) self._extract_frame(pos_file, idx, out_file) return out_file
[docs] def dump_frame(self, system, deffnm='conf'): """Just dump the frame from a system object.""" return self.dump_config(system.config, deffnm=deffnm)
[docs] def dump_phasepoint(self, phasepoint, deffnm='conf'): """Just dump the frame from a system object.""" pos_file = self.dump_config(phasepoint.config, deffnm=deffnm) phasepoint.config = (pos_file, None)