Source code for pyretis.engines.turtlemd

"""A TurtleMD integrator interface.

This module defines a class for using the TurtleMD.

"""

from __future__ import annotations

import logging
import os
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

import numpy as np
from turtlemd.integrators import (
    LangevinInertia,
    LangevinOverdamped,
    VelocityVerlet,
    Verlet,
)
from turtlemd.potentials.lennardjones import LennardJonesCut
from turtlemd.potentials.well import DoubleWell, DoubleWellPair
from turtlemd.simulation import MDSimulation
from turtlemd.system.box import Box as TBox
from turtlemd.system.particles import Particles as TParticles
from turtlemd.system.system import System as TSystem

from pyretis.engines.external import ExternalMDEngine
from pyretis.engines._parts import (
    convert_snapshot,
    kinetic_energy,
    read_xyz_file,
    reset_momentum,
    write_xyz_trajectory,
)

if TYPE_CHECKING:
    from pyretis.core.path import Path
    from pyretis.core._system_inf import System
    from pyretis.inout.fileio import FileIO

# Type-only references (``FileIO``, ``Path``, ``System``) stay as forward
# strings, kept valid by ``from __future__ import annotations`` at the
# top of the module. Phase 2 ports the corresponding modules so static
# type checkers can resolve them.


logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())

POTENTIAL_MAPS = {
    "doublewell": DoubleWell,
    "lennardjones": LennardJonesCut,
    "doublewellpair": DoubleWellPair,
}

INTEGRATOR_MAPS = {
    "langevininertia": LangevinInertia,
    "langevinoverdamped": LangevinOverdamped,
    "velocityverlet": VelocityVerlet,
    "verlet": Verlet,
}


