Source code for pyretis.engines.factory

"""Engine factory for keyed engine creation (infRETIS flavour).

This module supplements :py:mod:`pyretis.engines` with the keyed-creation
helpers used by the infRETIS scheduler. Unlike
:py:func:`pyretis.engines.engine_factory` and
:py:func:`pyretis.setup.common.create_engine`, which expect the engine
settings to sit at the top-level ``"engine"`` key, ``create_inf_engine``
takes an explicit ``eng_key`` so multiple named engines (``engine``,
``engine2``, etc.) can coexist in one config.

The two call paths are deliberately kept separate so that pyretis-old
behaviour stays reachable while the infRETIS-flavour scheduler also
works inside the same package:

* ``from pyretis.setup.common import create_engine`` -- pyretis-native
  single-engine creation, resolved through the untouched
  :py:data:`pyretis.engines.ENGINE_MAP`.
* ``from pyretis.engines.factory import create_inf_engine`` --
  infRETIS-flavour keyed creation, resolved through the
  :py:func:`build_engine_map` result (a copy of ``ENGINE_MAP`` extended
  with the optional ``ase`` / ``ams`` / ``turtlemd`` engines when they
  can be imported; ``cp2k`` / ``gromacs`` / ``lammps`` resolve to the
  same entries as in ``ENGINE_MAP``). ``ENGINE_MAP`` is never mutated.
"""

from __future__ import annotations

import logging
import os
from typing import Any, Dict, List, Optional, Tuple

from pyretis.core.common import generic_factory, import_from, initiate_instance
from pyretis.engines import ENGINE_MAP
from pyretis.setup.common import create_external

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

__all__ = [
    "assign_engines",
    "build_engine_map",
    "check_engine",
    "create_inf_engine",
    "create_engines",
    "import_external",
]


def import_external(
    settings: Dict[str, Any],
    key: str,
    required_methods: Optional[List[str]] = None,
) -> Any:
    """Load the class named in ``settings`` and return an instance of it.

    Equivalent to ``infretis.core.core.create_external``. The path in
    ``settings["module"]`` is first tried as given (so a relative path
    resolves against the current directory). When no such file exists
    and ``settings["simulation"]`` holds an ``"exe_path"`` entry, the
    module path is joined onto that directory instead.

    This complements :py:func:`pyretis.setup.common.create_external`,
    which requires a ``factory`` (engine map) argument and is geared
    toward the pyretis-native engine creation flow.

    Parameters
    ----------
    settings : dict
        Settings describing the object to load; the ``"class"`` and
        ``"module"`` entries are read (each defaults to ``""``).
    key : str
        Label used in error messages.
    required_methods : list of str, optional
        Attribute names that must exist on the imported class and be
        callable.

    Returns
    -------
    object
        Whatever ``initiate_instance`` returns for the imported class
        and ``settings``.

    Raises
    ------
    ValueError
        If the module file is not found and no ``exe_path`` fallback is
        configured, or if a required method is missing or not callable.
    """
    class_name = settings.get("class", "")
    module_path = settings.get("module", "")

    if not os.path.isfile(module_path):
        # Not a file as given: the only fallback is the exe_path directory.
        if "exe_path" not in settings.get("simulation", []):
            raise ValueError(
                f'Could not find module "{module_path}" for {key}!'
            )
        module_path = os.path.join(
            settings["simulation"]["exe_path"], module_path
        )
    loaded = import_from(module_path, class_name)

    for method_name in required_methods or []:
        # A missing attribute yields None, which is not callable either.
        if not callable(getattr(loaded, method_name, None)):
            msg = f"Method {class_name}.{method_name} missing or not callable"
            logger.critical(msg)
            raise ValueError(msg)

    return initiate_instance(loaded, settings)
