"""A TurtleMD integrator interface.
This module defines a class for using the TurtleMD.
"""
from __future__ import annotations
import logging
import os
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import numpy as np
from turtlemd.integrators import (
LangevinInertia,
LangevinOverdamped,
VelocityVerlet,
Verlet,
)
from turtlemd.potentials.lennardjones import LennardJonesCut
from turtlemd.potentials.well import DoubleWell, DoubleWellPair
from turtlemd.simulation import MDSimulation
from turtlemd.system.box import Box as TBox
from turtlemd.system.particles import Particles as TParticles
from turtlemd.system.system import System as TSystem
from pyretis.engines.external import ExternalMDEngine
from pyretis.engines._parts import (
convert_snapshot,
kinetic_energy,
read_xyz_file,
reset_momentum,
write_xyz_trajectory,
)
if TYPE_CHECKING:
from pyretis.core.path import Path
from pyretis.core._system_inf import System
from pyretis.inout.fileio import FileIO
# Type-only references (``FileIO``, ``Path``, ``System``) stay as forward
# strings, kept valid by ``from __future__ import annotations`` at the
# top of the module. Phase 2 ports the corresponding modules so static
# type checkers can resolve them.
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
POTENTIAL_MAPS = {
"doublewell": DoubleWell,
"lennardjones": LennardJonesCut,
"doublewellpair": DoubleWellPair,
}
INTEGRATOR_MAPS = {
"langevininertia": LangevinInertia,
"langevinoverdamped": LangevinOverdamped,
"velocityverlet": VelocityVerlet,
"verlet": Verlet,
}
[docs]class TurtleMDEngine(ExternalMDEngine):
"""Interface the TurtleMD engine.
To do:
* Add support for multiple potentials?
* Velocity generation adds needs to account for
the dimensionality of the system
"""
engine_type = "internal"
needs_order = False
[docs] def __init__(
self,
timestep: float,
subcycles: int,
temperature: float,
boltzmann: float,
integrator: Dict[str, Any],
potential: Dict[str, Any],
particles: Dict[str, Any],
box: Dict[str, Any],
):
"""Initialize the TurtleMD engine.
Args:
timestep: The simulation timestep.
subcycles: The number of subcycles to execute per InfRetis step.
temperature: The temperature of the simulation.
boltzmann: The value of Boltzmanns constant (kB).
integrator: The name of the integrator to use for
TurtleMD.
potential: The name of the potential to use for TurtleMD.
particles: The mass and name of the particles in the system.
box: Definition of the simulation box.
"""
self.temperature = temperature
self.timestep = timestep
self.subcycles = subcycles
self.name = "turtlemd"
super().__init__(
"TurtleMD internal engine", self.timestep, self.subcycles
)
# The native base defaults exe_dir to ``None`` and defines no step
# counter; the infinite-swapping scheduler sets the per-worker
# exe_dir via ``set_mdrun`` but still expects the inf-base defaults
# for standalone use (``"."``) and reads ``self.steps`` for subcycle
# accounting (see ``_tis_inf.run_md``). Restore both here.
self.exe_dir = "."
self.steps = 0
self.boltzmann = boltzmann
self._beta = 1 / (self.boltzmann * self.temperature)
self.subcycles = subcycles
self.integrator = INTEGRATOR_MAPS[integrator["class"].lower()]
self.integrator_settings = integrator["settings"]
potential_class = POTENTIAL_MAPS[potential["class"].lower()]
self.box = TBox(**box)
# if lennard-jones or pairdoublewell potential we need to set some
# parameters after the the class is initiated
if potential["class"].lower() == "lennardjones":
lj_params = {}
for key in potential["settings"]["parameters"]:
# turn str into int since .toml cannot read int keys
lj_params[int(key)] = potential["settings"]["parameters"][key]
# initiate LJ potential class
self.potential = [potential_class()]
# set the parameters of the LJ potential
self.potential[0].set_parameters(lj_params)
elif potential["class"].lower() == "doublewellpair":
dwell = potential_class(types=(1, 1), dim=self.box.dim)
dwell.set_parameters(potential["settings"]["parameters"])
self.potential = [dwell]
else:
self.potential = [potential_class(**potential["settings"])]
if integrator["class"].lower() in [
"langevininertia",
"langevinoverdamped",
]:
# TODO:
print("do something wrt random gen. so that only")
print("langevin integrato needs it")
self.dim = self.potential[0].dim
self.mass = np.reshape(particles["mass"], (-1, 1))
self.names = particles["name"]
self.particles = TParticles(dim=self.dim)
for i, pos in enumerate(particles["pos"]):
self.particles.add_particle(
pos, mass=self.mass[i], name=self.names[i]
)
self.system = TSystem(self.box, self.particles, self.potential)
@property
def beta(self):
"""The thermodynamic temperature in units of the engine."""
return self._beta
[docs] def step(self, system, name):
"""Perform a single step with the engine.
This engine is an internal integrator that is never kicked, so a
single :py:meth:`step` is not supported. The method exists only so
the class can be instantiated (the native external base declares it
as an abstract method).
"""
raise NotImplementedError(
"TurtleMDEngine does not support single-step integration "
'("step()"); it is propagated via "_propagate_from".'
)
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None):
"""Calculate the order parameter of the current system."""
if any((xyz is None, vel is None, box is None)):
out = self._read_configuration(system.config[0])
xyz = out[0]
vel = out[1]
box = out[2]
if xyz is not None:
system.pos = xyz
if vel is not None:
system.vel = vel * -1.0 if system.vel_rev else vel
if box is not None:
system.box = box
if self.order_function is None:
raise ValueError("Order parameter is not defined!")
return self.order_function.calculate(system)
[docs] @staticmethod
def add_to_path(path, phase_point, left, right):
"""Add a phase point and perform some checks."""
status = "Running propagate..."
success = False
stop = False
path.append(phase_point)
if path.phasepoints[-1].order[0] < left:
status = "Crossed left interface!"
success = True
stop = True
elif path.phasepoints[-1].order[0] > right:
status = "Crossed right interface!"
success = True
stop = True
elif path.length >= path.maxlen:
status = "Max. path length exceeded"
success = False
stop = True
return status, success, stop
[docs] def dump_phasepoint(self, phasepoint, deffnm="conf"):
"""Dump the current configuration from a system object to a file."""
pos_file = self.dump_config(phasepoint.config, deffnm=deffnm)
phasepoint.set_pos((pos_file, 0))
[docs] def draw_maxwellian_velocities(self, vel, mass, beta, rgen, sigma_v=None):
"""Draw velocities from a Gaussian distribution."""
if sigma_v is None or np.any(sigma_v < 0.0):
kbt = 1.0 / beta
sigma_v = np.sqrt(kbt * (1 / mass))
npart, dim = vel.shape
vel = rgen.normal(loc=0.0, scale=sigma_v, size=(npart, dim))
return vel, sigma_v
[docs] def _propagate_from(
self,
name: str,
path: Path,
system: System,
ens_set: Dict[str, Any],
msg_file: FileIO,
reverse: bool = False,
) -> Tuple[bool, str]:
"""Propagate the equations of motion from the given system.
We assume the following:
* Box does not change (constant volume simulation)
* Box is orthogonal
"""
status = f"propagating with TurtleMD (reverse = {reverse})"
interfaces = ens_set["interfaces"]
logger.debug(status)
success = False
left, _, right = interfaces
# Get positions and velocities from the input file.
initial_conf = system.config[0]
# these variables will be used later
pos, vel, box, atoms = self._read_configuration(initial_conf)
# initialize turtlemd system
particles = TParticles(dim=self.dim)
for i in range(self.particles.npart):
particles.add_particle(
pos[i][: self.dim],
vel=vel[i][: self.dim],
mass=self.particles.mass[i],
name=self.particles.name[i],
)
tmd_system = TSystem(
box=self.box, particles=particles, potentials=self.potential
)
if hasattr(self, "rgen"):
seed = self.rgen.integers(0, 1e9)
else:
raise ValueError("Missing random generator!")
tmd_simulation = MDSimulation(
system=tmd_system,
integrator=self.integrator(
timestep=self.timestep,
**self.integrator_settings,
seed=seed,
),
steps=path.maxlen * self.subcycles,
)
order = self.calculate_order(
system, xyz=pos, vel=vel, box=tmd_system.box.length
)
traj_file = os.path.join(self.exe_dir, f"{name}.{self.ext}")
# Create a message file with some info about this run:
msg_file.write(
f'# Initial order parameter: {" ".join([str(i) for i in order])}'
)
msg_file.write(f"# Trajectory file is: {traj_file}")
logger.debug("Running TurtleMD")
step_nr = 0
# dict for storing ene rgies
thermo = defaultdict(list)
# loop over n subcycles
# The first step of the loop is the initial phase point, i.e., for i=0
# turtlemd does not integrate the equations of motion, it just
# returns the initial system
for i, step in enumerate(tmd_simulation.run()):
if (i) % (self.subcycles) == 0:
thermoi = step.thermo(self.boltzmann)
for key, val in thermoi.items():
thermo[key].append(val)
# update coordinates, velocities and box
# for the relevant dimensions. We need this here
# because we use xyz format for trajecories, which has
# 3 dimensions for coords, vel and the box.
pos[:, : self.dim] = tmd_system.particles.pos
vel[:, : self.dim] = tmd_system.particles.vel
if box is not None:
box[: self.dim] = tmd_system.box.length
write_xyz_trajectory(
traj_file, pos, vel, atoms, box, step=step_nr
)
order = self.calculate_order(
system,
xyz=tmd_system.particles.pos,
vel=tmd_system.particles.vel,
box=tmd_system.box.length,
)
msg_file.write(
f'{step_nr} {" ".join([str(j) for j in order])}'
)
snapshot = {
"order": order,
"config": (traj_file, step_nr),
"vel_rev": reverse,
}
phase_point = self.snapshot_to_system(system, snapshot)
status, success, stop = self.add_to_path(
path, phase_point, left, right
)
if stop:
logger.debug(
"TurtleMD propagation ended at %i. Reason: %s",
step_nr,
status,
)
break
step_nr += 1
msg_file.write("# Propagation done.")
ekin = np.array(thermo["ekin"]) * tmd_system.particles.npart
vpot = np.array(thermo["vpot"]) * tmd_system.particles.npart
etot = ekin + vpot
temp = np.array(thermo["temperature"])
path.update_energies(ekin, vpot, etot, temp)
# Mirror the inf base ``propagate``: accumulate the MD steps this
# segment consumed so the scheduler's subcycle accounting stays
# byte-identical.
self.steps += path.length
return success, status
[docs] @staticmethod
def _read_configuration(
filename: str,
) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray], List[str]]:
"""Read TurtleMD output configuration.
This method reads the specified TurtleMD output configuration and
extracts the configuration which includes the positions, velocities,
box dimensions and particle names. It is used when calculating the
order parameter from a TurtleMD simulation.
Args:
filename: The path to the file to read the configuration from.
Returns:
A tuple containing:
- xyz: An array of atomic positions.
- vel: An array of atomic velocities.
- box: An array of box dimensions or None if not available.
- names: A list of atom names found in the file.
"""
for snapshot in read_xyz_file(filename):
box, xyz, vel, names = convert_snapshot(snapshot)
return xyz, vel, box, names
raise ValueError("Missing TurtleMD configuration")
[docs] def _reverse_velocities(self, filename: str, outfile: str) -> None:
"""Reverse velocities in the given snapshot.
Args:
filename: The path to the file containing the configuration
to reverse the velocities of.
outfile : The path to the output file for storing the
configuration with reversed velocities.
"""
xyz, vel, box, names = self._read_configuration(filename)
write_xyz_trajectory(
outfile, xyz, -1.0 * vel, names, box, append=False
)
[docs] def modify_velocities(
self, system: System, vel_settings: Dict[str, Any], rgen
) -> Tuple[float, float]:
"""Modify the velocities of all particles.
Parameters
----------
system : object like :py:class:`.System`
The system whose particle velocities are to be modified.
vel_settings : dict
A dict; ``zero_momentum`` (bool) resets the linear momentum to
zero after generating velocities internally.
Returns
-------
dek : float
The change in kinetic energy from the velocity modification.
kin_new : float
The new kinetic energy of the system.
"""
mass = self.mass
beta = self.beta
pos = self.dump_frame(system)
xyz, vel, box, atoms = self._read_configuration(pos)
kin_old = kinetic_energy(vel, mass)[0]
vel, _ = self.draw_maxwellian_velocities(vel, mass, beta, rgen)
# we do not reset momentum by default
if vel_settings.get("zero_momentum", False):
vel = reset_momentum(vel, mass)
conf_out = os.path.join(self.exe_dir, f"genvel.{self.ext}")
write_xyz_trajectory(conf_out, xyz, vel, atoms, box, append=False)
kin_new = kinetic_energy(vel, mass)[0]
system.config = (conf_out, 0)
system.ekin = kin_new
if kin_old == 0.0:
dek = float("inf")
logger.debug(
"Kinetic energy not found for previous point."
"\n(This happens when the initial configuration "
"does not contain energies.)"
)
else:
dek = kin_new - kin_old
return dek, kin_new