"""An ASE integrator interface."""
from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Tuple, Union
import numpy as np
from ase import units
from ase.io import read, write
from ase.io.trajectory import Trajectory
from ase.md.langevin import Langevin
from ase.md.velocitydistribution import (
MaxwellBoltzmannDistribution,
Stationary,
)
from ase.md.verlet import VelocityVerlet
from pyretis.engines.external import ExternalMDEngine
from pyretis.engines.factory import import_external as create_external
if TYPE_CHECKING:
from pyretis.core.path import Path as InfPath
from pyretis.core._system_inf import System
from pyretis.inout.fileio import FileIO
# ``FileIO``, ``InfPath``, ``System`` are used only as type annotations,
# resolved lazily via PEP 563 deferred evaluation. The corresponding
# modules will land in Phase 2.
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
[docs]class ASEEngine(ExternalMDEngine):
"""An ASE engine class."""
engine_type = "internal"
needs_order = False
[docs] def __init__(
self,
timestep: float,
temperature: float,
subcycles: int,
input_path: str,
integrator: str,
calculator_settings: Dict,
langevin_friction: float = -1.0,
langevin_fixcm: float = -1.0,
exe_path: Union[str, Path] = Path(".").resolve(),
):
"""
Initialize the ase engine.
langevin_fixcm: removes center of mass motion. Should not be used for
low dimensional systems like double well.
"""
super().__init__("ASE external engine", timestep, subcycles)
# The native base defaults exe_dir to ``None`` and defines no step
# counter; restore the inf-base defaults the scheduler relies on
# (exe_dir ``"."`` for standalone use, ``self.steps`` for subcycle
# accounting in ``_tis_inf.run_md``).
self.exe_dir = "."
self.steps = 0
self.timestep = timestep
self.subcycles = subcycles
self.temperature = temperature
self.input_path = Path(exe_path) / input_path
self.ext = "traj"
self.name = "ase"
# TODO: make this non-manual
# by reading in from .toml or .py?
# Create calculator
self.calc = create_external(calculator_settings, "ASE calculator", [])
# integrator stuff
integrator = integrator.lower()
integrator_map = {
"langevin": Langevin,
"velocityverlet": VelocityVerlet,
}
if integrator not in integrator_map:
raise ValueError(f"{integrator} not in integrator map.")
self.integrator_class = integrator_map[integrator]
if integrator == "langevin":
if langevin_fixcm == -1.0:
raise ValueError("'langevin_fixcm' not set in [engine]")
if langevin_friction == -1.0:
raise ValueError("'langevin_friction' not set in [engine]")
self.integrator_settings = {
"timestep": self.timestep * units.fs,
"temperature_K": self.temperature,
"friction": langevin_friction / units.fs,
"fixcm": langevin_fixcm,
}
elif integrator == "velocityverlet":
self.integrator_settings = {
"timestep": self.timestep * units.fs,
}
self.kb = 8.61733326e-5 # eV/K
self._beta = 1 / (self.temperature * self.kb)
def _extract_frame(self, traj_file: str, idx: int, out_file: str) -> None:
traj = Trajectory(traj_file)
atoms = traj[idx]
traj.close()
write(out_file, atoms)
def _read_configuration(
self,
filename: str,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, None]:
atoms = read(filename)
if isinstance(atoms, list):
atoms = atoms[0]
return (
atoms.positions,
atoms.get_velocities(),
atoms.cell.diagonal(),
None,
)
[docs] def _recompute_partial_orders(self, partial_traj, reverse):
"""Order parameters per frame of a partial ASE ``.traj`` dump.
``recalculate_order`` has no ASE-trajectory handler, so the frames
are read directly via ASE. ``Trajectory`` exposes only complete
frames, so a final frame left half-written by an interruption is
naturally excluded -- the surviving frames are kept ("use the last
good frame" recovery, as for the other engines).
"""
# Lazy imports to keep the core dependency confined to the resume
# path and mirror the rest of this module's structure.
# pylint: disable=import-outside-toplevel
from pyretis.core import System, ParticlesExt
from pyretis.core.systemhelp import update_system_box
if self.order_function is None:
raise ValueError("Order parameter is not defined!")
system = System(box=None)
system.particles = ParticlesExt(dim=3)
orders = []
traj = Trajectory(partial_traj)
try:
for atoms in traj:
vel = atoms.get_velocities()
if reverse and vel is not None:
vel = -1.0 * vel
system.pos = atoms.positions
system.vel = vel
update_system_box(system, atoms.cell.diagonal())
orders.append(self.order_function.calculate(system))
finally:
traj.close()
return orders
[docs] def step(self, system, name):
"""Perform a single MD step with the ASE engine.
This engine is never kicked; ``step`` exists only so the class
can be instantiated as a concrete subclass of the abstract
:py:class:`.ExternalMDEngine` base.
"""
raise NotImplementedError(
"ASEEngine does not implement single-step integration; "
"it is not intended to be used for kick initialisation."
)
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None):
"""Calculate the order parameter.
The engine's ``_read_configuration`` returns the box as the
third element (positions, velocities, box, names), so this
override reads them in that order instead of the box-first
ordering assumed by the native external base.
"""
if any((xyz is None, vel is None, box is None)):
out = self._read_configuration(system.config[0])
xyz = out[0]
vel = out[1]
box = out[2]
if xyz is not None:
system.pos = xyz
if vel is not None:
system.vel = vel * -1.0 if system.vel_rev else vel
if box is not None:
system.box = box
if self.order_function is None:
raise ValueError("Order parameter is not defined!")
return self.order_function.calculate(system)
[docs] @staticmethod
def add_to_path(path, phase_point, left, right):
"""Add a phase point and check for stopping conditions.
Uses the inf logic where a crossing wins on the boundary; the
engine's ``_propagate_from`` unpacks the three returned values
(status, success, stop).
"""
status = "Running propagate..."
success = False
stop = False
path.append(phase_point)
if path.phasepoints[-1].order[0] < left:
status = "Crossed left interface!"
success = True
stop = True
elif path.phasepoints[-1].order[0] > right:
status = "Crossed right interface!"
success = True
stop = True
elif path.length >= path.maxlen:
status = "Max. path length exceeded"
success = False
stop = True
return status, success, stop
[docs] def dump_phasepoint(self, phasepoint, deffnm="conf"):
"""Dump a phase point to a file using the inf convention."""
pos_file = self.dump_config(phasepoint.config, deffnm=deffnm)
phasepoint.set_pos((pos_file, 0))
def _propagate_from(
self,
name: str,
path: InfPath,
system: System,
ens_set: Dict,
msg_file: FileIO,
reverse: bool = False,
) -> Tuple[bool, str]:
logger.info("Propagating with ASE (reverse = %s)", reverse)
interfaces = ens_set["interfaces"]
left, _, right = interfaces
initial_conf = system.config[0]
atoms = read(initial_conf)
if isinstance(atoms, list):
atoms = atoms[0]
# TODO: Fix box stuff, now it only takes lengths and not angles
order = self.calculate_order(
system,
xyz=atoms.positions,
vel=atoms.get_velocities(),
box=atoms.cell.diagonal(),
)
msg_file.write(
f'# Initial order parameter: {" ".join([str(i) for i in order])}'
)
traj_file = os.path.join(self.exe_dir, f"{name}.traj")
traj = Trajectory(traj_file, "w")
msg_file.write(f"# Trajectory file is: {traj_file}")
dyn = self.integrator_class(atoms, **self.integrator_settings)
atoms.calc = self.calc
# we give the calculator object the system and order
# information in case it is needed during force calculations
# using e.g. force mixing with quantis. Can't give it directly
# because the oderparameter is only calculated every subcycles
self.calc.inf_system = system
self.calc.inf_calculate_order = self.calculate_order
# TODO: check order, here we first calculate the forces of init conf
# and then continue to the main loop. Is this correct?
# This way the forces are set in self.calc.results from
# the given initial configuration
self.calc.calculate(atoms)
step_nr = 0
ekin = []
vpot = []
temp = []
etot = []
# integrator step is taken at the end of every loop,
# such that frame 0 is also written
for i in range(self.subcycles * path.maxlen):
energy = self.calc.results["energy"]
forces = self.calc.results["forces"]
stress = self.calc.results.get("stress", None)
if (i) % (self.subcycles) == 0:
ekin.append(atoms.get_kinetic_energy())
vpot.append(self.calc.results["energy"])
etot.append(atoms.get_total_energy())
temp.append(atoms.get_temperature())
# NOTE: Writing atoms removes all results from
# the calculator (and therefore atoms)!
traj.write(atoms, forces=forces, energy=energy, stress=stress)
order = self.calculate_order(
system,
xyz=atoms.positions,
vel=atoms.get_velocities(),
box=atoms.cell.diagonal(),
)
msg_file.write(
f'{step_nr} {" ".join([str(j) for j in order])}'
)
snapshot = {
"order": order,
"config": (traj_file, step_nr),
"vel_rev": reverse,
}
phase_point = self.snapshot_to_system(system, snapshot)
status, success, stop = self.add_to_path(
path, phase_point, left, right
)
if stop:
logger.info(
"ASE propagation ended at step %s. Reason: %s",
step_nr,
status,
)
break
step_nr += 1
dyn.step(forces=forces)
msg_file.write("# Propagation done.")
traj.close()
path.update_energies(ekin, vpot, etot, temp)
# Mirror the inf base ``propagate``: accumulate the MD steps this
# segment consumed for the scheduler's subcycle accounting.
self.steps += path.length
return success, status
[docs] def modify_velocities(
self, system: System, vel_settings: Dict, rgen
) -> Tuple[float, float]:
"""Modify the velocities.
TODO: what about fixcm with langevin integrator
and zero_momentum?
"""
# NOTE (Phase 4b determinism gap): ASE draws the kick velocities
# from MaxwellBoltzmannDistribution (ASE's own global RNG), not
# from the injected ``rgen``; the argument only satisfies the
# unified signature. Wiring ASE's RNG to ``rgen`` is a separate,
# user-gated decision.
_ = rgen
fname = self.dump_frame(system)
atoms = read(fname)
if isinstance(atoms, list):
atoms = atoms[0]
kin_old = atoms.get_kinetic_energy()
MaxwellBoltzmannDistribution(atoms, temperature_K=self.temperature)
kin_new = atoms.get_kinetic_energy()
if vel_settings.get("zero_momentum", False):
# TODO: should we preserve temperature or not?
# The other engines do not bother to preserve the temperature
Stationary(atoms, preserve_temperature=False)
conf_out = os.path.join(self.exe_dir, "genvel.traj")
atoms.write(conf_out)
system.config = (conf_out, 0)
system.ekin = kin_new
if kin_old == 0.0:
dek = float("inf")
logger.info(
"Kinetic energy not found for previous point."
"\n(This happens when the initial configuration "
"does not contain energies.)"
)
else:
dek = kin_new - kin_old
return dek, kin_new
def _reverse_velocities(self, filename: str, outfile: str) -> None:
atoms = read(filename)
if isinstance(atoms, list):
atoms = atoms[0]
vel = atoms.get_velocities()
atoms.set_velocities(-vel)
write(outfile, atoms)