def build_engine_map() -> Dict[str, Dict[str, Any]]:
    """Return a copy of ``ENGINE_MAP`` plus the optional infRETIS engines.

    The copy is shallow and ``ENGINE_MAP`` itself is left untouched. Each
    optional engine (``ase``, ``ams``, ``turtlemd``) is registered through
    :py:func:`_try_register`, so an engine whose optional dependency is
    missing (e.g. ``scm.plams`` for AMS, ``turtlemd``) is simply absent
    from the result.

    Returns
    -------
    dict
        Mapping from engine key to its factory entry.
    """
    # Inf-engine merge (4b-2b) is complete: ``cp2k`` / ``gromacs`` /
    # ``lammps`` resolve to their canonical classes in the untouched
    # ENGINE_MAP and no ``_inf`` shadow entry is added here, so the
    # scheduler and the pyretis-native flow share one class per backend:
    #   * ``gromacs`` -> gromacs.GromacsEngine (native streaming engine
    #     with the merged velocity_generation/ref-t handling);
    #     relaunch flavour: ``gromacs_steps``.
    #   * ``lammps`` -> lammps.LAMMPSEngine (continuous, adopted from the
    #     inf engine); relaunch / old-pyretis flavour: ``lammps_steps``.
    #   * ``cp2k`` -> cp2k.CP2KEngine (continuous, adopted from the inf
    #     engine: persistent process, on-the-fly reading, wfn warm-start);
    #     relaunch / restart-chain old-pyretis flavour: ``cp2k_steps``.
    # Only the optional-dependency engines below are layered on top.
    optional_engines = (
        ("ase", "pyretis.engines.ase", "ASEEngine"),
        ("ams", "pyretis.engines.ams", "AMSEngine"),
        ("turtlemd", "pyretis.engines.turtlemd", "TurtleMDEngine"),
    )

    engine_map: Dict[str, Dict[str, Any]] = dict(ENGINE_MAP)
    for key, module_path, class_name in optional_engines:
        _try_register(engine_map, key, module_path, class_name)
    return engine_map
def _try_register(
    target: Dict[str, Dict[str, Any]],
    key: str,
    module_path: str,
    class_name: str,
) -> None:
    """Register ``class_name`` from ``module_path`` under ``key`` if possible.

    On success ``target[key]`` is set to ``{"cls": <class>}``. Registration
    is best-effort: when the module cannot be imported (typically a
    missing optional dependency of the engine) or the module does not
    expose ``class_name``, ``target`` is left unchanged and a debug
    message is logged instead of raising.

    Parameters
    ----------
    target : dict
        Engine map that is updated in place.
    key : str
        Name under which the engine is registered.
    module_path : str
        Dotted path of the module to import.
    class_name : str
        Attribute to fetch from the imported module.
    """
    try:
        module = __import__(module_path, fromlist=[class_name])
        cls = getattr(module, class_name)
    except ImportError:
        logger.debug("Engine %s unavailable (module %s)", key, module_path)
    except AttributeError:
        # The module imported but lacks the class; treat it like any other
        # unavailable optional engine rather than aborting the whole map.
        logger.debug(
            "Engine %s unavailable (no %s in module %s)",
            key,
            class_name,
            module_path,
        )
    else:
        target[key] = {"cls": cls}
def create_inf_engine(
    settings: Dict[str, Any], eng_key: str = "engine"
) -> Optional[Any]:
    """Create the engine described by ``settings[eng_key]``.

    Counterpart to :py:func:`pyretis.setup.common.create_engine` for the
    infRETIS scheduler: the explicit ``eng_key`` lets several named
    engine sections (``engine``, ``engine2``, ...) coexist in one
    config. Engine classes are looked up in the map returned by
    :py:func:`build_engine_map`, i.e. ``ENGINE_MAP`` plus whichever
    optional engines could be imported. The pyretis-native
    :py:func:`pyretis.setup.common.create_engine` stays the entry point
    for pyretis-old runs.

    Parameters
    ----------
    settings : dict
        Full simulation settings. The engine settings live under
        ``settings[eng_key]``.
    eng_key : str
        Which engine section to instantiate (``"engine"``,
        ``"engine2"``, ...).

    Returns
    -------
    Optional[EngineBase]
        ``None`` (after a critical log message) when the section is
        missing. Otherwise the engine built by ``generic_factory`` when
        the lower-cased ``"class"`` entry is a key of the engine map, or
        the result of :py:func:`pyretis.setup.common.create_external`
        when it is not.
    """
    if eng_key not in settings:
        logger.critical("Missing engine section [%s] in settings", eng_key)
        return None

    known_engines = build_engine_map()
    section = settings[eng_key]
    requested = str(section.get("class", "")).lower()

    if requested in known_engines:
        return generic_factory(section, known_engines, name=eng_key)
    # Unknown class name: treat the section as a user-supplied engine.
    return create_external(section, eng_key, known_engines, [])
