Source code for pyretis.engines.cp2k

"""A continuous CP2K external MD integrator interface.

This module defines the canonical CP2K engine, selected by
``class = "cp2k"``. CP2K is run as a single persistent process for a
whole propagation segment and its position/velocity trajectories are
read *on-the-fly*, so the path is filled incrementally and CP2K is
terminated as soon as a stopping interface is crossed. The wavefunction
is warm-started from a per-ensemble ``.wfn`` restart for speed.

The relaunch (start-and-stop / restart-chain) engine, which advances
CP2K one order-parameter point at a time, remains available as
``class = "cp2k_steps"`` (:py:mod:`pyretis.engines.cp2k_steps`).

Important classes defined here
------------------------------

CP2KEngine (:py:class:`.CP2KEngine`)
    The continuous CP2K engine.
"""

from __future__ import annotations

import logging
import os
import signal
import subprocess  # nosec B404
import time
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
)

import numpy as np

from pyretis.core.systemhelp import update_system_box
# The streaming engine SUBCLASSES the relaunch engine ``CP2KEngineSteps``
# (mirroring ``LAMMPSEngine(LAMMPSEngineSteps)`` /
# ``GromacsEngine(GromacsEngineSteps)``): both read the SAME input files
# (``initial.xyz`` + ``cp2k.inp``) and take the SAME constructor args, so a
# user can switch ``class = cp2k`` <-> ``class = cp2k_steps`` with the same
# config + input directory. The base provides the constructor, input-file
# discovery and the shared helpers (``add_input_files``, ``_extract_frame``);
# this module overrides only the run loop and the streaming-specific I/O.
# The remaining ``_parts``-native helpers (PERIODIC_TABLE,
# ReadAndProcessOnTheFly, xyz_reader, kinetic_energy, reset_momentum) are
# imported lazily inside the methods that use them: a top-level
# ``import pyretis.engines._parts`` pulls in ``pyretis.setup`` while
# ``pyretis.engines`` is still initialising (this module is imported from
# ``engines/__init__``), which is a circular import. The streaming and
# velocity paths are runtime-only, so importing at call time is safe.
from pyretis.engines.cp2k_steps import CP2KEngineSteps
from pyretis.inout.formats.xyz import (
    convert_snapshot,
    read_xyz_file,
    write_xyz_trajectory,
)
# E4.1 prep (2026-05-27): the CP2K input parser (read_cp2k_input +
# tree manipulators) used to be a verbatim duplicate of the
# pyretis.inout.formats.cp2k version. After harmonising the parser
# (commit ccd26b3a), it is imported from the canonical home so the
# two flavours can no longer drift.
from pyretis.inout.formats.cp2k import (
    read_cp2k_box,
    read_cp2k_energy,
    read_cp2k_input,
    set_parents,
    update_cp2k_input,
)

if TYPE_CHECKING:
    from pyretis.core.path import Path as InfPath
    from pyretis.core._system_inf import System
    from pyretis.inout.fileio import FileIO

# ``FileIO``, ``InfPath``, ``System`` references stay as PEP 563
# forward strings (``from __future__ import annotations`` above).

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


OUTPUT_FILES = {
    "energy": "{}-1.ener",
    "restart": "{}-1.restart",
    "pos": "{}-pos-1.xyz",
    "vel": "{}-vel-1.xyz",
    "wfn": "{}-RESTART.wfn",
    "wfn-bak": "{}-RESTART.wfn.bak-",
}


