# Copyright (c) 2023, infRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""A AMS external MD integrator interface.
This module defines a class for using AMS as an external engine.
Important classes defined here
------------------------------
AMSEngine (:py:class:`.AMSEngine`)
A class responsible for interfacing AMS.
"""
# pylint: disable=import-error
from __future__ import annotations
import copy
import logging
import os
import time
import weakref
from typing import TYPE_CHECKING, Any, Dict, Tuple
import numpy as np
from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.interfaces.adfsuite.amsworker import AMSWorker
from scm.plams.tools.units import Units
from scm.plams.trajectories.rkffile import RKFTrajectoryFile
from pyretis.engines.external import ExternalMDEngine
from pyretis.engines._parts import box_matrix_to_list
if TYPE_CHECKING:
from pyretis.core.path import Path as InfPath
from pyretis.core._system_inf import System
from pyretis.inout.fileio import FileIO
# ``FileIO``, ``InfPath``, ``System`` are type-only references kept as
# PEP 563 forward strings until Phase 2 ports those modules.
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
[docs]class AMSEngine(ExternalMDEngine): # , metaclass=Singleton):
"""
A class for interfacing AMS.
This class defines the interface to AMS.
Attributes
----------
input_path : string
The directory where the input files are stored.
input_files : dict of strings
The names of the input files. We expect to find the keys
``'conf'``, ``'input'`` ``'topology'``.
ext_time : float
The time to extend simulations by. It is equal to
``timestep * subcycles``.
"""
engine_type = 'internal'
needs_order = False
[docs] def __init__(self, input_path, timestep, subcycles):
"""Set up the AMS engine.
Parameters
----------
input_path : string
The absolute path to where the input files are stored.
timestep : float
The time step used in the GROMACS MD simulation.
subcycles : integer
The number of steps each GROMACS MD run is composed of.
"""
super().__init__("AMS engine", timestep, subcycles)
self.ext = "rkf"
# The native base defaults exe_dir to ``None`` and defines no step
# counter; restore the inf-base defaults the scheduler relies on
# (exe_dir ``"."`` before ``set_mdrun`` points it at the per-worker
# directory, ``self.steps`` for subcycle accounting).
self.exe_dir = "."
self.steps = 0
# Units of AMS output, must correspond to set infRETIS units
self.ene_unit = "kJ/mol"
self.dist_unit = "nm"
self.time_unit = "ps"
self.name = "ams"
# Store MD states
self.states = {}
self.oldstates = []
self.ens_name = "init"
self.n_init = 0
# If input trajectories have different boxsize set to True:
self.update_box = True
# Add input path and the input files:
self.input_path = os.path.abspath(input_path)
self.set_idx = False
# Expected input files
self.input_files = {
"input": "ams.inp",
}
# Read AMS input
inpf = os.path.join(self.input_path, self.input_files["input"])
with open(inpf, encoding="utf-8") as read_file:
inp = read_file.read()
# extract input geometry
geometry_file_line = next(
(line for line in inp.splitlines() if "GeometryFile" in line), None
)
if geometry_file_line is not None:
geometry_file_path = geometry_file_line.split()[-1].strip()
# get absolute path as string
geometry_file_path = str(
os.path.abspath(
os.path.join(self.input_path, geometry_file_path)
)
)
if "traj" in os.path.basename(geometry_file_path):
logger.error("GeometryFile is not allowed to contain 'traj'")
logger.error(
"Extracted GeometryFile path: %s", geometry_file_path
)
raise ValueError(
"GeometryFile is not allowed to contain 'traj'. "
f"Extracted GeometryFile path: {geometry_file_path}"
)
else:
logger.info(
"AMS: GeometryFile was not set in AMS input file! "
+ "- Will fail in the case InfInit is used"
)
job = AMSJob.from_input(inp)
settings = job.settings
if hasattr(settings.input.ams, "System"):
del settings.input.ams.System
molecule = job.molecule[""]
if self.update_box:
self.molecule_lattice = molecule.lattice # Check input settings
self.temperature = (
settings.input.ams.moleculardynamics.initialvelocities.temperature
)
if len(self.temperature) == 0:
logger.error("AMS: InitialVelocities Temperature was not set!")
raise ValueError("AMS: InitialVelocities Temperature was not set!")
self.temperature = float(self.temperature)
ams_timestep = settings.input.ams.moleculardynamics.timestep
if len(ams_timestep) == 0:
# Default timestep in AMS, but unknown it here
logger.error("AMS: Timestep was not set!")
raise ValueError("AMS: Timestep was not set!")
ams_timestep = float(ams_timestep) * Units.conversion_ratio(
"fs", self.time_unit
)
if timestep != ams_timestep:
logger.error("Mismatch between AMS and InfRETIS timestep!")
raise ValueError("Mismatch between AMS and InfRETIS timestep!")
self.random_velocities_method = None
initial_velocities = (
settings.input.ams.moleculardynamics.initialvelocities
)
random_velocities_method = initial_velocities.randomvelocitiesmethod
if len(random_velocities_method) > 0:
logger.info(
'AMS setting velocity generation method to "%s"',
random_velocities_method,
)
self.random_velocities_method = random_velocities_method
ams_dir = "."
# Start AMS worker
self.worker = AMSWorker(
settings,
workerdir_root=ams_dir,
keep_crashed_workerdir=True,
always_keep_workerdir=True,
)
if geometry_file_line is not None:
self.worker.CreateMDState(geometry_file_path, molecule)
state = self.worker.MolecularDynamics(
geometry_file_path, nsteps=0, setsteptozero=True
) # Also writes frame into out_file
self._add_state(geometry_file_path, state[0])
# print('self.states', self.states)
self._finalize = weakref.finalize(self, self.worker.stop)
[docs] def step(self, system, name, set_trajfile=True, set_step_to_zero=False):
"""Perform a single step with AMS.
Parameters
----------
system : object like :py:class:`.System`
The system we are integrating.
name : string
To name the output files from the AMS step.
set_trajfile : logical
Optional, True if new output file should be opened
set_step_to_zero : logical
Optional, True if new MD step should start from
number zero
Returns
-------
out : string
The name of the output configuration, obtained after
completing the step.
"""
state, idx = system.config
if idx == -1:
prev_ams_state = state # state already contains exe_dir
new_ams_state = os.path.join(self.exe_dir, name) # name does not
new_state = new_ams_state
else:
prev_ams_state = (
state + "_" + str(idx)
) # state already contains exe_dir
new_ams_state = os.path.join(
self.exe_dir, name + "_" + str(idx + 1)
) # name does not
new_state = state
idx = idx + 1
if set_trajfile:
logger.info("AMS setting output file: %s", new_state)
if os.path.exists(
new_state
): # File must never be there before PrepareMD
self._removefile(new_state, disk_only=True)
self.worker.PrepareMD(new_state)
# This might be removed, but nice way to check what is going on
logger.info("AMS step %s -> %s", prev_ams_state, new_ams_state)
self.worker.CopyMDState(prev_ams_state, new_ams_state)
states = self.worker.MolecularDynamics(
new_ams_state,
nsteps=self.subcycles,
trajectorysamplingfrequency=self.subcycles,
checkpointfrequency=0,
pipesamplingfrequency=self.subcycles,
setsteptozero=set_step_to_zero,
)
# Update system
system.set_pos((new_state, idx))
system.vel_rev = False
# Here, we are not concerned if we also got the initial state or not
# from the AMSWorker. Next state is always the last one.
system.vpot = states[-1].get_potentialenergy(unit=self.ene_unit)
system.ekin = states[-1].get_kineticenergy(unit=self.ene_unit)
system.etot = system.vpot + system.ekin
system.temp = states[-1].get_temperature()
# Save state
self._add_state(new_state, states[-1])
return name
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None):
"""Calculate the order parameter from a configuration.
The AMS ``_read_configuration`` returns ``(xyz, vel, box, names)``
with the box as the THIRD element, so this override unpacks the
configuration accordingly (the native external base assumes a
different ordering).
Parameters
----------
system : object like :py:class:`.System`
The system that contains the particles we are investigating.
xyz : numpy.array, optional
The positions to use, in case we have already read them.
vel : numpy.array, optional
The velocities to use, in case we have already read them.
box : numpy.array, optional
The current box vectors, in case we have already read them.
Returns
-------
out : list of floats
The calculated order parameter(s).
"""
if any((xyz is None, vel is None, box is None)):
out = self._read_configuration(system.config[0])
xyz = out[0]
vel = out[1]
box = out[2]
if xyz is not None:
system.pos = xyz
if vel is not None:
system.vel = vel * -1.0 if system.vel_rev else vel
if box is not None:
system.box = box
if self.order_function is None:
raise ValueError("Order parameter is not defined!")
return self.order_function.calculate(system)
[docs] @staticmethod
def add_to_path(path, phase_point, left, right):
"""Add a phase point to the path and check for crossings.
This override preserves the inf-flavour semantics expected by
``_propagate_from``: it returns a 3-tuple and lets a crossing of
either interface win on the boundary (the native external base
returns a 4-tuple with different maxlen handling).
Parameters
----------
path : object like :py:class:`.PathBase`
The path to add to.
phase_point : object like :py:class:`.System`
The phase point to add to the path.
left : float
The left interface.
right : float
The right interface.
Returns
-------
status : string
A text description of the current status of the propagation.
success : boolean
True if we generated an acceptable path.
stop : boolean
True if the propagation should stop.
"""
status = "Running propagate..."
success = False
stop = False
path.append(phase_point)
if path.phasepoints[-1].order[0] < left:
status = "Crossed left interface!"
success = True
stop = True
elif path.phasepoints[-1].order[0] > right:
status = "Crossed right interface!"
success = True
stop = True
elif path.length >= path.maxlen:
status = "Max. path length exceeded"
success = False
stop = True
return status, success, stop
[docs] def dump_phasepoint(self, phasepoint, deffnm="conf"):
"""Dump the configuration of a phase point to a file.
This override preserves byte-identity with the inf-flavour
engines, which stored ``(pos_file, 0)`` as the configuration
index rather than the native ``(pos_file, None)``.
Parameters
----------
phasepoint : object like :py:class:`.System`
The phase point whose configuration we dump to file.
deffnm : string, optional
The default file name for the dumped configuration.
"""
pos_file = self.dump_config(phasepoint.config, deffnm=deffnm)
phasepoint.set_pos((pos_file, 0))
[docs] def _read_configuration(self, filename, idx=-1):
"""Read output from AMS snapshot/trajectory.
Parameters
----------
filename : string
The file to read the configuration from.
idx : integer
Optional, frame index in trajectory
Returns
-------
box : numpy.array
The box dimensions.
xyz : numpy.array
The positions.
vel : numpy.array
The velocities.
"""
if idx == -1:
idx = 0
if not self.states:
logger.info(
"Infinit requirement for 0 paths: self.set_idx = True, idx = 0"
)
self.set_idx = True
self.n_init = 0
state = self.states[filename][idx]
box = state.get_latticevectors(unit=self.dist_unit)
if len(box) == 0:
box = [float("inf"), float("inf"), float("inf")]
else:
box = box_matrix_to_list(box)
xyz = state.get_coords(unit=self.dist_unit)
vel = state.get_velocities(
dist_unit=self.dist_unit, time_unit=self.time_unit
)
return xyz, vel, box, None
[docs] def set_mdrun(self, md_items):
"""Set up the molecular dynamics run with the given parameters.
Parameters
----------
md_items : dict
A dictionary with the keys used here: ``exe_dir`` (str), the
directory where the executable is located, and ``ens`` (dict),
the ensemble information carrying ``ens_name`` (str), the
ensemble name.
Notes
-----
Sets the executable directory and ensemble name, logs the
executable directory, deletes old states that are no longer in
use, and updates the list of old states to the current states.
"""
self.exe_dir = md_items["exe_dir"]
if "ens" in md_items:
self.ens_name = md_items["ens"]["ens_name"] + "_"
else:
self.ens_name = "init"
logger.info(
"self.exe_dir %s md_items['exe_dir'] %s",
self.exe_dir,
md_items["exe_dir"],
)
delete_states = []
for state in self.oldstates:
if state in self.states:
delete_states.append(state)
for state in delete_states:
self._deletestate(state)
self.oldstates = self.states.keys()
[docs] def _reverse_velocities(self, filename, outfile):
"""Reverse velocity in a given snapshot.
Parameters
----------
filename : string
The configuration to reverse velocities in.
outfile : string
The output file for storing the configuration with
reversed velocities.
"""
logger.info("AMS reversing velocities for %s", filename)
if os.path.exists(
outfile
): # File must never be there before PrepareMD
self._removefile(outfile)
self.worker.PrepareMD(outfile)
self._copystate(
filename, outfile
) # copy only the state, file will be written later
# Here we are working with AMS internal representation of velocities,
# thus, we keep the units a.u./a.u.
vel = self.states[filename][0].get_velocities(
dist_unit="au", time_unit="au"
)
rev_vel = -1.0 * vel
self.states[outfile][0]._state["velocities"] = rev_vel
self.worker.SetVelocities(
outfile, rev_vel, dist_unit="au", time_unit="au"
)
self.worker.MolecularDynamics(outfile, nsteps=0, setsteptozero=True)
[docs] def _propagate_from(
self,
name: str,
path: InfPath,
system: System,
ens_set: Dict,
msg_file: FileIO,
reverse: bool = False,
) -> Tuple[bool, str]:
"""
Propagate with AMS from the current system configuration.
Here, we assume that this method is called after the propagate()
has been called in the parent. The parent is then responsible
for reversing the velocities and also for setting the initial
state of the system.
Parameters
----------
name : string
A name to use for the trajectory we are generating.
path : object like :py:class:`infretis.core.path.PathBase`
This is the path we use to fill in phase-space points.
ensemble: dict
It contains:
* `system`: object like :py:class:`.System`
The system object gives the initial state for the
integration. The initial state is stored and the system is
reset to the initial state when the integration is done.
* `order_function`: object like :py:class:`.OrderParameter`
The object used for calculating the order parameter.
* `interfaces`: list of floats
These interfaces define the stopping criterion.
msg_file : object like :py:class:`.FileIO`
An object we use for writing out messages that are useful
for inspecting the status of the current propagation.
reverse : boolean, optional
If True, the system will be propagated backward in time.
Returns
-------
success : boolean
This is True if we generated an acceptable path.
status : string
A text description of the current status of the propagation.
"""
status = f"propagating with AMS (reverse = {reverse})"
logger.info(status)
success = False
interfaces = ens_set["interfaces"]
left, _, right = interfaces
# Get the current order parameter:
order = self.calculate_order(system)
msg_file.write(
f'# Initial order parameter: {" ".join([str(i) for i in order])}'
)
kin_enes = []
pot_enes = []
tot_enes = []
temps = []
traj_file = os.path.join(self.exe_dir, name + "." + self.ext)
# First, process input snapshot
initial = system.config[0]
self._copystate(
initial, traj_file
) # Copy only state. Traj file will be written in the loop
logger.info(
f'AMS internal traj renaming: {traj_file} -> {traj_file+"_0"}'
)
self.worker.RenameMDState(traj_file, traj_file + "_0")
# Then, add snapshot to path and propagate further if necessary
set_trajfile = True
set_step_to_zero = True
for i in range(path.maxlen):
if i == 1:
set_trajfile = False
set_step_to_zero = False
# Calculate the order parameter using the current system:
system.vel_rev = True
system.set_pos((traj_file, i))
out = self._read_configuration(traj_file, idx=i)
order = self.calculate_order(
system, xyz=out[0], vel=out[1], box=out[2]
)
msg_file.write(f'{i} {" ".join([str(j) for j in order])}')
snapshot = {
"order": order,
"config": (traj_file, i),
"vel_rev": reverse,
}
phase_point = self.snapshot_to_system(system, snapshot)
(
status,
success,
stop,
) = self.add_to_path(path, phase_point, left, right)
kin_enes.append(
self.states[traj_file][i].get_kineticenergy(unit=self.ene_unit)
)
pot_enes.append(
self.states[traj_file][i].get_potentialenergy(
unit=self.ene_unit
)
)
tot_enes.append(kin_enes[-1] + pot_enes[-1])
temps.append(self.states[traj_file][i].get_temperature())
logger.info("OP: %f in frame %s %i", order[0], traj_file, i)
msg_file.flush()
if stop:
logger.info(
"AMS propagation ended at %i. Reason: %s", i, status
)
if i == 0:
# Write the traj file if no MD is going to be done
self.worker.PrepareMD(traj_file)
self.worker.MolecularDynamics(
traj_file + "_0", nsteps=0, setsteptozero=True
)
break
self.step(
system,
traj_file,
set_trajfile=set_trajfile,
set_step_to_zero=set_step_to_zero,
)
logger.info("AMS propagation done, obtaining energies")
path.update_energies(kin_enes, pot_enes, tot_enes, temps)
# Mirror the inf base ``propagate``: accumulate the MD steps this
# segment consumed for the scheduler's subcycle accounting.
self.steps += path.length
msg_file.write("# Propagation done.")
msg_file.flush()
return success, status
[docs] def modify_velocities(
self, system: System, vel_settings: Dict[str, Any], rgen
) -> Tuple[float, float]:
"""Modify the velocities of the current state.
This method will modify the velocities of a time slice.
Parameters
----------
ensemble : dict
It contains:
* `system`: object like :py:class:`.System`
This is the system that contains the particles we are
investigating.
vel_settings: dict
It contains:
* `sigma_v`: numpy.array, optional
These values can be used to set a standard deviation (one
for each particle) for the generated velocities.
* `aimless`: boolean, optional
Determines if we should do aimless shooting or not.
* `momentum`: boolean, optional
If True, we reset the linear momentum to zero after
generating.
* `rescale or rescale_energy`: float, optional
In some NVE simulations, we may wish to re-scale the
energy to a fixed value. If `rescale` is a float > 0,
we will re-scale the energy (after modification of
the velocities) to match the given float.
Returns
-------
dek : float
The change in the kinetic energy.
kin_new : float
The new kinetic energy.
"""
# NOTE (Phase 4b determinism gap): AMS draws kick velocities via
# worker.GenerateVelocities (AMS-internal RNG), not the injected
# ``rgen``; the argument only satisfies the unified signature.
_ = rgen
rescale = vel_settings.get(
"rescale_energy", vel_settings.get("rescale")
)
if rescale is not None and rescale is not False and rescale > 0:
msgtxt = "AMS engine does not support energy re-scale."
logger.error(msgtxt)
raise NotImplementedError(msgtxt)
kin_old = system.ekin
if vel_settings.get("aimless", False):
state_name, idx = system.config
logger.info(
"Generating velocities for %s, idx=%s", state_name, idx
)
prefix = (
self.ens_name
+ str(os.getpid())
+ "_"
+ str(int(time.time() * 1_000_000) % 1_000_000).zfill(6)
)
genvel = os.path.join(self.exe_dir, f"genvel_{prefix}." + self.ext)
if state_name in self.states:
# If kicking from new MD state, prepare it
self._copystate(state_name, genvel, idx=idx)
else:
self._extract_frame(state_name, idx, genvel)
state = self.worker.GenerateVelocities(
genvel,
self.temperature,
randomvelocitiesmethod=self.random_velocities_method,
setsteptozero=True,
)
# Can be removed, but nice way to check velocity generation
logger.info(
"AMS Epot: %f Ekin: %f",
state.get_potentialenergy(unit=self.ene_unit),
state.get_kineticenergy(unit=self.ene_unit),
)
# Write rkf file
logger.info("AMS setting output file: %s", genvel)
if os.path.exists(
genvel
): # File must never be there before PrepareMD
self._removefile(genvel, disk_only=True)
self.worker.PrepareMD(genvel)
# self.worker.MolecularDynamics(genvel, nsteps=0)
# Update system
kin_new = state.get_kineticenergy(unit=self.ene_unit)
system.set_pos((genvel, -1))
system.vel_rev = False
system.ekin = kin_new
system.vpot = state.get_potentialenergy(unit=self.ene_unit)
system.etot = system.ekin + system.vpot
system.temp = state.get_temperature()
self._add_state(genvel, state, rewrite=True)
else: # Soft velocity change, from a Gaussian distribution:
msgtxt = "AMS engine only support aimless shooting!"
logger.error(msgtxt)
raise NotImplementedError(msgtxt)
if vel_settings.get("momentum", False):
pass
if kin_old is None or kin_new is None:
dek = float("inf")
logger.warning(
"Kinetic energy not found for previous point."
"\n(This happens when the initial configuration "
"does not contain energies.)"
)
else:
dek = kin_new - kin_old
return dek, kin_new
def _add_state(self, key, state, rewrite=False):
if key not in self.states or rewrite:
self.states[key] = []
self.states[key].append(state)
else:
self.states[key].append(state)
return
def _renamestate(self, source, dest):
if dest in self.states:
self._deletestate(dest)
self.states[dest] = self.states[source]
# Dirty usage of filename to recognize trajectories and snapshots
# because we do not have system object with index here...
if "traj" in os.path.basename(source):
logger.info("AMS Moving traj: %s -> %s", source, dest)
for i in range(len(self.states[dest])):
self.worker.RenameMDState(
source + "_" + str(i), dest + "_" + str(i)
)
else:
logger.info("AMS Moving snap: %s -> %s", source, dest)
self.worker.RenameMDState(source, dest)
del self.states[source]
def _copystate(self, source, dest, idx=-1):
if source == dest:
print(
"------------------------------------------------------------"
)
print("WARNING: source == dest in ams._copystate")
print("This should only happen in pytest")
print(
"------------------------------------------------------------"
)
pass
else:
if dest in self.states:
logger.info("AMS snap exists delete: %s", dest)
self._deletestate(dest)
if "traj" not in os.path.basename(source):
logger.info("AMS Copying snap to snap: %s -> %s", source, dest)
self.states[dest] = [copy.deepcopy(self.states[source][0])]
self.worker.CopyMDState(source, dest)
else:
logger.info(
"AMS Copying traj snap to snap: %s, %i -> %s",
source,
idx,
dest,
)
self.states[dest] = [copy.deepcopy(self.states[source][idx])]
self.worker.CopyMDState(source + "_" + str(idx), dest)
[docs] def _deletestate(self, filename):
"""
Delete the state associated with the given filename.
This method distinguishes between trajectory and snapshot files based
on the presence of the substring "traj" in the filename. For trajectory
files, it deletes multiple states indexed by appending an underscore
and an index to the filename. For snapshot files, it deletes the state
directly.
Args:
filename (str): The name of the file whose state is to be deleted.
Raises:
KeyError: If the filename is not found in the states dictionary.
"""
# Dirty usage of filename to recognize trajectories and snapshots
# because we do not have system object with index here...
if "traj" in os.path.basename(filename):
logger.info("AMS Deleting traj: %s", filename)
# self.worker.DeleteMDState(filename)
# self.oldstates.append(filename)
for i in range(len(self.states[filename])):
self.worker.DeleteMDState(filename + "_" + str(i))
else:
logger.info("AMS Deleting snap: %s", filename)
try:
self.worker.DeleteMDState(filename)
except Exception as e:
if "MD state with given title not found" in str(e):
print("MD state with given title not found: ", filename)
logger.info("AMS error in DeleteMDState: %s", str(e))
else:
print("else: tried but failed but wrong")
raise e
del self.states[filename]
[docs] def _movefile(self, source, dest):
"""
Move a file from source to destination and update internal states.
This method moves a file from the specified source path to the
destination path using the superclass's _movefile method. Additionally,
it updates the internal states if the source path is present in the
states.
Args:
source (str): The path of the file to be moved.
dest (str): The destination path where the file should be moved.
"""
super()._movefile(source, dest)
# When swapping paths and running from restart,
# all MD states might not be in memory
if source in self.states:
self._renamestate(source, dest)
[docs] def _copyfile(self, source, dest):
"""
Copy a file from source to destination and updates the state.
This method first calls the superclass's _copyfile method to perform
the actual file copy. If the destination file is already in the states,
it deletes the existing state. Finally, it copies the state from the
source to the destination.
Args:
source (str): The path to the source file.
dest (str): The path to the destination file.
"""
super()._copyfile(source, dest)
if dest in self.states:
self._deletestate(dest)
self._copystate(source, dest)
[docs] def _removefile(self, filename, disk_only=False):
"""
Remove a file from the system and optionally from the internal state.
This method removes a file by calling the superclass's _removefile
method. It can also remove the file from the internal state if it
represents a molecular dynamics (MD) state.
Args:
filename (str): The name of the file to be removed.
disk_only (bool, optional): If True, only remove the file from the
disk and not from the internal state. Defaults to False.
Returns:
None
"""
super()._removefile(filename)
# We could use os.remove() directly, but let's be consistent
if disk_only:
return
# We have to check if file represents a MD state,
# because the method is also called for removing log files
# or states are not present when restarting
if filename in self.states:
self._deletestate(filename)