def check_engine(settings: Dict[str, Any], eng_key: str = "engine") -> bool:
    """Run a light-touch sanity check on one engine settings section.

    Mirrors infretis ``factory.check_engine``. The check fails when the
    section ``settings[eng_key]`` is absent, when it has a ``"gmx"`` entry
    without ``"gmx_format"``, or when it has a ``"cp2k"`` entry without
    ``"cp2k_format"``. Every failure is reported through a critical log
    message.

    Parameters
    ----------
    settings : dict
        Full simulation settings.
    eng_key : str
        Name of the engine section to inspect.

    Returns
    -------
    bool
        ``True`` when no problem was found, ``False`` otherwise.
    """
    if eng_key not in settings:
        # Without the section there is nothing further to inspect.
        logger.critical(f"The section [{eng_key}] is missing")
        return False

    section = settings[eng_key]
    # (program key, format key) pairs: a program entry needs its format.
    format_requirements = (("gmx", "gmx_format"), ("cp2k", "cp2k_format"))
    format_missing = any(
        program in section and fmt not in section
        for program, fmt in format_requirements
    )
    if format_missing:
        logger.critical("File format is not specified for the engine")
        return False
    return True
def create_engines(
    config: Dict[str, Any],
) -> Tuple[Dict[Any, Any], Dict[Any, list]]:
    """Build the engine pool for an infRETIS-style parallel run.

    Every engine name referenced in
    ``config["simulation"]["ensemble_engines"]`` (an iterable of
    iterables of names) is counted, and
    ``min(occurrences, config["runner"]["workers"])`` instances are
    created for it with :py:func:`create_inf_engine`.

    Each engine section is validated once with :py:func:`check_engine`
    before its instances are created. The validation is advisory: it
    only logs, and creation proceeds regardless of its result.

    Parameters
    ----------
    config : dict
        Full simulation settings.

    Returns
    -------
    engines : dict
        ``engines[name]`` is the list of instantiated engines.
    engine_occ : dict
        ``engine_occ[name][i]`` is the worker pin currently using
        ``engines[name][i]``; every slot starts at ``-1`` (free).
    """
    usage: Dict[str, int] = {}
    for ensemble_engines in config["simulation"]["ensemble_engines"]:
        for name in ensemble_engines:
            usage[name] = usage.get(name, 0) + 1

    engines: Dict[str, list] = {}
    engine_occ: Dict[str, list] = {}
    for name, n_used in usage.items():
        n_create = min(n_used, config["runner"]["workers"])
        if n_create > 0:
            # The section is the same for every instance, so validate it
            # once instead of repeating the diagnostics per instance.
            check_engine(config, eng_key=name)
        engines[name] = [
            create_inf_engine(config, eng_key=name) for _ in range(n_create)
        ]
        engine_occ[name] = [-1] * n_create
    return engines, engine_occ
def assign_engines(
    engine_occ: Dict[str, list], eng_names, pin
) -> Dict[Any, int]:
    """Assign free engine instances to worker ``pin``.

    Every slot currently held by ``pin`` is released first. Then, for
    each name in ``eng_names``, the first free slot (value ``-1``) of
    ``engine_occ[name]`` is claimed for ``pin``. ``engine_occ`` is
    modified in place.

    Parameters
    ----------
    engine_occ : dict
        Occupancy table; ``engine_occ[name][i]`` is the pin using
        instance ``i`` of engine ``name``, or ``-1`` when it is free.
    eng_names : iterable
        Engine names the worker needs.
    pin : object
        Identifier of the worker, compared with ``==`` against the
        occupancy entries.

    Returns
    -------
    dict
        ``{name: index}`` mapping each requested engine name to the
        index inside ``engine_occ[name]`` now owned by ``pin``.

    Raises
    ------
    ValueError
        If no free instance exists for one of the requested names (this
        should never happen with a correctly sized pool), or if nothing
        was assigned at all (e.g. ``eng_names`` is empty).
    """
    # Release engines previously held by this worker.
    for slots in engine_occ.values():
        for idx, owner in enumerate(slots):
            if owner == pin:
                slots[idx] = -1

    out: Dict[Any, int] = {}
    for eng_key in eng_names:
        slots = engine_occ[eng_key]
        for idx, owner in enumerate(slots):
            if owner == -1:
                slots[idx] = pin
                out[eng_key] = idx
                break
        else:
            # Loop finished without claiming a slot: the pool for this
            # engine is exhausted (or empty).
            raise ValueError(
                f"All engines '{eng_key}' are occupied -- this "
                "indicates an undersized pool."
            )

    if not out:
        # Kept for backward compatibility: an empty request is an error.
        raise ValueError(
            "Did not find a free engine for any requested name."
        )
    return out