[docs]class TurtleMDEngine(ExternalMDEngine): """Interface the TurtleMD engine. To do: * Add support for multiple potentials? * Velocity generation adds needs to account for the dimensionality of the system """ engine_type = "internal" needs_order = False
[docs] def __init__( self, timestep: float, subcycles: int, temperature: float, boltzmann: float, integrator: Dict[str, Any], potential: Dict[str, Any], particles: Dict[str, Any], box: Dict[str, Any], ): """Initialize the TurtleMD engine. Args: timestep: The simulation timestep. subcycles: The number of subcycles to execute per InfRetis step. temperature: The temperature of the simulation. boltzmann: The value of Boltzmanns constant (kB). integrator: The name of the integrator to use for TurtleMD. potential: The name of the potential to use for TurtleMD. particles: The mass and name of the particles in the system. box: Definition of the simulation box. """ self.temperature = temperature self.timestep = timestep self.subcycles = subcycles self.name = "turtlemd" super().__init__( "TurtleMD internal engine", self.timestep, self.subcycles ) # The native base defaults exe_dir to ``None`` and defines no step # counter; the infinite-swapping scheduler sets the per-worker # exe_dir via ``set_mdrun`` but still expects the inf-base defaults # for standalone use (``"."``) and reads ``self.steps`` for subcycle # accounting (see ``_tis_inf.run_md``). Restore both here. self.exe_dir = "." self.steps = 0 self.boltzmann = boltzmann self._beta = 1 / (self.boltzmann * self.temperature) self.subcycles = subcycles self.integrator = INTEGRATOR_MAPS[integrator["class"].lower()] self.integrator_settings = integrator["settings"] potential_class = POTENTIAL_MAPS[potential["class"].lower()] self.box = TBox(**box) # if lennard-jones or pairdoublewell potential we need to set some # parameters after the the class is initiated if potential["class"].lower() == "lennardjones": lj_params = {} for key in potential["settings"]["parameters"]: # turn str into int since .toml cannot read int keys lj_params[int(key)] = potential["settings"]["parameters"][key] # initiate LJ potential class self.potential = [potential_class()] # set the parameters of the LJ potential self.potential[0].set_parameters(lj_params) elif potential["class"].lower() == "doublewellpair": dwell = potential_class(types=(1, 1), dim=self.box.dim) dwell.set_parameters(potential["settings"]["parameters"]) self.potential = [dwell] else: self.potential = [potential_class(**potential["settings"])] if integrator["class"].lower() in [ "langevininertia", "langevinoverdamped", ]: # TODO: print("do something wrt random gen. so that only") print("langevin integrato needs it") self.dim = self.potential[0].dim self.mass = np.reshape(particles["mass"], (-1, 1)) self.names = particles["name"] self.particles = TParticles(dim=self.dim) for i, pos in enumerate(particles["pos"]): self.particles.add_particle( pos, mass=self.mass[i], name=self.names[i] ) self.system = TSystem(self.box, self.particles, self.potential)
@property def beta(self): """The thermodynamic temperature in units of the engine.""" return self._beta
[docs] def step(self, system, name): """Perform a single step with the engine. This engine is an internal integrator that is never kicked, so a single :py:meth:`step` is not supported. The method exists only so the class can be instantiated (the native external base declares it as an abstract method). """ raise NotImplementedError( "TurtleMDEngine does not support single-step integration " '("step()"); it is propagated via "_propagate_from".' )
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None): """Calculate the order parameter of the current system.""" if any((xyz is None, vel is None, box is None)): out = self._read_configuration(system.config[0]) xyz = out[0] vel = out[1] box = out[2] if xyz is not None: system.pos = xyz if vel is not None: system.vel = vel * -1.0 if system.vel_rev else vel if box is not None: system.box = box if self.order_function is None: raise ValueError("Order parameter is not defined!") return self.order_function.calculate(system)
[docs] @staticmethod def add_to_path(path, phase_point, left, right): """Add a phase point and perform some checks.""" status = "Running propagate..." success = False stop = False path.append(phase_point) if path.phasepoints[-1].order[0] < left: status = "Crossed left interface!" success = True stop = True elif path.phasepoints[-1].order[0] > right: status = "Crossed right interface!" success = True stop = True elif path.length >= path.maxlen: status = "Max. path length exceeded" success = False stop = True return status, success, stop
[docs] def dump_phasepoint(self, phasepoint, deffnm="conf"): """Dump the current configuration from a system object to a file.""" pos_file = self.dump_config(phasepoint.config, deffnm=deffnm) phasepoint.set_pos((pos_file, 0))
[docs] def draw_maxwellian_velocities(self, vel, mass, beta, rgen, sigma_v=None): """Draw velocities from a Gaussian distribution.""" if sigma_v is None or np.any(sigma_v < 0.0): kbt = 1.0 / beta sigma_v = np.sqrt(kbt * (1 / mass)) npart, dim = vel.shape vel = rgen.normal(loc=0.0, scale=sigma_v, size=(npart, dim)) return vel, sigma_v
[docs] def _extract_frame(self, traj_file: str, idx: int, out_file: str) -> None: """ Extract a frame from a trajectory file. This method is used by `self.dump_config` when we are dumping from a trajectory file. It is not used if we are dumping from a single config file. Args: traj_file: The trajectory file to dump from. idx: The frame number we look for. out_file: The file to dump to. """ for i, snapshot in enumerate(read_xyz_file(traj_file)): if i == idx: box, xyz, vel, names = convert_snapshot(snapshot) if os.path.isfile(out_file): logger.debug("TurtleMD will overwrite %s", out_file) write_xyz_trajectory( out_file, xyz, vel, names, box, append=False ) return logger.error( "TurtleMD could not extract index %i from %s!", idx, traj_file )
[docs] def _propagate_from( self, name: str, path: Path, system: System, ens_set: Dict[str, Any], msg_file: FileIO, reverse: bool = False, ) -> Tuple[bool, str]: """Propagate the equations of motion from the given system. We assume the following: * Box does not change (constant volume simulation) * Box is orthogonal """ status = f"propagating with TurtleMD (reverse = {reverse})" interfaces = ens_set["interfaces"] logger.debug(status) success = False left, _, right = interfaces # Get positions and velocities from the input file. initial_conf = system.config[0] # these variables will be used later pos, vel, box, atoms = self._read_configuration(initial_conf) # initialize turtlemd system particles = TParticles(dim=self.dim) for i in range(self.particles.npart): particles.add_particle( pos[i][: self.dim], vel=vel[i][: self.dim], mass=self.particles.mass[i], name=self.particles.name[i], ) tmd_system = TSystem( box=self.box, particles=particles, potentials=self.potential ) if hasattr(self, "rgen"): seed = self.rgen.integers(0, 1e9) else: raise ValueError("Missing random generator!") tmd_simulation = MDSimulation( system=tmd_system, integrator=self.integrator( timestep=self.timestep, **self.integrator_settings, seed=seed, ), steps=path.maxlen * self.subcycles, ) order = self.calculate_order( system, xyz=pos, vel=vel, box=tmd_system.box.length ) traj_file = os.path.join(self.exe_dir, f"{name}.{self.ext}") # Create a message file with some info about this run: msg_file.write( f'# Initial order parameter: {" ".join([str(i) for i in order])}' ) msg_file.write(f"# Trajectory file is: {traj_file}") logger.debug("Running TurtleMD") step_nr = 0 # dict for storing ene rgies thermo = defaultdict(list) # loop over n subcycles # The first step of the loop is the initial phase point, i.e., for i=0 # turtlemd does not integrate the equations of motion, it just # returns the initial system for i, step in enumerate(tmd_simulation.run()): if (i) % (self.subcycles) == 0: thermoi = step.thermo(self.boltzmann) for key, val in thermoi.items(): thermo[key].append(val) # update coordinates, velocities and box # for the relevant dimensions. We need this here # because we use xyz format for trajecories, which has # 3 dimensions for coords, vel and the box. pos[:, : self.dim] = tmd_system.particles.pos vel[:, : self.dim] = tmd_system.particles.vel if box is not None: box[: self.dim] = tmd_system.box.length write_xyz_trajectory( traj_file, pos, vel, atoms, box, step=step_nr ) order = self.calculate_order( system, xyz=tmd_system.particles.pos, vel=tmd_system.particles.vel, box=tmd_system.box.length, ) msg_file.write( f'{step_nr} {" ".join([str(j) for j in order])}' ) snapshot = { "order": order, "config": (traj_file, step_nr), "vel_rev": reverse, } phase_point = self.snapshot_to_system(system, snapshot) status, success, stop = self.add_to_path( path, phase_point, left, right ) if stop: logger.debug( "TurtleMD propagation ended at %i. Reason: %s", step_nr, status, ) break step_nr += 1 msg_file.write("# Propagation done.") ekin = np.array(thermo["ekin"]) * tmd_system.particles.npart vpot = np.array(thermo["vpot"]) * tmd_system.particles.npart etot = ekin + vpot temp = np.array(thermo["temperature"]) path.update_energies(ekin, vpot, etot, temp) # Mirror the inf base ``propagate``: accumulate the MD steps this # segment consumed so the scheduler's subcycle accounting stays # byte-identical. self.steps += path.length return success, status
[docs] @staticmethod def _read_configuration( filename: str, ) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray], List[str]]: """Read TurtleMD output configuration. This method reads the specified TurtleMD output configuration and extracts the configuration which includes the positions, velocities, box dimensions and particle names. It is used when calculating the order parameter from a TurtleMD simulation. Args: filename: The path to the file to read the configuration from. Returns: A tuple containing: - xyz: An array of atomic positions. - vel: An array of atomic velocities. - box: An array of box dimensions or None if not available. - names: A list of atom names found in the file. """ for snapshot in read_xyz_file(filename): box, xyz, vel, names = convert_snapshot(snapshot) return xyz, vel, box, names raise ValueError("Missing TurtleMD configuration")
[docs] def _reverse_velocities(self, filename: str, outfile: str) -> None: """Reverse velocities in the given snapshot. Args: filename: The path to the file containing the configuration to reverse the velocities of. outfile : The path to the output file for storing the configuration with reversed velocities. """ xyz, vel, box, names = self._read_configuration(filename) write_xyz_trajectory( outfile, xyz, -1.0 * vel, names, box, append=False )
[docs] def modify_velocities( self, system: System, vel_settings: Dict[str, Any], rgen ) -> Tuple[float, float]: """Modify the velocities of all particles. Parameters ---------- system : object like :py:class:`.System` The system whose particle velocities are to be modified. vel_settings : dict A dict; ``zero_momentum`` (bool) resets the linear momentum to zero after generating velocities internally. Returns ------- dek : float The change in kinetic energy from the velocity modification. kin_new : float The new kinetic energy of the system. """ mass = self.mass beta = self.beta pos = self.dump_frame(system) xyz, vel, box, atoms = self._read_configuration(pos) kin_old = kinetic_energy(vel, mass)[0] vel, _ = self.draw_maxwellian_velocities(vel, mass, beta, rgen) # we do not reset momentum by default if vel_settings.get("zero_momentum", False): vel = reset_momentum(vel, mass) conf_out = os.path.join(self.exe_dir, f"genvel.{self.ext}") write_xyz_trajectory(conf_out, xyz, vel, atoms, box, append=False) kin_new = kinetic_energy(vel, mass)[0] system.config = (conf_out, 0) system.ekin = kin_new if kin_old == 0.0: dek = float("inf") logger.debug( "Kinetic energy not found for previous point." "\n(This happens when the initial configuration " "does not contain energies.)" ) else: dek = kin_new - kin_old return dek, kin_new