"""Engine factory for keyed engine creation (infRETIS flavour).
This module supplements :py:mod:`pyretis.engines` with the keyed-creation
helpers used by the infRETIS scheduler. Unlike
:py:func:`pyretis.engines.engine_factory` and
:py:func:`pyretis.setup.common.create_engine`, which expect the engine
settings to sit at the top-level ``"engine"`` key, ``create_inf_engine``
takes an explicit ``eng_key`` so multiple named engines (``engine``,
``engine2``, etc.) can coexist in one config.
The two call paths are deliberately kept separate so that pyretis-old
behaviour stays reachable while the infRETIS-flavour scheduler also
works inside the same package:
* ``from pyretis.setup.common import create_engine`` -- pyretis-native
single-engine creation, resolved through the untouched
:py:data:`pyretis.engines.ENGINE_MAP`.
* ``from pyretis.engines.factory import create_inf_engine`` --
infRETIS-flavour keyed creation, resolved through the
:py:func:`build_engine_map` result (which shadows the native
``cp2k`` / ``gromacs`` / ``lammps`` entries with the ``_inf``
classes). ``ENGINE_MAP`` is never mutated.
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
from pyretis.core.common import generic_factory, import_from, initiate_instance
from pyretis.engines import ENGINE_MAP
from pyretis.setup.common import create_external
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
__all__ = [
"assign_engines",
"build_engine_map",
"check_engine",
"create_inf_engine",
"create_engines",
"import_external",
]
[docs]def import_external(
settings: Dict[str, Any],
key: str,
required_methods: Optional[List[str]] = None,
) -> Any:
"""Import and instantiate a class described by ``settings``.
Equivalent to ``infretis.core.core.create_external``: the
``settings`` dict must hold ``"class"`` and ``"module"`` keys; the
module is looked up relative to the current directory and, failing
that, ``settings["simulation"]["exe_path"]``.
This complements :py:func:`pyretis.setup.common.create_external`,
which requires a ``factory`` (engine map) argument and is geared
toward the pyretis-native engine creation flow.
Parameters
----------
settings : dict
Settings describing the object to load.
key : str
Label used in error messages.
required_methods : list of str, optional
Method names that the imported class must expose.
"""
klass = settings.get("class", "")
module = settings.get("module", "")
if os.path.isfile(module):
obj = import_from(module, klass)
elif "exe_path" in settings.get("simulation", []):
module = os.path.join(settings["simulation"]["exe_path"], module)
obj = import_from(module, klass)
else:
raise ValueError(f'Could not find module "{module}" for {key}!')
for fn_name in required_methods or []:
fn = getattr(obj, fn_name, None)
if fn is None or not callable(fn):
msg = f"Method {klass}.{fn_name} missing or not callable"
logger.critical(msg)
raise ValueError(msg)
return initiate_instance(obj, settings)
[docs]def build_engine_map() -> Dict[str, Dict[str, Any]]:
"""Return ENGINE_MAP extended with infRETIS-ported engines.
Importable engines are added to the map; missing optional
dependencies (e.g. ``scm.plams`` for AMS, ``turtlemd``) just lead
to that entry being skipped.
"""
extended: Dict[str, Dict[str, Any]] = dict(ENGINE_MAP)
_try_register(extended, "ase", "pyretis.engines.ase", "ASEEngine")
_try_register(extended, "ams", "pyretis.engines.ams", "AMSEngine")
_try_register(
extended, "turtlemd", "pyretis.engines.turtlemd", "TurtleMDEngine"
)
# 4b-2b inf-engine merge COMPLETE: ``cp2k`` / ``gromacs`` / ``lammps``
# are all merged and resolve to their canonical engines in the untouched
# ENGINE_MAP -- no ``_inf`` shadow remains. The scheduler and the
# pyretis-native flow now share one engine class per backend:
# * ``gromacs`` -> gromacs.GromacsEngine (native streaming engine, with
# the merged velocity_generation/ref-t handling). Relaunch:
# ``gromacs_steps``.
# * ``lammps`` -> lammps.LAMMPSEngine (continuous, adopted from the
# inf engine). Relaunch / old-pyretis: ``lammps_steps``.
# * ``cp2k`` -> cp2k.CP2KEngine (continuous, adopted from the inf
# engine: persistent process + on-the-fly reading + wfn warm-start).
# Relaunch / restart-chain old-pyretis: ``cp2k_steps``.
# ``build_engine_map`` now only adds the optional-dependency engines
# (ase / ams / turtlemd) on top of ENGINE_MAP.
return extended
[docs]def _try_register(
target: Dict[str, Dict[str, Any]],
key: str,
module_path: str,
class_name: str,
) -> None:
"""Attempt to import ``class_name`` from ``module_path``.
Adds ``{key: {'cls': cls}}`` to ``target`` on success. Failures
(typically missing optional dependencies for the engine itself)
are silently ignored.
"""
try:
module = __import__(module_path, fromlist=[class_name])
target[key] = {"cls": getattr(module, class_name)}
except ImportError:
logger.debug("Engine %s unavailable (module %s)", key, module_path)
[docs]def create_inf_engine(
settings: Dict[str, Any], eng_key: str = "engine"
) -> Optional[Any]:
"""Create an infRETIS engine from a settings section.
Counterpart to :py:func:`pyretis.setup.common.create_engine` for the
infRETIS scheduler: takes an explicit ``eng_key`` so multiple named
engines (``engine``, ``engine2``, ...) can coexist in one config,
and resolves engine classes through :py:func:`build_engine_map`
(which shadows pyretis-native ``cp2k`` / ``gromacs`` / ``lammps``
entries with the ``_inf`` flavours). The pyretis-native
:py:func:`pyretis.setup.common.create_engine` stays the entry
point for pyretis-old runs.
Parameters
----------
settings : dict
Full simulation settings. The engine settings live under
``settings[eng_key]``.
eng_key : str
Which engine section to instantiate (``"engine"``,
``"engine2"``, ...).
Returns
-------
Optional[EngineBase]
The created engine, or the result of
:py:func:`pyretis.setup.common.create_external` if the engine
class is not in the extended engine map.
"""
if eng_key not in settings:
logger.critical("Missing engine section [%s] in settings", eng_key)
return None
engine_map = build_engine_map()
klass = str(settings[eng_key].get("class", "")).lower()
if klass not in engine_map:
return create_external(
settings[eng_key], eng_key, engine_map, []
)
return generic_factory(settings[eng_key], engine_map, name=eng_key)
[docs]def check_engine(settings: Dict[str, Any], eng_key: str = "engine") -> bool:
"""Light-touch validation of an engine settings block.
Mirrors infretis ``factory.check_engine``: ensures the section
exists and that file-format-bearing engines specify their format.
"""
problems = []
if eng_key not in settings:
problems.append(f"The section [{eng_key}] is missing")
# No section to inspect further; surface and stop.
logger.critical("\n".join(problems))
return False
section = settings[eng_key]
if "gmx" in section and "gmx_format" not in section:
problems.append("File format is not specified for the engine")
elif "cp2k" in section and "cp2k_format" not in section:
problems.append("File format is not specified for the engine")
if problems:
logger.critical("\n".join(problems))
return False
return True
[docs]def create_engines(
config: Dict[str, Any],
) -> Tuple[Dict[Any, Any], Dict[Any, list]]:
"""Build the engine pool for an infRETIS-style parallel run.
For each unique engine name referenced in
``config["simulation"]["ensemble_engines"]``, instantiate
``min(occurrences, n_workers)`` copies and pair them with a
per-instance occupancy slot initialised to ``-1`` (free).
Returns ``(engines, engine_occ)`` where:
* ``engines[name]`` is the list of instantiated engines.
* ``engine_occ[name][i]`` is the worker pin currently using
``engines[name][i]``, or ``-1`` when free.
"""
engine_count: Dict[str, int] = {}
for engine in config["simulation"]["ensemble_engines"]:
for engine_i in engine:
engine_count[engine_i] = engine_count.get(engine_i, 0) + 1
engines: Dict[str, list] = {}
engine_occ: Dict[str, list] = {}
for engine, n_engine in engine_count.items():
engines[engine] = []
engine_occ[engine] = []
n_create = min(n_engine, config["runner"]["workers"])
for _ in range(n_create):
check_engine(config, eng_key=engine)
engine_occ[engine].append(-1)
engines[engine].append(create_inf_engine(config, eng_key=engine))
return engines, engine_occ
[docs]def assign_engines(
engine_occ: Dict[str, list], eng_names, pin
) -> Dict[Any, int]:
"""Assign free engine instances to worker ``pin``.
Releases any engines currently held by ``pin``, then claims one
free instance of each engine type in ``eng_names`` for that
worker. Returns ``{name: index}`` mapping each requested engine
name to the index inside ``engine_occ[name]`` now owned by
``pin``.
Raises
------
ValueError
If no free engine is available for one of the requested
names (this should never happen with a correctly sized pool).
"""
# Release engines previously held by this worker.
for eng_key in engine_occ:
for i, occupied_by in enumerate(engine_occ[eng_key]):
if pin == occupied_by:
engine_occ[eng_key][i] = -1
out: Dict[Any, int] = {}
for eng_key in eng_names:
for i, occupied_by in enumerate(engine_occ[eng_key]):
if occupied_by == -1:
engine_occ[eng_key][i] = pin
out[eng_key] = i
break
if i == len(engine_occ[eng_key]):
raise ValueError(
f"All engines '{eng_key}' are occupied -- this "
"indicates an undersized pool."
)
if not out:
raise ValueError(
"Did not find a free engine for any requested name."
)
return out