Source code for pyretis.engines.ams

# Copyright (c) 2023, infRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""A AMS external MD integrator interface.

This module defines a class for using AMS as an external engine.

Important classes defined here
------------------------------

AMSEngine (:py:class:`.AMSEngine`)
    A class responsible for interfacing AMS.
"""
# pylint: disable=import-error

from __future__ import annotations

import copy
import logging
import os
import time
import weakref
from typing import TYPE_CHECKING, Any, Dict, Tuple

import numpy as np
from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.interfaces.adfsuite.amsworker import AMSWorker
from scm.plams.tools.units import Units
from scm.plams.trajectories.rkffile import RKFTrajectoryFile

from pyretis.engines.external import ExternalMDEngine
from pyretis.engines._parts import box_matrix_to_list

if TYPE_CHECKING:
    from pyretis.core.path import Path as InfPath
    from pyretis.core._system_inf import System
    from pyretis.inout.fileio import FileIO

# ``FileIO``, ``InfPath``, ``System`` are type-only references kept as
# PEP 563 forward strings until Phase 2 ports those modules.

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


class AMSEngine(ExternalMDEngine):  # , metaclass=Singleton):
    """
    A class for interfacing AMS.

    This class defines the interface to AMS.

    Attributes
    ----------
    input_path : string
        The directory where the input files are stored.
    input_files : dict of strings
        The names of the input files. The constructor registers a single
        key, ``'input'``, which maps to ``ams.inp``.
    ext_time : float
        The time to extend simulations by. It is equal to
        ``timestep * subcycles``.
        NOTE(review): not assigned anywhere in this class; presumably
        provided by the base class -- confirm.
    """

    # NOTE(review): the class derives from ``ExternalMDEngine`` but is
    # tagged 'internal' -- confirm this is what the callers expect.
    engine_type = 'internal'
    needs_order = False
def __init__(self, input_path, timestep, subcycles):
    """Set up the AMS engine.

    Reads ``ams.inp`` from ``input_path``, validates the molecular
    dynamics settings found there, starts an ``AMSWorker`` and, when the
    input names a ``GeometryFile``, registers that geometry as the first
    MD state.

    Parameters
    ----------
    input_path : string
        The path to where the input files are stored. It is converted
        to an absolute path.
    timestep : float
        The time step used in the AMS MD simulation, expressed in
        ``self.time_unit`` (``ps``).
    subcycles : integer
        The number of steps each AMS MD run is composed of.

    Raises
    ------
    ValueError
        If the geometry file name contains ``'traj'``, if the initial
        velocities temperature or the time step is missing from the AMS
        input, or if the AMS time step does not match ``timestep``.
    """
    super().__init__("AMS engine", timestep, subcycles)
    self.ext = "rkf"
    # The native base defaults exe_dir to ``None`` and defines no step
    # counter; restore the inf-base defaults the scheduler relies on
    # (exe_dir ``"."`` before ``set_mdrun`` points it at the per-worker
    # directory, ``self.steps`` for subcycle accounting).
    self.exe_dir = "."
    self.steps = 0
    # Units of AMS output, must correspond to set infRETIS units
    self.ene_unit = "kJ/mol"
    self.dist_unit = "nm"
    self.time_unit = "ps"
    self.name = "ams"
    # Store MD states
    self.states = {}
    self.oldstates = []
    self.ens_name = "init"
    self.n_init = 0
    # If input trajectories have different boxsize set to True:
    self.update_box = True
    # Add input path and the input files:
    self.input_path = os.path.abspath(input_path)
    self.set_idx = False
    # Expected input files
    self.input_files = {
        "input": "ams.inp",
    }

    # Read AMS input
    inpf = os.path.join(self.input_path, self.input_files["input"])
    with open(inpf, encoding="utf-8") as read_file:
        inp = read_file.read()

    # extract input geometry
    geometry_file_line = next(
        (line for line in inp.splitlines() if "GeometryFile" in line), None
    )
    if geometry_file_line is not None:
        geometry_file_path = geometry_file_line.split()[-1].strip()
        # get absolute path as string
        geometry_file_path = str(
            os.path.abspath(
                os.path.join(self.input_path, geometry_file_path)
            )
        )
        # The state helpers of this class treat any file whose name
        # contains 'traj' as a trajectory, so a snapshot must not use it.
        if "traj" in os.path.basename(geometry_file_path):
            logger.error("GeometryFile is not allowed to contain 'traj'")
            logger.error(
                "Extracted GeometryFile path: %s", geometry_file_path
            )
            raise ValueError(
                "GeometryFile is not allowed to contain 'traj'. "
                f"Extracted GeometryFile path: {geometry_file_path}"
            )
    else:
        logger.info(
            "AMS: GeometryFile was not set in AMS input file! "
            + "- Will fail in the case InfInit is used"
        )

    job = AMSJob.from_input(inp)
    settings = job.settings
    if hasattr(settings.input.ams, "System"):
        del settings.input.ams.System
    molecule = job.molecule[""]
    if self.update_box:
        self.molecule_lattice = molecule.lattice

    # Check input settings
    self.temperature = (
        settings.input.ams.moleculardynamics.initialvelocities.temperature
    )
    if len(self.temperature) == 0:
        logger.error("AMS: InitialVelocities Temperature was not set!")
        raise ValueError("AMS: InitialVelocities Temperature was not set!")
    self.temperature = float(self.temperature)

    ams_timestep = settings.input.ams.moleculardynamics.timestep
    if len(ams_timestep) == 0:
        # Default timestep in AMS, but unknown it here
        logger.error("AMS: Timestep was not set!")
        raise ValueError("AMS: Timestep was not set!")
    ams_timestep = float(ams_timestep) * Units.conversion_ratio(
        "fs", self.time_unit
    )
    # The AMS value went through a unit conversion, so compare with a
    # relative tolerance: exact float equality can reject time steps
    # that only differ by rounding.
    tolerance = 1e-9 * max(abs(timestep), abs(ams_timestep))
    if abs(timestep - ams_timestep) > tolerance:
        logger.error("Mismatch between AMS and InfRETIS timestep!")
        raise ValueError("Mismatch between AMS and InfRETIS timestep!")

    self.random_velocities_method = None
    initial_velocities = (
        settings.input.ams.moleculardynamics.initialvelocities
    )
    random_velocities_method = initial_velocities.randomvelocitiesmethod
    if len(random_velocities_method) > 0:
        logger.info(
            'AMS setting velocity generation method to "%s"',
            random_velocities_method,
        )
        self.random_velocities_method = random_velocities_method

    ams_dir = "."
    # Start AMS worker
    self.worker = AMSWorker(
        settings,
        workerdir_root=ams_dir,
        keep_crashed_workerdir=True,
        always_keep_workerdir=True,
    )
    # Register the shutdown hook before talking to the worker, so the
    # worker is still stopped if creating the first state fails.
    self._finalize = weakref.finalize(self, self.worker.stop)

    if geometry_file_line is not None:
        self.worker.CreateMDState(geometry_file_path, molecule)
        state = self.worker.MolecularDynamics(
            geometry_file_path, nsteps=0, setsteptozero=True
        )  # Also writes frame into out_file
        self._add_state(geometry_file_path, state[0])
def step(self, system, name, set_trajfile=True, set_step_to_zero=False):
    """Advance the system by ``self.subcycles`` MD steps with AMS.

    Parameters
    ----------
    system : object like :py:class:`.System`
        The system we are integrating. Its ``config`` is a
        ``(file, index)`` pair; an index of ``-1`` marks a snapshot,
        any other value a frame of a trajectory.
    name : string
        To name the output files from the AMS step.
    set_trajfile : logical
        Optional, True if a new output file should be opened.
    set_step_to_zero : logical
        Optional, True if the new MD step should start from number zero.

    Returns
    -------
    out : string
        The ``name`` that was passed in.
    """
    source_file, frame = system.config
    if frame == -1:
        # Snapshot: the source already contains exe_dir, ``name`` does not.
        worker_from = source_file
        worker_to = os.path.join(self.exe_dir, name)
        out_file = worker_to
        next_frame = frame
    else:
        # Trajectory frame: worker states are titled "<file>_<index>".
        worker_from = f"{source_file}_{frame}"
        worker_to = os.path.join(self.exe_dir, f"{name}_{frame + 1}")
        out_file = source_file
        next_frame = frame + 1

    if set_trajfile:
        logger.info("AMS setting output file: %s", out_file)
        # File must never be there before PrepareMD
        if os.path.exists(out_file):
            self._removefile(out_file, disk_only=True)
        self.worker.PrepareMD(out_file)

    # This might be removed, but nice way to check what is going on
    logger.info("AMS step %s -> %s", worker_from, worker_to)
    self.worker.CopyMDState(worker_from, worker_to)
    md_states = self.worker.MolecularDynamics(
        worker_to,
        nsteps=self.subcycles,
        trajectorysamplingfrequency=self.subcycles,
        checkpointfrequency=0,
        pipesamplingfrequency=self.subcycles,
        setsteptozero=set_step_to_zero,
    )
    # Whether or not the worker also returned the initial state, the
    # new state is always the last one.
    final = md_states[-1]

    # Update system
    system.set_pos((out_file, next_frame))
    system.vel_rev = False
    system.vpot = final.get_potentialenergy(unit=self.ene_unit)
    system.ekin = final.get_kineticenergy(unit=self.ene_unit)
    system.etot = system.vpot + system.ekin
    system.temp = final.get_temperature()

    # Save state
    self._add_state(out_file, final)
    return name
def calculate_order(self, system, xyz=None, vel=None, box=None):
    """Calculate the order parameter from a configuration.

    ``_read_configuration`` of this engine returns
    ``(xyz, vel, box, names)``, so the configuration is unpacked in that
    order whenever one of the inputs is missing.

    Parameters
    ----------
    system : object like :py:class:`.System`
        The system that contains the particles we are investigating.
        Its ``pos``, ``vel`` and ``box`` attributes are overwritten.
    xyz : numpy.array, optional
        The positions to use, in case we have already read them.
    vel : numpy.array, optional
        The velocities to use, in case we have already read them. They
        are multiplied by ``-1.0`` when ``system.vel_rev`` is set.
    box : numpy.array, optional
        The current box vectors, in case we have already read them.

    Returns
    -------
    out : list of floats
        The calculated order parameter(s).

    Raises
    ------
    ValueError
        If no order function has been attached to the engine.
    """
    # A single missing input triggers a full re-read of all three.
    if xyz is None or vel is None or box is None:
        config = self._read_configuration(system.config[0])
        xyz, vel, box = config[0], config[1], config[2]

    if xyz is not None:
        system.pos = xyz
    if vel is not None:
        if system.vel_rev:
            system.vel = vel * -1.0
        else:
            system.vel = vel
    if box is not None:
        system.box = box

    order_function = self.order_function
    if order_function is None:
        raise ValueError("Order parameter is not defined!")
    return order_function.calculate(system)
@staticmethod
def add_to_path(path, phase_point, left, right):
    """Append a phase point to the path and check for crossings.

    A 3-tuple is returned, and a crossing of either interface takes
    precedence over the maximum-length check.

    Parameters
    ----------
    path : object like :py:class:`.PathBase`
        The path to add to.
    phase_point : object like :py:class:`.System`
        The phase point to add to the path.
    left : float
        The left interface.
    right : float
        The right interface.

    Returns
    -------
    status : string
        A text description of the current status of the propagation.
    success : boolean
        True if we generated an acceptable path.
    stop : boolean
        True if the propagation should stop.
    """
    path.append(phase_point)
    # Only the first order parameter of the newest point is inspected.
    last_order = path.phasepoints[-1].order[0]
    if last_order < left:
        return "Crossed left interface!", True, True
    if last_order > right:
        return "Crossed right interface!", True, True
    if path.length >= path.maxlen:
        return "Max. path length exceeded", False, True
    return "Running propagate...", False, False
def dump_phasepoint(self, phasepoint, deffnm="conf"):
    """Dump the configuration of a phase point to a file.

    The phase point is afterwards pointed at the dumped file with
    ``0`` as configuration index.

    Parameters
    ----------
    phasepoint : object like :py:class:`.System`
        The phase point whose configuration we dump to file.
    deffnm : string, optional
        The default file name for the dumped configuration.
    """
    dumped = self.dump_config(phasepoint.config, deffnm=deffnm)
    new_config = (dumped, 0)
    phasepoint.set_pos(new_config)
[docs] def _read_configuration(self, filename, idx=-1): """Read output from AMS snapshot/trajectory. Parameters ---------- filename : string The file to read the configuration from. idx : integer Optional, frame index in trajectory Returns ------- box : numpy.array The box dimensions. xyz : numpy.array The positions. vel : numpy.array The velocities. """ if idx == -1: idx = 0 if not self.states: logger.info( "Infinit requirement for 0 paths: self.set_idx = True, idx = 0" ) self.set_idx = True self.n_init = 0 state = self.states[filename][idx] box = state.get_latticevectors(unit=self.dist_unit) if len(box) == 0: box = [float("inf"), float("inf"), float("inf")] else: box = box_matrix_to_list(box) xyz = state.get_coords(unit=self.dist_unit) vel = state.get_velocities( dist_unit=self.dist_unit, time_unit=self.time_unit ) return xyz, vel, box, None
def set_mdrun(self, md_items):
    """Set up the molecular dynamics run with the given parameters.

    Parameters
    ----------
    md_items : dict
        A dictionary with the keys used here: ``exe_dir`` (str), the
        directory assigned to ``self.exe_dir``, and optionally ``ens``
        (dict), the ensemble information carrying ``ens_name`` (str),
        the ensemble name.

    Notes
    -----
    Sets the execution directory and the ensemble name (``"init"`` when
    no ensemble is given), deletes the states that were already present
    at the previous call and are still stored, and then records the
    currently stored states as the new list of old states.
    """
    self.exe_dir = md_items["exe_dir"]
    if "ens" in md_items:
        self.ens_name = md_items["ens"]["ens_name"] + "_"
    else:
        self.ens_name = "init"
    logger.info(
        "self.exe_dir %s md_items['exe_dir'] %s",
        self.exe_dir,
        md_items["exe_dir"],
    )

    # Collect first, delete afterwards: ``_deletestate`` mutates
    # ``self.states``.
    stale_states = [
        state for state in self.oldstates if state in self.states
    ]
    for state in stale_states:
        self._deletestate(state)

    # Take a snapshot of the keys. ``dict.keys()`` is a live view, which
    # would make every state created later count as "old" on the next
    # call and delete it too early.
    self.oldstates = list(self.states)
def _reverse_velocities(self, filename, outfile):
    """Reverse the velocities of a given snapshot.

    Parameters
    ----------
    filename : string
        The configuration to reverse velocities in.
    outfile : string
        The output file for storing the configuration with reversed
        velocities.
    """
    logger.info("AMS reversing velocities for %s", filename)
    # File must never be there before PrepareMD
    if os.path.exists(outfile):
        self._removefile(outfile)
    self.worker.PrepareMD(outfile)
    # copy only the state, file will be written later
    self._copystate(filename, outfile)

    # Here we are working with AMS internal representation of velocities,
    # thus, we keep the units a.u./a.u.
    atomic_units = {"dist_unit": "au", "time_unit": "au"}
    forward = self.states[filename][0].get_velocities(**atomic_units)
    backward = -1.0 * forward
    # Keep the cached copy and the worker-side state in agreement.
    self.states[outfile][0]._state["velocities"] = backward
    self.worker.SetVelocities(outfile, backward, **atomic_units)
    self.worker.MolecularDynamics(outfile, nsteps=0, setsteptozero=True)
def _extract_frame_from_disk(self, traj_file, idx, out_file):
    """Extract a frame from a trajectory file from disk.

    The frame is read from the RKF file, turned into a new MD state in
    the AMS worker under the title ``out_file`` and cached in
    ``self.states``.

    Parameters
    ----------
    traj_file : string
        The AMS file to open.
    idx : integer
        The frame number we look for.
    out_file : string
        The file to dump to.

    Note
    ----
    This will only properly work if the frames in the input
    trajectory are uniformly spaced in time.
    """
    logger.info(
        "AMS extracting frame from disk: %s, %i -> %s",
        traj_file,
        idx,
        out_file,
    )
    rkf = RKFTrajectoryFile(traj_file)
    try:
        rkf.store_mddata()
        molecule = rkf.get_plamsmol()
        rkf.read_frame(idx, molecule=molecule)
        # Pull what we need out of the reader so it can be closed before
        # the (potentially slow) worker calls below.
        mddata = rkf.mddata
        has_velocities = "Velocities" in mddata
        velocities = mddata["Velocities"] if has_velocities else None
    finally:
        # The reader was previously never closed.
        rkf.close()

    if self.update_box:
        molecule.lattice = self.molecule_lattice

    # file must never be there before PrepareMD
    if os.path.exists(out_file):
        self._removefile(out_file)
    if out_file in self.states:
        self._deletestate(out_file)
    self.worker.PrepareMD(out_file)

    try:
        self.worker.CreateMDState(out_file, molecule)
    except Exception as err:
        if "MD state with given title already exists" not in str(err):
            raise
        # The worker still holds a state with this title that is not
        # tracked in ``self.states``; replace it.
        print("MD state with given title already exists: ", out_file)
        logger.error("AMS error in CreateMDState: %s", str(err))
        self.worker.DeleteMDState(out_file)
        self.worker.CreateMDState(out_file, molecule)

    if has_velocities:
        # RKFTrajectoryFile returns 1D array
        velocities = np.reshape(velocities, (-1, 3))
        self.worker.SetVelocities(
            out_file, velocities, dist_unit="bohr", time_unit="fs"
        )  # Units used in rkf file

    state = self.worker.MolecularDynamics(
        out_file, nsteps=0, setsteptozero=True
    )  # Also writes frame into out_file
    self._add_state(out_file, state[0])
def _extract_frame(self, traj_file, idx, out_file):
    """Extract a frame from a trajectory file.

    If the trajectory is held in memory, the requested frame is copied
    from there. Otherwise it is read from disk; when that fails, the
    method waits up to 1200 seconds for the file to be (re)written and
    then tries once more.

    Parameters
    ----------
    traj_file : string
        The AMS file to open.
    idx : integer
        The frame number we look for.
    out_file : string
        The file to dump to.

    Raises
    ------
    RuntimeError
        If reading from disk failed and the file did not appear, or was
        not modified, within the waiting period.

    Note
    ----
    This will only properly work if the frames in the input
    trajectory are uniformly spaced in time.
    """
    if traj_file in self.states:
        logger.info(
            "AMS extracting frame: %s, %i -> %s", traj_file, idx, out_file
        )
        self._copystate(traj_file, out_file, idx=idx)
        self.worker.PrepareMD(out_file)
        self.worker.MolecularDynamics(
            out_file, nsteps=0, setsteptozero=True
        )  # Writes traj file
        return

    try:
        self._extract_frame_from_disk(traj_file, idx, out_file)
    except Exception as err:
        logger.error(
            "Error extracting frame from %s, idx=%s: %s",
            traj_file,
            idx,
            err,
        )
        logger.info(
            "Wait until file is written: %s, idx=%s", traj_file, idx
        )
        timeout = 1200.0
        poll_interval = 0.1
        # Use the monotonic clock so the timeout is real elapsed time,
        # not the sum of the nominal sleep intervals.
        start = time.monotonic()
        deadline = start + timeout
        last_mtime = None
        updated = False
        while time.monotonic() < deadline:
            try:
                current_mtime = os.path.getmtime(traj_file)
            except OSError:
                # Missing (or vanished between polls): start over.
                last_mtime = None
            else:
                if last_mtime is not None and current_mtime != last_mtime:
                    # File has been updated
                    updated = True
                    break
                last_mtime = current_mtime
            time.sleep(poll_interval)
        waited = time.monotonic() - start

        failure = (
            f"Failed to extract frame for {traj_file}, "
            + f"idx={idx}, after waiting for file availability"
        )
        if not os.path.exists(traj_file):
            logger.error(
                "File %s did not appear within 1200 seconds.",
                traj_file,
            )
            raise RuntimeError(failure) from err
        if not updated:
            logger.warning(
                "File %s exists but was not updated within 1200 seconds.",
                traj_file,
            )
            raise RuntimeError(failure) from err

        logger.info(
            "File is available and updated after %.1f seconds: %s",
            waited,
            traj_file,
        )
        self._extract_frame_from_disk(traj_file, idx, out_file)
def _propagate_from(
    self,
    name: str,
    path: InfPath,
    system: System,
    ens_set: Dict,
    msg_file: FileIO,
    reverse: bool = False,
) -> Tuple[bool, str]:
    """
    Propagate with AMS from the current system configuration.

    Here, we assume that this method is called after the propagate()
    has been called in the parent. The parent is then responsible
    for reversing the velocities and also for setting the initial
    state of the system.

    Parameters
    ----------
    name : string
        A name to use for the trajectory we are generating.
    path : object like :py:class:`infretis.core.path.PathBase`
        This is the path we use to fill in phase-space points.
    system : object like :py:class:`.System`
        The system object gives the initial state for the
        integration. It is updated in place for every frame.
    ens_set : dict
        The ensemble settings. Only ``interfaces`` is read here: a
        sequence of three values of which the first and the last
        define the stopping criterion.
    msg_file : object like :py:class:`.FileIO`
        An object we use for writing out messages that are useful
        for inspecting the status of the current propagation.
    reverse : boolean, optional
        Stored as ``vel_rev`` on every generated phase point.

    Returns
    -------
    success : boolean
        This is True if we generated an acceptable path.
    status : string
        A text description of the current status of the propagation.
    """
    status = f"propagating with AMS (reverse = {reverse})"
    logger.info(status)
    success = False
    left, _, right = ens_set["interfaces"]

    # Get the current order parameter:
    initial_order = self.calculate_order(system)
    initial_text = " ".join([str(value) for value in initial_order])
    msg_file.write(f"# Initial order parameter: {initial_text}")

    ekin, vpot, etot, temperature = [], [], [], []
    traj_file = os.path.join(self.exe_dir, name + "." + self.ext)
    first_frame = traj_file + "_0"

    # First, process input snapshot. Only the state is copied here; the
    # trajectory file itself is written in the loop.
    self._copystate(system.config[0], traj_file)
    logger.info(
        f"AMS internal traj renaming: {traj_file} -> {first_frame}"
    )
    self.worker.RenameMDState(traj_file, first_frame)

    # Then, add snapshot to path and propagate further if necessary
    for i in range(path.maxlen):
        # Only the first MD call opens the file and resets the counter.
        is_first = i == 0

        # Calculate the order parameter using the current system:
        system.vel_rev = True
        system.set_pos((traj_file, i))
        config = self._read_configuration(traj_file, idx=i)
        order = self.calculate_order(
            system, xyz=config[0], vel=config[1], box=config[2]
        )
        order_text = " ".join([str(value) for value in order])
        msg_file.write(f"{i} {order_text}")

        phase_point = self.snapshot_to_system(
            system,
            {
                "order": order,
                "config": (traj_file, i),
                "vel_rev": reverse,
            },
        )
        status, success, stop = self.add_to_path(
            path, phase_point, left, right
        )

        frame_state = self.states[traj_file][i]
        ekin.append(frame_state.get_kineticenergy(unit=self.ene_unit))
        vpot.append(frame_state.get_potentialenergy(unit=self.ene_unit))
        etot.append(ekin[-1] + vpot[-1])
        temperature.append(frame_state.get_temperature())
        logger.info("OP: %f in frame %s %i", order[0], traj_file, i)
        msg_file.flush()

        if stop:
            logger.info(
                "AMS propagation ended at %i. Reason: %s", i, status
            )
            if is_first:
                # Write the traj file if no MD is going to be done
                self.worker.PrepareMD(traj_file)
                self.worker.MolecularDynamics(
                    first_frame, nsteps=0, setsteptozero=True
                )
            break

        self.step(
            system,
            traj_file,
            set_trajfile=is_first,
            set_step_to_zero=is_first,
        )

    logger.info("AMS propagation done, obtaining energies")
    path.update_energies(ekin, vpot, etot, temperature)
    # Accumulate the path length for the scheduler's subcycle
    # accounting.
    self.steps += path.length
    msg_file.write("# Propagation done.")
    msg_file.flush()
    return success, status
def modify_velocities(
    self, system: System, vel_settings: Dict[str, Any], rgen
) -> Tuple[float, float]:
    """Modify the velocities of the current state.

    New velocities are drawn by the AMS worker for the configuration
    that ``system`` points to; ``system`` is then updated to reference
    the newly created ``genvel_*`` state.

    Parameters
    ----------
    system : object like :py:class:`.System`
        This is the system that contains the particles we are
        investigating.
    vel_settings : dict
        The keys read here are:

        * `aimless`: boolean, optional
          Must be True; any other kind of velocity change is not
          implemented for this engine.
        * `momentum`: boolean, optional
          Accepted, but has no effect in this engine.
        * `rescale or rescale_energy`: float, optional
          Energy re-scaling is not supported; a value > 0 raises.
    rgen : object
        Unused. AMS draws the velocities with its own random number
        generator; the argument only satisfies the unified signature.

    Returns
    -------
    dek : float
        The change in the kinetic energy.
    kin_new : float
        The new kinetic energy.

    Raises
    ------
    NotImplementedError
        If energy re-scaling is requested or if ``aimless`` is not set.
    """
    # NOTE (Phase 4b determinism gap): AMS draws kick velocities via
    # worker.GenerateVelocities (AMS-internal RNG), not the injected
    # ``rgen``; the argument only satisfies the unified signature.
    _ = rgen

    rescale = vel_settings.get(
        "rescale_energy", vel_settings.get("rescale")
    )
    wants_rescale = (
        rescale is not None and rescale is not False and rescale > 0
    )
    if wants_rescale:
        msgtxt = "AMS engine does not support energy re-scale."
        logger.error(msgtxt)
        raise NotImplementedError(msgtxt)

    kin_old = system.ekin
    if not vel_settings.get("aimless", False):
        # Soft velocity change, from a Gaussian distribution:
        msgtxt = "AMS engine only support aimless shooting!"
        logger.error(msgtxt)
        raise NotImplementedError(msgtxt)

    state_name, idx = system.config
    logger.info("Generating velocities for %s, idx=%s", state_name, idx)
    # Process id plus the microsecond part of the clock is used to
    # build the name of the new state.
    micro = int(time.time() * 1_000_000) % 1_000_000
    prefix = f"{self.ens_name}{os.getpid()}_{micro:06d}"
    genvel = os.path.join(self.exe_dir, f"genvel_{prefix}.{self.ext}")

    if state_name in self.states:
        # If kicking from new MD state, prepare it
        self._copystate(state_name, genvel, idx=idx)
    else:
        self._extract_frame(state_name, idx, genvel)

    kicked = self.worker.GenerateVelocities(
        genvel,
        self.temperature,
        randomvelocitiesmethod=self.random_velocities_method,
        setsteptozero=True,
    )
    # Can be removed, but nice way to check velocity generation
    logger.info(
        "AMS Epot: %f Ekin: %f",
        kicked.get_potentialenergy(unit=self.ene_unit),
        kicked.get_kineticenergy(unit=self.ene_unit),
    )

    # Write rkf file
    logger.info("AMS setting output file: %s", genvel)
    # File must never be there before PrepareMD
    if os.path.exists(genvel):
        self._removefile(genvel, disk_only=True)
    self.worker.PrepareMD(genvel)

    # Update system
    kin_new = kicked.get_kineticenergy(unit=self.ene_unit)
    system.set_pos((genvel, -1))
    system.vel_rev = False
    system.ekin = kin_new
    system.vpot = kicked.get_potentialenergy(unit=self.ene_unit)
    system.etot = system.ekin + system.vpot
    system.temp = kicked.get_temperature()
    self._add_state(genvel, kicked, rewrite=True)

    if kin_old is not None and kin_new is not None:
        return kin_new - kin_old, kin_new

    logger.warning(
        "Kinetic energy not found for previous point."
        "\n(This happens when the initial configuration "
        "does not contain energies.)"
    )
    return float("inf"), kin_new
def _add_state(self, key, state, rewrite=False): if key not in self.states or rewrite: self.states[key] = [] self.states[key].append(state) else: self.states[key].append(state) return def _renamestate(self, source, dest): if dest in self.states: self._deletestate(dest) self.states[dest] = self.states[source] # Dirty usage of filename to recognize trajectories and snapshots # because we do not have system object with index here... if "traj" in os.path.basename(source): logger.info("AMS Moving traj: %s -> %s", source, dest) for i in range(len(self.states[dest])): self.worker.RenameMDState( source + "_" + str(i), dest + "_" + str(i) ) else: logger.info("AMS Moving snap: %s -> %s", source, dest) self.worker.RenameMDState(source, dest) del self.states[source] def _copystate(self, source, dest, idx=-1): if source == dest: print( "------------------------------------------------------------" ) print("WARNING: source == dest in ams._copystate") print("This should only happen in pytest") print( "------------------------------------------------------------" ) pass else: if dest in self.states: logger.info("AMS snap exists delete: %s", dest) self._deletestate(dest) if "traj" not in os.path.basename(source): logger.info("AMS Copying snap to snap: %s -> %s", source, dest) self.states[dest] = [copy.deepcopy(self.states[source][0])] self.worker.CopyMDState(source, dest) else: logger.info( "AMS Copying traj snap to snap: %s, %i -> %s", source, idx, dest, ) self.states[dest] = [copy.deepcopy(self.states[source][idx])] self.worker.CopyMDState(source + "_" + str(idx), dest)
def _deletestate(self, filename):
    """
    Delete the state associated with the given filename.

    Trajectories and snapshots are told apart by the substring "traj"
    in the file name. For a trajectory, one worker state per stored
    frame is deleted (titled ``<filename>_<index>``). For a snapshot,
    the worker state is deleted directly, and a worker error saying
    the state was not found is tolerated.

    Args:
        filename (str): The name of the file whose state is to be
            deleted.

    Raises:
        KeyError: If the filename is not found in the states dictionary.
    """
    # Dirty usage of filename to recognize trajectories and snapshots
    # because we do not have system object with index here...
    if "traj" not in os.path.basename(filename):
        logger.info("AMS Deleting snap: %s", filename)
        try:
            self.worker.DeleteMDState(filename)
        except Exception as err:
            message = str(err)
            if "MD state with given title not found" not in message:
                print("else: tried but failed but wrong")
                raise
            # Already gone on the worker side: only drop our own record.
            print("MD state with given title not found: ", filename)
            logger.info("AMS error in DeleteMDState: %s", message)
    else:
        logger.info("AMS Deleting traj: %s", filename)
        for i, _ in enumerate(self.states[filename]):
            self.worker.DeleteMDState(f"{filename}_{i}")
    del self.states[filename]
def _movefile(self, source, dest):
    """
    Move a file and rename the matching MD state, if there is one.

    The file is moved with the superclass's _movefile method. The state
    stored under ``source`` is renamed to ``dest`` only when it is held
    in memory.

    Args:
        source (str): The path of the file to be moved.
        dest (str): The destination path where the file should be moved.
    """
    super()._movefile(source, dest)
    # When swapping paths and running from restart,
    # all MD states might not be in memory
    if source not in self.states:
        return
    self._renamestate(source, dest)
def _copyfile(self, source, dest):
    """
    Copy a file from source to destination and update the state.

    The file is copied with the superclass's _copyfile method. A state
    already stored under ``dest`` is deleted, since the file it
    described has been overwritten. The state of ``source`` is then
    copied to ``dest``, provided it is held in memory.

    Args:
        source (str): The path to the source file.
        dest (str): The path to the destination file.
    """
    super()._copyfile(source, dest)
    # Look this up before deleting ``dest``: the two may be the same key.
    source_in_memory = source in self.states
    if dest in self.states:
        self._deletestate(dest)
    # As in ``_movefile``: when running from restart, not all MD states
    # are in memory, and there is then nothing to copy besides the file.
    if source_in_memory:
        self._copystate(source, dest)
def _removefile(self, filename, disk_only=False):
    """
    Remove a file and, optionally, the MD state stored under its name.

    The file is removed with the superclass's _removefile method.

    Args:
        filename (str): The name of the file to be removed.
        disk_only (bool, optional): If True, only remove the file from
            the disk and keep the internal state. Defaults to False.

    Returns:
        None
    """
    # We could use os.remove() directly, but let's be consistent
    super()._removefile(filename)
    # The method is also called for log files, and states may be absent
    # after a restart, so only delete a state that is actually stored.
    if not disk_only and filename in self.states:
        self._deletestate(filename)