Source code for pyretis.engines.ase

"""An ASE integrator interface."""

from __future__ import annotations

import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Tuple, Union

import numpy as np
from ase import units
from ase.io import read, write
from ase.io.trajectory import Trajectory
from ase.md.langevin import Langevin
from ase.md.velocitydistribution import (
    MaxwellBoltzmannDistribution,
    Stationary,
)
from ase.md.verlet import VelocityVerlet

from pyretis.engines.external import ExternalMDEngine
from pyretis.engines.factory import import_external as create_external

if TYPE_CHECKING:
    from pyretis.core.path import Path as InfPath
    from pyretis.core._system_inf import System
    from pyretis.inout.fileio import FileIO

# ``FileIO``, ``InfPath``, ``System`` are used only as type annotations,
# resolved lazily via PEP 563 deferred evaluation. The corresponding
# modules will land in Phase 2.

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


[docs]class ASEEngine(ExternalMDEngine): """An ASE engine class.""" engine_type = "internal" needs_order = False
[docs] def __init__( self, timestep: float, temperature: float, subcycles: int, input_path: str, integrator: str, calculator_settings: Dict, langevin_friction: float = -1.0, langevin_fixcm: float = -1.0, exe_path: Union[str, Path] = Path(".").resolve(), ): """ Initialize the ase engine. langevin_fixcm: removes center of mass motion. Should not be used for low dimensional systems like double well. """ super().__init__("ASE external engine", timestep, subcycles) # The native base defaults exe_dir to ``None`` and defines no step # counter; restore the inf-base defaults the scheduler relies on # (exe_dir ``"."`` for standalone use, ``self.steps`` for subcycle # accounting in ``_tis_inf.run_md``). self.exe_dir = "." self.steps = 0 self.timestep = timestep self.subcycles = subcycles self.temperature = temperature self.input_path = Path(exe_path) / input_path self.ext = "traj" self.name = "ase" # TODO: make this non-manual # by reading in from .toml or .py? # Create calculator self.calc = create_external(calculator_settings, "ASE calculator", []) # integrator stuff integrator = integrator.lower() integrator_map = { "langevin": Langevin, "velocityverlet": VelocityVerlet, } if integrator not in integrator_map: raise ValueError(f"{integrator} not in integrator map.") self.integrator_class = integrator_map[integrator] if integrator == "langevin": if langevin_fixcm == -1.0: raise ValueError("'langevin_fixcm' not set in [engine]") if langevin_friction == -1.0: raise ValueError("'langevin_friction' not set in [engine]") self.integrator_settings = { "timestep": self.timestep * units.fs, "temperature_K": self.temperature, "friction": langevin_friction / units.fs, "fixcm": langevin_fixcm, } elif integrator == "velocityverlet": self.integrator_settings = { "timestep": self.timestep * units.fs, } self.kb = 8.61733326e-5 # eV/K self._beta = 1 / (self.temperature * self.kb)
def _extract_frame(self, traj_file: str, idx: int, out_file: str) -> None: traj = Trajectory(traj_file) atoms = traj[idx] traj.close() write(out_file, atoms) def _read_configuration( self, filename: str, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, None]: atoms = read(filename) if isinstance(atoms, list): atoms = atoms[0] return ( atoms.positions, atoms.get_velocities(), atoms.cell.diagonal(), None, )
[docs] def _recompute_partial_orders(self, partial_traj, reverse): """Order parameters per frame of a partial ASE ``.traj`` dump. ``recalculate_order`` has no ASE-trajectory handler, so the frames are read directly via ASE. ``Trajectory`` exposes only complete frames, so a final frame left half-written by an interruption is naturally excluded -- the surviving frames are kept ("use the last good frame" recovery, as for the other engines). """ # Lazy imports to keep the core dependency confined to the resume # path and mirror the rest of this module's structure. # pylint: disable=import-outside-toplevel from pyretis.core import System, ParticlesExt from pyretis.core.systemhelp import update_system_box if self.order_function is None: raise ValueError("Order parameter is not defined!") system = System(box=None) system.particles = ParticlesExt(dim=3) orders = [] traj = Trajectory(partial_traj) try: for atoms in traj: vel = atoms.get_velocities() if reverse and vel is not None: vel = -1.0 * vel system.pos = atoms.positions system.vel = vel update_system_box(system, atoms.cell.diagonal()) orders.append(self.order_function.calculate(system)) finally: traj.close() return orders
[docs] def step(self, system, name): """Perform a single MD step with the ASE engine. This engine is never kicked; ``step`` exists only so the class can be instantiated as a concrete subclass of the abstract :py:class:`.ExternalMDEngine` base. """ raise NotImplementedError( "ASEEngine does not implement single-step integration; " "it is not intended to be used for kick initialisation." )
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None): """Calculate the order parameter. The engine's ``_read_configuration`` returns the box as the third element (positions, velocities, box, names), so this override reads them in that order instead of the box-first ordering assumed by the native external base. """ if any((xyz is None, vel is None, box is None)): out = self._read_configuration(system.config[0]) xyz = out[0] vel = out[1] box = out[2] if xyz is not None: system.pos = xyz if vel is not None: system.vel = vel * -1.0 if system.vel_rev else vel if box is not None: system.box = box if self.order_function is None: raise ValueError("Order parameter is not defined!") return self.order_function.calculate(system)
[docs] @staticmethod def add_to_path(path, phase_point, left, right): """Add a phase point and check for stopping conditions. Uses the inf logic where a crossing wins on the boundary; the engine's ``_propagate_from`` unpacks the three returned values (status, success, stop). """ status = "Running propagate..." success = False stop = False path.append(phase_point) if path.phasepoints[-1].order[0] < left: status = "Crossed left interface!" success = True stop = True elif path.phasepoints[-1].order[0] > right: status = "Crossed right interface!" success = True stop = True elif path.length >= path.maxlen: status = "Max. path length exceeded" success = False stop = True return status, success, stop
[docs] def dump_phasepoint(self, phasepoint, deffnm="conf"): """Dump a phase point to a file using the inf convention.""" pos_file = self.dump_config(phasepoint.config, deffnm=deffnm) phasepoint.set_pos((pos_file, 0))
def _propagate_from( self, name: str, path: InfPath, system: System, ens_set: Dict, msg_file: FileIO, reverse: bool = False, ) -> Tuple[bool, str]: logger.info("Propagating with ASE (reverse = %s)", reverse) interfaces = ens_set["interfaces"] left, _, right = interfaces initial_conf = system.config[0] atoms = read(initial_conf) if isinstance(atoms, list): atoms = atoms[0] # TODO: Fix box stuff, now it only takes lengths and not angles order = self.calculate_order( system, xyz=atoms.positions, vel=atoms.get_velocities(), box=atoms.cell.diagonal(), ) msg_file.write( f'# Initial order parameter: {" ".join([str(i) for i in order])}' ) traj_file = os.path.join(self.exe_dir, f"{name}.traj") traj = Trajectory(traj_file, "w") msg_file.write(f"# Trajectory file is: {traj_file}") dyn = self.integrator_class(atoms, **self.integrator_settings) atoms.calc = self.calc # we give the calculator object the system and order # information in case it is needed during force calculations # using e.g. force mixing with quantis. Can't give it directly # because the oderparameter is only calculated every subcycles self.calc.inf_system = system self.calc.inf_calculate_order = self.calculate_order # TODO: check order, here we first calculate the forces of init conf # and then continue to the main loop. Is this correct? # This way the forces are set in self.calc.results from # the given initial configuration self.calc.calculate(atoms) step_nr = 0 ekin = [] vpot = [] temp = [] etot = [] # integrator step is taken at the end of every loop, # such that frame 0 is also written for i in range(self.subcycles * path.maxlen): energy = self.calc.results["energy"] forces = self.calc.results["forces"] stress = self.calc.results.get("stress", None) if (i) % (self.subcycles) == 0: ekin.append(atoms.get_kinetic_energy()) vpot.append(self.calc.results["energy"]) etot.append(atoms.get_total_energy()) temp.append(atoms.get_temperature()) # NOTE: Writing atoms removes all results from # the calculator (and therefore atoms)! traj.write(atoms, forces=forces, energy=energy, stress=stress) order = self.calculate_order( system, xyz=atoms.positions, vel=atoms.get_velocities(), box=atoms.cell.diagonal(), ) msg_file.write( f'{step_nr} {" ".join([str(j) for j in order])}' ) snapshot = { "order": order, "config": (traj_file, step_nr), "vel_rev": reverse, } phase_point = self.snapshot_to_system(system, snapshot) status, success, stop = self.add_to_path( path, phase_point, left, right ) if stop: logger.info( "ASE propagation ended at step %s. Reason: %s", step_nr, status, ) break step_nr += 1 dyn.step(forces=forces) msg_file.write("# Propagation done.") traj.close() path.update_energies(ekin, vpot, etot, temp) # Mirror the inf base ``propagate``: accumulate the MD steps this # segment consumed for the scheduler's subcycle accounting. self.steps += path.length return success, status
[docs] def modify_velocities( self, system: System, vel_settings: Dict, rgen ) -> Tuple[float, float]: """Modify the velocities. TODO: what about fixcm with langevin integrator and zero_momentum? """ # NOTE (Phase 4b determinism gap): ASE draws the kick velocities # from MaxwellBoltzmannDistribution (ASE's own global RNG), not # from the injected ``rgen``; the argument only satisfies the # unified signature. Wiring ASE's RNG to ``rgen`` is a separate, # user-gated decision. _ = rgen fname = self.dump_frame(system) atoms = read(fname) if isinstance(atoms, list): atoms = atoms[0] kin_old = atoms.get_kinetic_energy() MaxwellBoltzmannDistribution(atoms, temperature_K=self.temperature) kin_new = atoms.get_kinetic_energy() if vel_settings.get("zero_momentum", False): # TODO: should we preserve temperature or not? # The other engines do not bother to preserve the temperature Stationary(atoms, preserve_temperature=False) conf_out = os.path.join(self.exe_dir, "genvel.traj") atoms.write(conf_out) system.config = (conf_out, 0) system.ekin = kin_new if kin_old == 0.0: dek = float("inf") logger.info( "Kinetic energy not found for previous point." "\n(This happens when the initial configuration " "does not contain energies.)" ) else: dek = kin_new - kin_old return dek, kin_new
def _reverse_velocities(self, filename: str, outfile: str) -> None: atoms = read(filename) if isinstance(atoms, list): atoms = atoms[0] vel = atoms.get_velocities() atoms.set_velocities(-vel) write(outfile, atoms)