[docs]def guess_particle_mass(particle_no: int, particle_type: str) -> float: """Guess a particle mass from its type and convert to CP2K units. Args: particle_no: The particle number. This is only used for output to the logger. particle_type: The particle type as a string. This should be an element from the periodic table. Returns: The guessed particle mass as a float. """ # Lazy import (see module-level note) to avoid the _parts -> setup # circular import at module load. from pyretis.engines._parts import PERIODIC_TABLE logger.info( ( "Mass not specified for particle no. %i\n" 'Will guess from particle type "%s"' ), particle_no, particle_type, ) mass = PERIODIC_TABLE.get(particle_type, None) if mass is None: msg = ( f"Could not infer mass for particle number {particle_no}" f" with element '{particle_type}' as its not in our periodic" "table." ) raise ValueError(msg) particle_mass = 1822.8884858012982 * mass logger.info( "-> Using a mass of %f g/mol (%f in internal units)", mass, particle_mass, ) return particle_mass
[docs]def write_for_run_vel( infile: Union[str, Path], outfile: Union[str, Path], timestep: float, nsteps: int, subcycles: int, posfile: str, vel: np.ndarray, name: str = "md_step", restart_wfn: str = "doesnotexist.wfn", print_freq: Optional[int] = None, ) -> None: """Create input file to perform n steps. Note, a single step actually consists of a number of subcycles. But from PyRETIS' point of view, this is a single step. Further, we here assume that we start from a given xyz file and we also explicitly give the velocities here. Args: infile: Path to the input template to use. outfile: Path to the input file to create. timestep: The time step to use for the simulation. nsteps: The number of PyRETIS steps to perform. subcycles: The number of sub-cycles to perform. posfile: The (base)name for the input file to read positions from. vel: The velocities to set in the input. name: A name for the CP2K project. print_freq: How often we should print to the trajectory file. """ if print_freq is None: print_freq = subcycles to_update: Dict[str, Any] = { "GLOBAL": { "data": [f"PROJECT {name}", "RUN_TYPE MD", "PRINT_LEVEL LOW"], "replace": True, }, "MOTION->MD": { "data": {"STEPS": nsteps * subcycles, "TIMESTEP": timestep} }, "MOTION->PRINT->RESTART": { "data": ["BACKUP_COPIES 0"], "replace": True, }, "MOTION->PRINT->RESTART->EACH": {"data": {"MD": print_freq}}, "MOTION->PRINT->VELOCITIES->EACH": {"data": {"MD": print_freq}}, "MOTION->PRINT->TRAJECTORY->EACH": {"data": {"MD": print_freq}}, "FORCE_EVAL->SUBSYS->TOPOLOGY": { "data": {"COORD_FILE_NAME": posfile, "COORD_FILE_FORMAT": "xyz"} }, "FORCE_EVAL->SUBSYS->VELOCITY": { "data": [], "replace": True, }, "FORCE_EVAL->DFT->SCF->PRINT->RESTART": { "data": ["BACKUP_COPIES 0"], "replace": True, }, "FORCE_EVAL->DFT": { "data": {"WFN_RESTART_FILE_NAME": restart_wfn}, }, } for veli in vel: to_update["FORCE_EVAL->SUBSYS->VELOCITY"]["data"].append( f"{veli[0]} {veli[1]} {veli[2]}" ) remove = ["EXT_RESTART", "FORCE_EVAL->SUBSYS->COORD"] update_cp2k_input(infile, outfile, update=to_update, remove=remove)
[docs]class CP2KEngine(CP2KEngineSteps): """Streaming CP2K engine. This engine shares its configuration and input convention with the relaunch engine :py:class:`.CP2KEngineSteps` (same ``[engine]`` settings, same ``initial.xyz`` / ``cp2k.inp`` / ``extra_files`` input directory) so a user can switch ``class = cp2k`` <-> ``class = cp2k_steps`` and have it just work -- as for the two GROMACS and LAMMPS engines. The two engines build the CP2K run from the *same* input template; they differ only in how the trajectory is consumed: this engine runs CP2K as a single persistent process and reads its position/velocity trajectories on the fly, terminating CP2K early when an interface is crossed. The wavefunction is warm-started from a per-ensemble ``.wfn`` restart for speed. Attributes: cp2k: The command for executing CP2K. input_path: The directory where the input files are stored. timestep: The time step used in the CP2K MD simulation. subcycles: The number of steps each CP2K run is composed of. extra_files: List of extra files which may be required to run CP2K. sleep: A time in seconds, used to wait for files to be ready. """ # CP2K drives a real external binary, so it is an ``external`` engine. # Set it explicitly so the native code paths that branch on # ``engine_type`` (e.g. ``initiate_load`` external branch, # ``simulationio`` particle handling) keep the external behaviour. # ``needs_order`` is ``True``: CP2K does not compute the order # parameter itself -- this engine calls ``self.calculate_order`` with # an injected ``self.order_function``. ``default_units = 'cp2k'`` is # inherited from :py:class:`.CP2KEngineSteps`. engine_type = "external" needs_order = True
[docs] def __init__( self, cp2k: str, input_path: str, timestep: float, subcycles: int, temperature: Optional[float] = None, extra_files: Optional[List[str]] = None, exe_path: Union[str, Path] = Path(".").resolve(), sleep: float = 0.1, ): """Set up the CP2K MD engine. Args: cp2k: The CP2K executable. input_path: The path to the directory containing CP2K input files. timestep: The time step used in the CP2K simulation. subcycles: The number of steps each CP2K run is composed of. temperature: The temperature of the simulation. extra_files: List of extra files which may be required to run CP2K. exe_path: The path on which the engine is executed sleep: A time in seconds, used to wait for files to be ready. """ # Delegate the shared setup (command, input path, input-file # discovery, ``self.ext``, ``extra_files``) to the relaunch base, # so the two engines read the SAME ``initial.xyz`` + ``cp2k.inp`` # by the SAME convention. The base signature uses ``conf`` / # ``template`` keywords; pass the CP2K defaults explicitly. super().__init__( cp2k, input_path, timestep, subcycles, extra_files=extra_files, exe_path=exe_path, conf="initial.xyz", template="cp2k.inp", ) # Streaming-specific state. The native base defaults ``exe_dir`` to # ``None`` and defines no step counter; the infinite-swapping # scheduler sets the per-worker exe_dir via ``set_mdrun`` but still # expects the inf-base defaults for standalone use (``"."``) and # reads ``self.steps`` for subcycle accounting (see # ``_tis_inf.run_md``). Restore both here. self.exe_dir = "." self.steps = 0 self.name = "cp2k" self.sleep = sleep # add mass, temperature and unit information to engine # which is needed for velocity modification _, _, _, atoms = self._read_configuration(self.input_files["conf"]) mass = [] if atoms is not None: mass = [ guess_particle_mass(i, name) for i, name in enumerate(atoms) ] self.mass = np.reshape(mass, (len(mass), 1)) # The MD temperature lives in the cp2k.inp template. The [engine] # ``temperature`` setting is an optional cross-check: when omitted, # the template value is used, so the SAME config + input directory # drives either engine (``class = cp2k`` <-> ``cp2k_steps``) -- the # relaunch engine never needs a ``temperature`` setting. If the # template sets no temperature (NVE with NVT shooting), it stays # ``None`` and velocity generation is unavailable (as before). section = "MOTION->MD" nodes = read_cp2k_input(self.input_files["template"]) node_ref = set_parents(nodes) md_settings = node_ref[section] cp2k_temp = None for data in md_settings.data: if "temperature" in data.lower(): cp2k_temp = float(data.split()[1]) if temperature is None: temperature = cp2k_temp elif cp2k_temp is not None and cp2k_temp != temperature: msg = ( f"The temperature in the cp2k input {cp2k_temp} !=" + f" {temperature} which is specified in the .toml. " + "They should be the same since we generate " + "velocities internally at the .toml temperature." ) raise ValueError(msg) self.temperature = temperature self.kb = 3.16681534e-6 # hartree self._beta = ( None if temperature is None else 1 / (temperature * self.kb) )
# ``self.extra_files`` is built by the relaunch base from the same # ``extra_files`` argument, so it is not re-done here. @property def beta(self): """The thermodynamic temperature in units of the engine.""" return self._beta
[docs] def step(self, system, name): """Perform a single step with the engine. CP2K is propagated via :py:meth:`_propagate_from`; a bare single :py:meth:`step` is not used by the infinite-swapping scheduler. The method exists only so the class can be instantiated (the native external base declares it abstract). """ raise NotImplementedError( "CP2KEngine does not support single-step integration " '("step()"); it is propagated via "_propagate_from".' )
[docs] @staticmethod def add_to_path(path, phase_point, left, right): """Add a phase point and perform some checks. This is the three-value variant (status, success, stop) that :py:meth:`_propagate_from` unpacks. It is kept verbatim from the inf base (the native ``EngineBase.add_to_path`` returns a four-tuple and uses a different acceptance check), so the propagation behaviour stays byte-identical. """ status = "Running propagate..." success = False stop = False path.append(phase_point) if path.phasepoints[-1].order[0] < left: status = "Crossed left interface!" success = True stop = True elif path.phasepoints[-1].order[0] > right: status = "Crossed right interface!" success = True stop = True elif path.maxlen is not None and path.length >= path.maxlen: status = "Max. path length exceeded" success = False stop = True return status, success, stop
[docs] def calculate_order(self, system, xyz=None, vel=None, box=None): """Calculate the order parameter of the current system. Overrides the native ``ExternalMDEngine.calculate_order`` because this engine's :py:meth:`_read_configuration` returns the box as the *third* element (``xyz, vel, box, names``), whereas the native method reads it as box-first. The body matches the inf base so the order-parameter evaluation stays byte-identical. """ if any((xyz is None, vel is None, box is None)): out = self._read_configuration(system.config[0]) xyz = out[0] vel = out[1] box = out[2] if xyz is not None: system.pos = xyz if vel is not None: system.vel = vel * -1.0 if system.vel_rev else vel if box is not None: update_system_box(system, box) if self.order_function is None: raise ValueError("Order parameter is not defined!") return self.order_function.calculate(system)
[docs] def dump_phasepoint(self, phasepoint, deffnm="conf"): """Dump the current configuration from a system object to a file. Kept from the inf base: it points the phase point at frame index ``0`` via ``set_pos`` (the native external override instead sets ``config = (pos_file, None)``). The inf scheduler (``_tis_inf.dump_phasepoint`` call sites) relies on the index-0 form, so this preserves byte-identical behaviour. """ pos_file = self.dump_config(phasepoint.config, deffnm=deffnm) phasepoint.set_pos((pos_file, 0))
[docs] def draw_maxwellian_velocities(self, vel, mass, beta, rgen, sigma_v=None): """Draw velocities from a Gaussian distribution. The native bases do not provide this helper, so it is restored verbatim from the inf base (``modify_velocities`` calls it). The ``rgen`` is the injected velocity-generation generator. """ if sigma_v is None or np.any(sigma_v < 0.0): kbt = 1.0 / beta sigma_v = np.sqrt(kbt * (1 / mass)) npart, dim = vel.shape vel = rgen.normal(loc=0.0, scale=sigma_v, size=(npart, dim)) return vel, sigma_v
# ``_extract_frame`` is inherited from :py:class:`.CP2KEngineSteps` # (the two implementations had byte-identical executable bodies; only # docstring style and line-wrapping differed).
[docs] def _propagate_from( self, name: str, path: InfPath, system: System, ens_set: Dict[str, Any], msg_file: FileIO, reverse: bool = False, ) -> Tuple[bool, str]: """ Propagate with CP2K from the current system configuration. Here, we assume that this method is called after the propagate() has been called in the parent. The parent is then responsible for reversing the velocities and also for setting the initial state of the system. Note that the on-the-fly reading of data is currently only applicable for NVT simulations, as no box information is read from CP2K. Args: name: A name to use for the trajectory we are generating. path: This is the path we use to fill in phase-space points. system: The phase point to use as a starting point. ens_set: Settings for the ensembles. msg_file: An object we use for writing out messages that are useful for inspecting the status of the current propagation. reverse: If True, the system will be propagated backward in time. Returns: A tuple containing: - True if we generated an acceptable path, False, otherwise. - A text description of the status of the propagation. Can be used to interpret the reason for the path not being acceptable. """ # Lazy import (see module-level note) to avoid the _parts -> setup # circular import at module load. from pyretis.engines._parts import ReadAndProcessOnTheFly, xyz_reader status = f"propagating with CP2K (reverse = {reverse})" interfaces = ens_set["interfaces"] logger.debug(status) success = False left, _, right = interfaces logger.debug("Adding input files for CP2K") # First, copy the required input files: self.add_input_files(self.exe_dir) # Get positions and velocities from the input file. initial_conf = system.config[0] xyz, vel, box, atoms = self._read_configuration(initial_conf) if box is None: box, _ = read_cp2k_box(self.input_files["template"]) # Add CP2K input for N steps: run_input = os.path.join(self.exe_dir, "run.inp") # get ensemble from msg_file of format "msg-ens_.." ens = os.path.basename(msg_file.filename)[4:7] restart_wfn = os.path.join(self.input_path, ens + ".wfn") write_for_run_vel( self.input_files["template"], run_input, self.timestep, path.maxlen, self.subcycles, os.path.basename(initial_conf), vel, name=name, restart_wfn=restart_wfn, ) # Get the order parameter before the run: order = self.calculate_order(system, xyz=xyz, vel=vel, box=box) traj_file = os.path.join(self.exe_dir, f"{name}.{self.ext}") # Create a message file with some info about this run: msg_file.write( f'# Initial order parameter: {" ".join([str(i) for i in order])}' ) msg_file.write(f"# Trajectory file is: {traj_file}") # Get CP2K output files: out_files = {} for key, val in OUTPUT_FILES.items(): out_files[key] = os.path.join(self.exe_dir, val.format(name)) restart_file = os.path.join(self.exe_dir, out_files["restart"]) prestart_file = os.path.join(self.exe_dir, "previous.restart") wave_file = os.path.join(self.exe_dir, out_files["wfn"]) pwave_file = os.path.join(self.exe_dir, "previous.wfn") # CP2K runner logger.debug("Executing CP2K %s: %s", name, "run.inp") cmd = self.cp2k + ["-i", "run.inp"] cwd = self.exe_dir inputs = None # from external.exe_command cmd2 = " ".join(cmd) logger.debug("Executing: %s", cmd2) out_name = "stdout.txt" err_name = "stderr.txt" if cwd: out_name = os.path.join(cwd, out_name) err_name = os.path.join(cwd, err_name) return_code = None cp2k_was_terminated = False with open(out_name, "wb") as fout, open(err_name, "wb") as ferr: exe = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=fout, stderr=ferr, shell=False, # nosec B603 cwd=cwd, preexec_fn=os.setsid, ) # wait for trajectories to appear while not os.path.exists(out_files["pos"]) or not os.path.exists( out_files["vel"] ): time.sleep(self.sleep) if exe.poll() is not None: logger.debug("CP2K execution stopped") break # CP2K may have finished after last checking files # or it may have crashed without writing the files if exe.poll() is None or exe.returncode == 0: pos_reader = ReadAndProcessOnTheFly( out_files["pos"], xyz_reader ) vel_reader = ReadAndProcessOnTheFly( out_files["vel"], xyz_reader ) # start reading on the fly as CP2K is still running # if it stops, perform one more iteration to read # the remaining content in the files. Note that we assume here # that CP2K writes in blocks of frames, and never partially # finished frames. iterations_after_stop = 0 step_nr = 0 pos_traj = [] vel_traj = [] while exe.poll() is None or iterations_after_stop <= 1: # we may still have some data in one of the trajectories # so use += here pos_traj += pos_reader.read_and_process_content() vel_traj += vel_reader.read_and_process_content() # loop over the frames that are ready for _ in range(min(len(pos_traj), len(vel_traj))): pos = pos_traj.pop(0) vel = vel_traj.pop(0) write_xyz_trajectory(traj_file, pos, vel, atoms, box) # calculate order, check for crossings, etc order = self.calculate_order( system, xyz=pos, vel=vel, box=box ) msg_file.write( f'{step_nr} {" ".join([str(j) for j in order])}' ) snapshot = { "order": order, "config": (traj_file, step_nr), "vel_rev": reverse, } phase_point = self.snapshot_to_system(system, snapshot) status, success, stop = self.add_to_path( path, phase_point, left, right ) if stop: # process may have terminated since we last checked if exe.poll() is None: logger.debug("Terminating CP2K execution") os.killpg(os.getpgid(exe.pid), signal.SIGTERM) # wait for process to die, necessary for mpi exe.wait(timeout=360) logger.debug( "CP2K propagation ended at %i. Reason: %s", step_nr, status, ) # exit while loop without reading additional data iterations_after_stop = 2 cp2k_was_terminated = True break step_nr += 1 time.sleep(self.sleep) # if CP2K finished, we run one more loop if exe.poll() is not None and iterations_after_stop <= 1: iterations_after_stop += 1 return_code = exe.returncode if return_code != 0 and not cp2k_was_terminated: logger.error( "Execution of external program (%s) failed!", self.description, ) logger.error("Attempted command: %s", cmd2) logger.error("Execution directory: %s", cwd) if inputs is not None: logger.error("Input to external program was: %s", inputs) logger.error( "Return code from external program: %i", return_code ) logger.error("STDOUT, see file: %s", out_name) logger.error("STDERR, see file: %s", err_name) msg = ( f"Execution of external program ({self.description}) " f"failed with command:\n {cmd2}.\n" f"Return code: {return_code}" ) raise RuntimeError(msg) if (return_code is not None) and ( return_code == 0 or cp2k_was_terminated ): self._removefile(out_name) self._removefile(err_name) msg_file.write("# Propagation done.") energy_file = out_files["energy"] msg_file.write(f"# Reading energies from: {energy_file}") energy = read_cp2k_energy(energy_file) end = (step_nr + 1) * self.subcycles ekin: np.ndarray = energy.get("ekin", np.array([])) vpot: np.ndarray = energy.get("vpot", np.array([])) etot: np.ndarray = energy.get("etot", np.array([])) temp: np.ndarray = energy.get("temp", np.array([])) path.update_energies( ekin[:end:self.subcycles], vpot[:end:self.subcycles], etot[:end:self.subcycles], temp[:end:self.subcycles], ) # copy .wfn file for future use if os.path.isfile(wave_file): self._copyfile(wave_file, restart_wfn) for _, files in out_files.items(): self._removefile(files) self._removefile(prestart_file) self._removefile(pwave_file) self._removefile(run_input) self._removefile(restart_file) self._removefile(wave_file) # Mirror the inf base ``propagate``: accumulate the MD steps this # segment consumed so the scheduler's subcycle accounting stays # byte-identical (the native ``propagate`` does not increment it). self.steps += path.length return success, status
# ``add_input_files`` is inherited from :py:class:`.CP2KEngineSteps` # (the two implementations had byte-identical executable bodies; only # docstring style and line-wrapping differed).
[docs] @staticmethod def _read_configuration( filename: Union[str, Path], ) -> Tuple[ np.ndarray, np.ndarray, Optional[np.ndarray], Optional[List[str]] ]: """Read a CP2K output configuration from a file. This streaming engine returns the box as the THIRD element (``xyz, vel, box, names``), so the override is kept verbatim. The relaunch :py:meth:`.CP2KEngineSteps._read_configuration` returns ``box`` first instead; :py:meth:`calculate_order` and :py:meth:`_propagate_from` rely on the box-third order, so this cannot be inherited. Args: filename: Path to the file to read. Returns: A tuple containing: - The positions. - The velocities. - The box dimensions if we manage to read it. - The atom names found in the file. """ for snapshot in read_xyz_file(filename): box, xyz, vel, names = convert_snapshot(snapshot) break # Stop after the first snapshot. return xyz, vel, box, names
[docs] def set_mdrun(self, md_items: Dict[str, Any]) -> None: """Set the execute directory.""" # TODO: REMOVE OR RENAME? self.exe_dir = md_items["exe_dir"]
[docs] def _reverse_velocities(self, filename: str, outfile: str) -> None: """Reverse velocity in a given snapshot. Kept as an override because it unpacks this engine's box-third :py:meth:`_read_configuration` (``xyz, vel, box, names``); the relaunch base unpacks box first. Args: filename: Path to the file containing the configuration to reverse velocities in. outfile: Path to file for storing the configuration with reversed velocities. """ xyz, vel, box, names = self._read_configuration(filename) write_xyz_trajectory( outfile, xyz, -1.0 * vel, names, box, append=False )
[docs] def modify_velocities( self, system: System, vel_settings: Dict[str, Any], rgen ) -> Tuple[float, float]: """Modify the velocities of all particles. Args: system: The system whose particle velocities are to be modified. vel_settings: A dict containing the key ``'zero_momentum'`` (boolean): if true we reset the linear momentum to zero after generating velocities internally. Returns: A tuple containing: - dek: The change in kinetic energy as a result of the velocity modification. - kin_new: The new kinetic energy of the system. Note: CP2K removes the center of mass motion by default, so we set the momentum to zero by default here. This method does **not** take care of constraints. """ # Lazy import (see module-level note). from pyretis.engines._parts import kinetic_energy, reset_momentum mass = self.mass beta = self.beta pos = self.dump_frame(system) xyz, vel, box, atoms = self._read_configuration(pos) kin_old = kinetic_energy(vel, mass)[0] # TO DO: Is this check neccessary? if box is None: box, _ = read_cp2k_box(self.input_files["template"]) vel, _ = self.draw_maxwellian_velocities(vel, mass, beta, rgen) # reset momentum by default in cp2k if vel_settings.get("zero_momentum", True): vel = reset_momentum(vel, mass) conf_out = os.path.join(self.exe_dir, f"genvel.{self.ext}") write_xyz_trajectory(conf_out, xyz, vel, atoms, box, append=False) kin_new = kinetic_energy(vel, mass)[0] system.config = (conf_out, 0) system.ekin = kin_new if kin_old == 0.0: dek = float("inf") logger.debug( "Kinetic energy not found for previous point." "\n(This happens when the initial configuration " "does not contain energies.)" ) else: dek = kin_new - kin_old return dek, kin_new