"""A continuous CP2K external MD integrator interface.
This module defines the canonical CP2K engine, selected by
``class = "cp2k"``. CP2K is run as a single persistent process for a
whole propagation segment and its position/velocity trajectories are
read *on-the-fly*, so the path is filled incrementally and CP2K is
terminated as soon as a stopping interface is crossed. The wavefunction
is warm-started from a per-ensemble ``.wfn`` restart for speed.
The relaunch (start-and-stop / restart-chain) engine, which advances
CP2K one order-parameter point at a time, remains available as
``class = "cp2k_steps"`` (:py:mod:`pyretis.engines.cp2k_steps`).
Important classes defined here
------------------------------
CP2KEngine (:py:class:`.CP2KEngine`)
The continuous CP2K engine.
"""
from __future__ import annotations
import logging
import os
import signal
import subprocess # nosec B404
import time
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Tuple,
Union,
)
import numpy as np
from pyretis.core.systemhelp import update_system_box
# The streaming engine SUBCLASSES the relaunch engine ``CP2KEngineSteps``
# (mirroring ``LAMMPSEngine(LAMMPSEngineSteps)`` /
# ``GromacsEngine(GromacsEngineSteps)``): both read the SAME input files
# (``initial.xyz`` + ``cp2k.inp``) and take the SAME constructor args, so a
# user can switch ``class = cp2k`` <-> ``class = cp2k_steps`` with the same
# config + input directory. The base provides the constructor, input-file
# discovery and the shared helpers (``add_input_files``, ``_extract_frame``);
# this module overrides only the run loop and the streaming-specific I/O.
# The remaining ``_parts``-native helpers (PERIODIC_TABLE,
# ReadAndProcessOnTheFly, xyz_reader, kinetic_energy, reset_momentum) are
# imported lazily inside the methods that use them: a top-level
# ``import pyretis.engines._parts`` pulls in ``pyretis.setup`` while
# ``pyretis.engines`` is still initialising (this module is imported from
# ``engines/__init__``), which is a circular import. The streaming and
# velocity paths are runtime-only, so importing at call time is safe.
from pyretis.engines.cp2k_steps import CP2KEngineSteps
from pyretis.inout.formats.xyz import (
convert_snapshot,
read_xyz_file,
write_xyz_trajectory,
)
# E4.1 prep (2026-05-27): the CP2K input parser (read_cp2k_input +
# tree manipulators) used to be a verbatim duplicate of the
# pyretis.inout.formats.cp2k version. After harmonising the parser
# (commit ccd26b3a), it is imported from the canonical home so the
# two flavours can no longer drift.
from pyretis.inout.formats.cp2k import (
read_cp2k_box,
read_cp2k_energy,
read_cp2k_input,
set_parents,
update_cp2k_input,
)
if TYPE_CHECKING:
from pyretis.core.path import Path as InfPath
from pyretis.core._system_inf import System
from pyretis.inout.fileio import FileIO
# ``FileIO``, ``InfPath``, ``System`` references stay as PEP 563
# forward strings (``from __future__ import annotations`` above).
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
OUTPUT_FILES = {
"energy": "{}-1.ener",
"restart": "{}-1.restart",
"pos": "{}-pos-1.xyz",
"vel": "{}-vel-1.xyz",
"wfn": "{}-RESTART.wfn",
"wfn-bak": "{}-RESTART.wfn.bak-",
}
[docs]def guess_particle_mass(particle_no: int, particle_type: str) -> float:
"""Guess a particle mass from its type and convert to CP2K units.
Args:
particle_no: The particle number. This is only used
for output to the logger.
particle_type: The particle type as a string. This
should be an element from the periodic table.
Returns:
The guessed particle mass as a float.
"""
# Lazy import (see module-level note) to avoid the _parts -> setup
# circular import at module load.
from pyretis.engines._parts import PERIODIC_TABLE
logger.info(
(
"Mass not specified for particle no. %i\n"
'Will guess from particle type "%s"'
),
particle_no,
particle_type,
)
mass = PERIODIC_TABLE.get(particle_type, None)
if mass is None:
msg = (
f"Could not infer mass for particle number {particle_no}"
f" with element '{particle_type}' as its not in our periodic"
"table."
)
raise ValueError(msg)
particle_mass = 1822.8884858012982 * mass
logger.info(
"-> Using a mass of %f g/mol (%f in internal units)",
mass,
particle_mass,
)
return particle_mass
[docs]def write_for_run_vel(
infile: Union[str, Path],
outfile: Union[str, Path],
timestep: float,
nsteps: int,
subcycles: int,
posfile: str,
vel: np.ndarray,
name: str = "md_step",
restart_wfn: str = "doesnotexist.wfn",
print_freq: Optional[int] = None,
) -> None:
"""Create input file to perform n steps.
Note, a single step actually consists of a number of subcycles.
But from PyRETIS' point of view, this is a single step.
Further, we here assume that we start from a given xyz file and
we also explicitly give the velocities here.
Args:
infile: Path to the input template to use.
outfile: Path to the input file to create.
timestep: The time step to use for the simulation.
nsteps: The number of PyRETIS steps to perform.
subcycles: The number of sub-cycles to perform.
posfile: The (base)name for the input file to read positions from.
vel: The velocities to set in the input.
name: A name for the CP2K project.
print_freq: How often we should print to the trajectory file.
"""
if print_freq is None:
print_freq = subcycles
to_update: Dict[str, Any] = {
"GLOBAL": {
"data": [f"PROJECT {name}", "RUN_TYPE MD", "PRINT_LEVEL LOW"],
"replace": True,
},
"MOTION->MD": {
"data": {"STEPS": nsteps * subcycles, "TIMESTEP": timestep}
},
"MOTION->PRINT->RESTART": {
"data": ["BACKUP_COPIES 0"],
"replace": True,
},
"MOTION->PRINT->RESTART->EACH": {"data": {"MD": print_freq}},
"MOTION->PRINT->VELOCITIES->EACH": {"data": {"MD": print_freq}},
"MOTION->PRINT->TRAJECTORY->EACH": {"data": {"MD": print_freq}},
"FORCE_EVAL->SUBSYS->TOPOLOGY": {
"data": {"COORD_FILE_NAME": posfile, "COORD_FILE_FORMAT": "xyz"}
},
"FORCE_EVAL->SUBSYS->VELOCITY": {
"data": [],
"replace": True,
},
"FORCE_EVAL->DFT->SCF->PRINT->RESTART": {
"data": ["BACKUP_COPIES 0"],
"replace": True,
},
"FORCE_EVAL->DFT": {
"data": {"WFN_RESTART_FILE_NAME": restart_wfn},
},
}
for veli in vel:
to_update["FORCE_EVAL->SUBSYS->VELOCITY"]["data"].append(
f"{veli[0]} {veli[1]} {veli[2]}"
)
remove = ["EXT_RESTART", "FORCE_EVAL->SUBSYS->COORD"]
update_cp2k_input(infile, outfile, update=to_update, remove=remove)
[docs]class CP2KEngine(CP2KEngineSteps):
"""Streaming CP2K engine.
This engine shares its configuration and input convention with the
relaunch engine :py:class:`.CP2KEngineSteps` (same ``[engine]``
settings, same ``initial.xyz`` / ``cp2k.inp`` / ``extra_files`` input
directory) so a user can switch ``class = cp2k`` <-> ``class =
cp2k_steps`` and have it just work -- as for the two GROMACS and LAMMPS
engines. The two engines build the CP2K run from the *same* input
template; they differ only in how the trajectory is consumed: this
engine runs CP2K as a single persistent process and reads its
position/velocity trajectories on the fly, terminating CP2K early when
an interface is crossed. The wavefunction is warm-started from a
per-ensemble ``.wfn`` restart for speed.
Attributes:
cp2k: The command for executing CP2K.
input_path: The directory where the input files are stored.
timestep: The time step used in the CP2K MD simulation.
subcycles: The number of steps each CP2K run is composed of.
extra_files: List of extra files which may be required to run CP2K.
sleep: A time in seconds, used to wait for files to be ready.
"""
# CP2K drives a real external binary, so it is an ``external`` engine.
# Set it explicitly so the native code paths that branch on
# ``engine_type`` (e.g. ``initiate_load`` external branch,
# ``simulationio`` particle handling) keep the external behaviour.
# ``needs_order`` is ``True``: CP2K does not compute the order
# parameter itself -- this engine calls ``self.calculate_order`` with
# an injected ``self.order_function``. ``default_units = 'cp2k'`` is
# inherited from :py:class:`.CP2KEngineSteps`.
engine_type = "external"
needs_order = True
[docs] def __init__(
self,
cp2k: str,
input_path: str,
timestep: float,
subcycles: int,
temperature: Optional[float] = None,
extra_files: Optional[List[str]] = None,
exe_path: Union[str, Path] = Path(".").resolve(),
sleep: float = 0.1,
):
"""Set up the CP2K MD engine.
Args:
cp2k: The CP2K executable.
input_path: The path to the directory containing CP2K input files.
timestep: The time step used in the CP2K simulation.
subcycles: The number of steps each CP2K run is composed of.
temperature: The temperature of the simulation.
extra_files: List of extra files which may be required to run CP2K.
exe_path: The path on which the engine is executed
sleep: A time in seconds, used to wait for files to be ready.
"""
# Delegate the shared setup (command, input path, input-file
# discovery, ``self.ext``, ``extra_files``) to the relaunch base,
# so the two engines read the SAME ``initial.xyz`` + ``cp2k.inp``
# by the SAME convention. The base signature uses ``conf`` /
# ``template`` keywords; pass the CP2K defaults explicitly.
super().__init__(
cp2k,
input_path,
timestep,
subcycles,
extra_files=extra_files,
exe_path=exe_path,
conf="initial.xyz",
template="cp2k.inp",
)
# Streaming-specific state. The native base defaults ``exe_dir`` to
# ``None`` and defines no step counter; the infinite-swapping
# scheduler sets the per-worker exe_dir via ``set_mdrun`` but still
# expects the inf-base defaults for standalone use (``"."``) and
# reads ``self.steps`` for subcycle accounting (see
# ``_tis_inf.run_md``). Restore both here.
self.exe_dir = "."
self.steps = 0
self.name = "cp2k"
self.sleep = sleep
# add mass, temperature and unit information to engine
# which is needed for velocity modification
_, _, _, atoms = self._read_configuration(self.input_files["conf"])
mass = []
if atoms is not None:
mass = [
guess_particle_mass(i, name) for i, name in enumerate(atoms)
]
self.mass = np.reshape(mass, (len(mass), 1))
# The MD temperature lives in the cp2k.inp template. The [engine]
# ``temperature`` setting is an optional cross-check: when omitted,
# the template value is used, so the SAME config + input directory
# drives either engine (``class = cp2k`` <-> ``cp2k_steps``) -- the
# relaunch engine never needs a ``temperature`` setting. If the
# template sets no temperature (NVE with NVT shooting), it stays
# ``None`` and velocity generation is unavailable (as before).
section = "MOTION->MD"
nodes = read_cp2k_input(self.input_files["template"])
node_ref = set_parents(nodes)
md_settings = node_ref[section]
cp2k_temp = None
for data in md_settings.data:
if "temperature" in data.lower():
cp2k_temp = float(data.split()[1])
if temperature is None:
temperature = cp2k_temp
elif cp2k_temp is not None and cp2k_temp != temperature:
msg = (
f"The temperature in the cp2k input {cp2k_temp} !="
+ f" {temperature} which is specified in the .toml. "
+ "They should be the same since we generate "
+ "velocities internally at the .toml temperature."
)
raise ValueError(msg)
self.temperature = temperature
self.kb = 3.16681534e-6 # hartree
self._beta = (
None if temperature is None else 1 / (temperature * self.kb)
)
# ``self.extra_files`` is built by the relaunch base from the same
# ``extra_files`` argument, so it is not re-done here.
@property
def beta(self):
"""The thermodynamic temperature in units of the engine."""
return self._beta
[docs] def step(self, system, name):
"""Perform a single step with the engine.
CP2K is propagated via :py:meth:`_propagate_from`; a bare
single :py:meth:`step` is not used by the infinite-swapping
scheduler. The method exists only so the class can be
instantiated (the native external base declares it abstract).
"""
raise NotImplementedError(
"CP2KEngine does not support single-step integration "
'("step()"); it is propagated via "_propagate_from".'
)
[docs] @staticmethod
def add_to_path(path, phase_point, left, right):
"""Add a phase point and perform some checks.
This is the three-value variant (status, success, stop)
that :py:meth:`_propagate_from` unpacks. It is kept verbatim from
the inf base (the native ``EngineBase.add_to_path`` returns a
four-tuple and uses a different acceptance check), so the
propagation behaviour stays byte-identical.
"""
status = "Running propagate..."
success = False
stop = False
path.append(phase_point)
if path.phasepoints[-1].order[0] < left:
status = "Crossed left interface!"
success = True
stop = True
elif path.phasepoints[-1].order[0] > right:
status = "Crossed right interface!"
success = True
stop = True
elif path.maxlen is not None and path.length >= path.maxlen:
status = "Max. path length exceeded"
success = False
stop = True
return status, success, stop
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None):
"""Calculate the order parameter of the current system.
Overrides the native ``ExternalMDEngine.calculate_order`` because
this engine's :py:meth:`_read_configuration` returns the box as
the *third* element (``xyz, vel, box, names``), whereas the native
method reads it as box-first. The body matches the inf base so the
order-parameter evaluation stays byte-identical.
"""
if any((xyz is None, vel is None, box is None)):
out = self._read_configuration(system.config[0])
xyz = out[0]
vel = out[1]
box = out[2]
if xyz is not None:
system.pos = xyz
if vel is not None:
system.vel = vel * -1.0 if system.vel_rev else vel
if box is not None:
update_system_box(system, box)
if self.order_function is None:
raise ValueError("Order parameter is not defined!")
return self.order_function.calculate(system)
[docs] def dump_phasepoint(self, phasepoint, deffnm="conf"):
"""Dump the current configuration from a system object to a file.
Kept from the inf base: it points the phase point at frame index
``0`` via ``set_pos`` (the native external override instead sets
``config = (pos_file, None)``). The inf scheduler
(``_tis_inf.dump_phasepoint`` call sites) relies on the index-0
form, so this preserves byte-identical behaviour.
"""
pos_file = self.dump_config(phasepoint.config, deffnm=deffnm)
phasepoint.set_pos((pos_file, 0))
[docs] def draw_maxwellian_velocities(self, vel, mass, beta, rgen, sigma_v=None):
"""Draw velocities from a Gaussian distribution.
The native bases do not provide this helper, so it is restored
verbatim from the inf base (``modify_velocities`` calls it). The
``rgen`` is the injected velocity-generation generator.
"""
if sigma_v is None or np.any(sigma_v < 0.0):
kbt = 1.0 / beta
sigma_v = np.sqrt(kbt * (1 / mass))
npart, dim = vel.shape
vel = rgen.normal(loc=0.0, scale=sigma_v, size=(npart, dim))
return vel, sigma_v
# ``_extract_frame`` is inherited from :py:class:`.CP2KEngineSteps`
# (the two implementations had byte-identical executable bodies; only
# docstring style and line-wrapping differed).
[docs] def _propagate_from(
self,
name: str,
path: InfPath,
system: System,
ens_set: Dict[str, Any],
msg_file: FileIO,
reverse: bool = False,
) -> Tuple[bool, str]:
"""
Propagate with CP2K from the current system configuration.
Here, we assume that this method is called after the propagate()
has been called in the parent. The parent is then responsible
for reversing the velocities and also for setting the initial
state of the system.
Note that the on-the-fly reading of data is currently only applicable
for NVT simulations, as no box information is read from CP2K.
Args:
name: A name to use for the trajectory we are generating.
path: This is the path we use to fill in phase-space points.
system: The phase point to use as a starting point.
ens_set: Settings for the ensembles.
msg_file: An object we use for writing out messages that are useful
for inspecting the status of the current propagation.
reverse: If True, the system will be propagated backward in time.
Returns:
A tuple containing:
- True if we generated an acceptable path, False, otherwise.
- A text description of the status of the propagation. Can
be used to interpret the reason for the path not being
acceptable.
"""
# Lazy import (see module-level note) to avoid the _parts -> setup
# circular import at module load.
from pyretis.engines._parts import ReadAndProcessOnTheFly, xyz_reader
status = f"propagating with CP2K (reverse = {reverse})"
interfaces = ens_set["interfaces"]
logger.debug(status)
success = False
left, _, right = interfaces
logger.debug("Adding input files for CP2K")
# First, copy the required input files:
self.add_input_files(self.exe_dir)
# Get positions and velocities from the input file.
initial_conf = system.config[0]
xyz, vel, box, atoms = self._read_configuration(initial_conf)
if box is None:
box, _ = read_cp2k_box(self.input_files["template"])
# Add CP2K input for N steps:
run_input = os.path.join(self.exe_dir, "run.inp")
# get ensemble from msg_file of format "msg-ens_.."
ens = os.path.basename(msg_file.filename)[4:7]
restart_wfn = os.path.join(self.input_path, ens + ".wfn")
write_for_run_vel(
self.input_files["template"],
run_input,
self.timestep,
path.maxlen,
self.subcycles,
os.path.basename(initial_conf),
vel,
name=name,
restart_wfn=restart_wfn,
)
# Get the order parameter before the run:
order = self.calculate_order(system, xyz=xyz, vel=vel, box=box)
traj_file = os.path.join(self.exe_dir, f"{name}.{self.ext}")
# Create a message file with some info about this run:
msg_file.write(
f'# Initial order parameter: {" ".join([str(i) for i in order])}'
)
msg_file.write(f"# Trajectory file is: {traj_file}")
# Get CP2K output files:
out_files = {}
for key, val in OUTPUT_FILES.items():
out_files[key] = os.path.join(self.exe_dir, val.format(name))
restart_file = os.path.join(self.exe_dir, out_files["restart"])
prestart_file = os.path.join(self.exe_dir, "previous.restart")
wave_file = os.path.join(self.exe_dir, out_files["wfn"])
pwave_file = os.path.join(self.exe_dir, "previous.wfn")
# CP2K runner
logger.debug("Executing CP2K %s: %s", name, "run.inp")
cmd = self.cp2k + ["-i", "run.inp"]
cwd = self.exe_dir
inputs = None
# from external.exe_command
cmd2 = " ".join(cmd)
logger.debug("Executing: %s", cmd2)
out_name = "stdout.txt"
err_name = "stderr.txt"
if cwd:
out_name = os.path.join(cwd, out_name)
err_name = os.path.join(cwd, err_name)
return_code = None
cp2k_was_terminated = False
with open(out_name, "wb") as fout, open(err_name, "wb") as ferr:
exe = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=fout,
stderr=ferr,
shell=False, # nosec B603
cwd=cwd,
preexec_fn=os.setsid,
)
# wait for trajectories to appear
while not os.path.exists(out_files["pos"]) or not os.path.exists(
out_files["vel"]
):
time.sleep(self.sleep)
if exe.poll() is not None:
logger.debug("CP2K execution stopped")
break
# CP2K may have finished after last checking files
# or it may have crashed without writing the files
if exe.poll() is None or exe.returncode == 0:
pos_reader = ReadAndProcessOnTheFly(
out_files["pos"], xyz_reader
)
vel_reader = ReadAndProcessOnTheFly(
out_files["vel"], xyz_reader
)
# start reading on the fly as CP2K is still running
# if it stops, perform one more iteration to read
# the remaining content in the files. Note that we assume here
# that CP2K writes in blocks of frames, and never partially
# finished frames.
iterations_after_stop = 0
step_nr = 0
pos_traj = []
vel_traj = []
while exe.poll() is None or iterations_after_stop <= 1:
# we may still have some data in one of the trajectories
# so use += here
pos_traj += pos_reader.read_and_process_content()
vel_traj += vel_reader.read_and_process_content()
# loop over the frames that are ready
for _ in range(min(len(pos_traj), len(vel_traj))):
pos = pos_traj.pop(0)
vel = vel_traj.pop(0)
write_xyz_trajectory(traj_file, pos, vel, atoms, box)
# calculate order, check for crossings, etc
order = self.calculate_order(
system, xyz=pos, vel=vel, box=box
)
msg_file.write(
f'{step_nr} {" ".join([str(j) for j in order])}'
)
snapshot = {
"order": order,
"config": (traj_file, step_nr),
"vel_rev": reverse,
}
phase_point = self.snapshot_to_system(system, snapshot)
status, success, stop = self.add_to_path(
path, phase_point, left, right
)
if stop:
# process may have terminated since we last checked
if exe.poll() is None:
logger.debug("Terminating CP2K execution")
os.killpg(os.getpgid(exe.pid), signal.SIGTERM)
# wait for process to die, necessary for mpi
exe.wait(timeout=360)
logger.debug(
"CP2K propagation ended at %i. Reason: %s",
step_nr,
status,
)
# exit while loop without reading additional data
iterations_after_stop = 2
cp2k_was_terminated = True
break
step_nr += 1
time.sleep(self.sleep)
# if CP2K finished, we run one more loop
if exe.poll() is not None and iterations_after_stop <= 1:
iterations_after_stop += 1
return_code = exe.returncode
if return_code != 0 and not cp2k_was_terminated:
logger.error(
"Execution of external program (%s) failed!",
self.description,
)
logger.error("Attempted command: %s", cmd2)
logger.error("Execution directory: %s", cwd)
if inputs is not None:
logger.error("Input to external program was: %s", inputs)
logger.error(
"Return code from external program: %i", return_code
)
logger.error("STDOUT, see file: %s", out_name)
logger.error("STDERR, see file: %s", err_name)
msg = (
f"Execution of external program ({self.description}) "
f"failed with command:\n {cmd2}.\n"
f"Return code: {return_code}"
)
raise RuntimeError(msg)
if (return_code is not None) and (
return_code == 0 or cp2k_was_terminated
):
self._removefile(out_name)
self._removefile(err_name)
msg_file.write("# Propagation done.")
energy_file = out_files["energy"]
msg_file.write(f"# Reading energies from: {energy_file}")
energy = read_cp2k_energy(energy_file)
end = (step_nr + 1) * self.subcycles
ekin: np.ndarray = energy.get("ekin", np.array([]))
vpot: np.ndarray = energy.get("vpot", np.array([]))
etot: np.ndarray = energy.get("etot", np.array([]))
temp: np.ndarray = energy.get("temp", np.array([]))
path.update_energies(
ekin[:end:self.subcycles],
vpot[:end:self.subcycles],
etot[:end:self.subcycles],
temp[:end:self.subcycles],
)
# copy .wfn file for future use
if os.path.isfile(wave_file):
self._copyfile(wave_file, restart_wfn)
for _, files in out_files.items():
self._removefile(files)
self._removefile(prestart_file)
self._removefile(pwave_file)
self._removefile(run_input)
self._removefile(restart_file)
self._removefile(wave_file)
# Mirror the inf base ``propagate``: accumulate the MD steps this
# segment consumed so the scheduler's subcycle accounting stays
# byte-identical (the native ``propagate`` does not increment it).
self.steps += path.length
return success, status
# ``add_input_files`` is inherited from :py:class:`.CP2KEngineSteps`
# (the two implementations had byte-identical executable bodies; only
# docstring style and line-wrapping differed).
[docs] @staticmethod
def _read_configuration(
filename: Union[str, Path],
) -> Tuple[
np.ndarray, np.ndarray, Optional[np.ndarray], Optional[List[str]]
]:
"""Read a CP2K output configuration from a file.
This streaming engine returns the box as the THIRD element
(``xyz, vel, box, names``), so the override is kept verbatim. The
relaunch :py:meth:`.CP2KEngineSteps._read_configuration` returns
``box`` first instead; :py:meth:`calculate_order` and
:py:meth:`_propagate_from` rely on the box-third order, so this
cannot be inherited.
Args:
filename: Path to the file to read.
Returns:
A tuple containing:
- The positions.
- The velocities.
- The box dimensions if we manage to read it.
- The atom names found in the file.
"""
for snapshot in read_xyz_file(filename):
box, xyz, vel, names = convert_snapshot(snapshot)
break # Stop after the first snapshot.
return xyz, vel, box, names
[docs] def set_mdrun(self, md_items: Dict[str, Any]) -> None:
"""Set the execute directory."""
# TODO: REMOVE OR RENAME?
self.exe_dir = md_items["exe_dir"]
[docs] def _reverse_velocities(self, filename: str, outfile: str) -> None:
"""Reverse velocity in a given snapshot.
Kept as an override because it unpacks this engine's box-third
:py:meth:`_read_configuration` (``xyz, vel, box, names``); the
relaunch base unpacks box first.
Args:
filename: Path to the file containing the configuration to
reverse velocities in.
outfile: Path to file for storing the configuration with
reversed velocities.
"""
xyz, vel, box, names = self._read_configuration(filename)
write_xyz_trajectory(
outfile, xyz, -1.0 * vel, names, box, append=False
)
[docs] def modify_velocities(
self, system: System, vel_settings: Dict[str, Any], rgen
) -> Tuple[float, float]:
"""Modify the velocities of all particles.
Args:
system: The system whose particle velocities are to be modified.
vel_settings: A dict containing the key ``'zero_momentum'``
(boolean): if true we reset the linear momentum to zero
after generating velocities internally.
Returns:
A tuple containing:
- dek: The change in kinetic energy as a result of the
velocity modification.
- kin_new: The new kinetic energy of the system.
Note:
CP2K removes the center of mass motion by default,
so we set the momentum to zero by default here.
This method does **not** take care of constraints.
"""
# Lazy import (see module-level note).
from pyretis.engines._parts import kinetic_energy, reset_momentum
mass = self.mass
beta = self.beta
pos = self.dump_frame(system)
xyz, vel, box, atoms = self._read_configuration(pos)
kin_old = kinetic_energy(vel, mass)[0]
# TO DO: Is this check neccessary?
if box is None:
box, _ = read_cp2k_box(self.input_files["template"])
vel, _ = self.draw_maxwellian_velocities(vel, mass, beta, rgen)
# reset momentum by default in cp2k
if vel_settings.get("zero_momentum", True):
vel = reset_momentum(vel, mass)
conf_out = os.path.join(self.exe_dir, f"genvel.{self.ext}")
write_xyz_trajectory(conf_out, xyz, vel, atoms, box, append=False)
kin_new = kinetic_energy(vel, mass)[0]
system.config = (conf_out, 0)
system.ekin = kin_new
if kin_old == 0.0:
dek = float("inf")
logger.debug(
"Kinetic energy not found for previous point."
"\n(This happens when the initial configuration "
"does not contain energies.)"
)
else:
dek = kin_new - kin_old
return dek, kin_new