# Source code for pyretis.bin.pyretisrun

#!/usr/bin/env python3
# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""pyretisrun - An application for running PyRETIS simulations.

This script is a part of the PyRETIS library and can be used for
running simulations from an input script.

usage: pyretisrun.py [-h] -i INPUT [-V] [-f LOG_FILE] [-l LOG_LEVEL] [-p]

PyRETIS

optional arguments:
  -h, --help            show this help message and exit
  -i INPUT, --input INPUT
                        Location of PyRETIS input file
  -V, --version         show program's version number and exit
  -f LOG_FILE, --log_file LOG_FILE
                        Specify log file to write
  -l LOG_LEVEL, --log_level LOG_LEVEL
                        Specify log level for log file
  -p, --progress        Display a progress meter instead of text output for
                        the simulation

More information about running PyRETIS can be found at: www.pyretis.org
"""
# pylint: disable=invalid-name
import argparse
import datetime
import logging
import os
import pathlib
import signal
import sys
import traceback
import warnings

# Surface PyRETIS-owned DeprecationWarnings (e.g. the rst input
# deprecation) to CLI users. Python hides DeprecationWarning by
# default unless it comes from __main__; this filter scopes the
# override to pyretis modules only, so warnings from other libraries
# stay at the user's default behaviour.
warnings.filterwarnings('default', category=DeprecationWarning,
                        module=r'pyretis(\..*)?')
# Other libraries:
import tqdm  # noqa: E402 For a progress bar
import colorama  # noqa: E402 For coloring text
# PyRETIS library imports:
from pyretis import __version__ as VERSION  # noqa: E402
from pyretis.info import PROGRAM_NAME, URL, CITE, LOGO  # noqa: E402
from pyretis.core.pathensemble import generate_ensemble_name  # noqa: E402
from pyretis.setup import create_simulation  # noqa: E402
from pyretis.inout.screen import REFERENCE  # noqa: E402
from pyretis.inout.common import (  # noqa: E402
    check_python_version,
    create_backup,
)
from pyretis.inout.formats.formatter import (  # noqa: E402
    get_log_formatter,
    setup_console_logging,
)
from pyretis.inout.settings import (  # noqa: E402
    parse_settings_file,
    write_settings_file
)


# Timestamp layout (day.month.year hour:minute:second) passed to
# ``strftime`` when the start and the end of an execution are logged.
_DATE_FMT = '%d.%m.%Y %H:%M:%S'
# Module-wide logger returned by the console logging set-up. The
# functions in this module call its ``banner`` / ``progress`` /
# ``success`` methods in addition to the standard logging methods.
logger = setup_console_logging()


def use_tqdm(progress):
    """Select the wrapper used to iterate over the simulation steps.

    Parameters
    ----------
    progress : boolean
        If True, :py:class:`tqdm.tqdm` is selected so that the wrapped
        loop is displayed with a progress bar.

    Returns
    -------
    out : callable
        Either :py:class:`tqdm.tqdm` or a stand-in which accepts the
        same call signature and hands back the iterable untouched.

    """
    def passthrough(*args, **kwargs):
        """Mimic the tqdm call without drawing anything."""
        # tqdm receives the iterable either as the first positional
        # argument or through the ``iterable`` keyword.
        return args[0] if args else kwargs.get('iterable', None)

    return tqdm.tqdm if progress else passthrough
def hello_world(infile, rundir, logfile):
    """Log the start-up banner and the locations used by this run.

    Parameters
    ----------
    infile : string
        The location of the input file.
    rundir : string
        The directory we are running in.
    logfile : string
        The output log file.

    """
    started = datetime.datetime.now().strftime(_DATE_FMT)
    python_version = sys.version.split()[0]
    # Banner part: logo, program version, start time and Python version.
    banner_calls = (
        ('\n'.join([LOGO]),),
        ('%s version: %s', PROGRAM_NAME, VERSION),
        ('Start of execution: %s', started),
        ('Python version: %s', python_version),
    )
    for call_args in banner_calls:
        logger.banner(*call_args)
    # Progress part: where we run and which files are used.
    progress_calls = (
        ('\nRunning in directory: %s', rundir),
        ('Input file: %s', infile),
        ('Log file: %s', logfile),
    )
    for call_args in progress_calls:
        logger.progress(*call_args)
def bye_bye_world():
    """Log the end-of-execution time, the references and the URL."""
    finished = datetime.datetime.now().strftime(_DATE_FMT)
    logger.progress(f'End of {PROGRAM_NAME} execution: {finished}')
    # Build the reference list: a header, an underline of the same
    # width and every non-empty line of the citation text.
    header = f'{PROGRAM_NAME} references:'
    cite_lines = [line for line in CITE.split('\n') if line]
    reference_text = '\n'.join([header, '-' * len(header), *cite_lines])
    logger.log(REFERENCE, '\n%s', reference_text)
    logger.log(REFERENCE, str(URL))
def run_md_flux_simulation(sim, sim_settings, progress=False):
    """Execute a MD-FLUX simulation until its generator is exhausted.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings.
    progress : boolean, optional
        If True, a progress bar is displayed, otherwise the results
        are printed to the screen.

    Returns
    -------
    out : boolean
        Always True.

    """
    logger.progress('Starting MD-Flux simulation')
    wrap = use_tqdm(progress)
    sim.engine.exe_dir = sim_settings['simulation']['exe_path']
    sim.set_up_output(sim_settings, progress=progress)
    steps = wrap(
        sim.run(),
        initial=sim.cycle['startcycle'],
        total=sim.cycle['endcycle'],
        desc='MD-flux',
    )
    # The work happens while the generator is consumed.
    for _ in steps:
        pass
    # Write the final restart file:
    sim.write_restart(now=True)
    return True
def run_md_simulation(sim, sim_settings, progress=False):
    """Execute a MD simulation until its generator is exhausted.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings.
    progress : boolean, optional
        If True, a progress bar is displayed, otherwise the results
        are printed to the screen.

    Returns
    -------
    out : boolean
        Always True.

    """
    logger.progress('Starting MD simulation')
    wrap = use_tqdm(progress)
    sim.engine.exe_dir = sim_settings['simulation']['exe_path']
    sim.set_up_output(sim_settings, progress=progress)
    steps = wrap(
        sim.run(),
        initial=sim.cycle['startcycle'],
        total=sim.cycle['endcycle'],
        desc='MD step',
    )
    # The work happens while the generator is consumed.
    for _ in steps:
        pass
    # Write the final restart file:
    sim.write_restart(now=True)
    return True
def explore_simulation(sim, sim_settings, progress=False):
    """Run an exploration simulation with PyRETIS.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings. The ``tis`` section of every ensemble
        is modified in place (see below).
    progress : boolean, optional
        If True, a progress bar is displayed, otherwise the results
        are printed to the screen.

    Returns
    -------
    out : boolean
        False if the initiation was stopped, True otherwise.

    """
    sim.set_up_output(sim_settings, progress=progress)
    logger.progress('Load frames for free energy landscape exploration')
    # These two settings are enforced for every ensemble, whatever the
    # user asked for in the input.
    for ensemble in sim_settings.get('ensemble', []):
        ensemble['tis']['freq'] = 0
        ensemble['tis']['allowmaxlength'] = True
    # Here we do the initialisation:
    initiated = sim.initiate(sim_settings)
    if not initiated:
        logger.progress('Initiation stopped, will exit now.')
        return False
    sim.write_restart(now=True)
    logger.progress('Initiation done. Exploring now.')
    wrap = use_tqdm(progress)
    label = f'{sim_settings["simulation"]["task"]} Simulation'
    steps = wrap(
        sim.run(),
        initial=sim.cycle['startcycle'],
        total=sim.cycle['endcycle'],
        desc=label,
    )
    for _ in steps:
        pass
    # Write the final restart files:
    sim.write_restart(now=True)
    return True
def run_path_simulation(sim, sim_settings, progress=False):
    """Initialise and run a path sampling simulation with PyRETIS.

    This is the runner registered for the ``tis``, ``retis``,
    ``pptis`` and ``repptis`` tasks.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings.
    progress : boolean, optional
        If True, a progress bar is displayed, otherwise the results
        are printed to the screen.

    Returns
    -------
    out : boolean
        False if the initiation was stopped, True otherwise.

    """
    sim.set_up_output(sim_settings, progress=progress)
    task = sim_settings['simulation']['task']

    def announce(stage, padding):
        """Log a banner framed by rules sized after the task name."""
        rule = '=' * (len(task) + padding)
        logger.banner(f'\n{rule}\n {stage} {task} simulation.\n{rule}')

    announce('Initialising', 26)
    logger.progress('\nInitialising path ensembles:')
    # Here we do the initialisation:
    initiated = sim.initiate(sim_settings)
    if not initiated:
        logger.progress('Initiation stopped, will exit now.')
        return False
    sim.write_restart(now=True)
    logger.progress('\nInitiation done.')
    announce('Starting', 22)

    wrap = use_tqdm(progress)
    label = f"{sim_settings['simulation']['task']} Simulation. "
    steps = wrap(
        sim.run(),
        initial=sim.cycle['startcycle'],
        total=sim.cycle['endcycle'],
        desc=label,
    )
    for _ in steps:
        pass
    # Write the final restart files:
    sim.write_restart(now=True)
    return True
def make_tis_files(_, settings, progress=False):
    """Create input files for single TIS simulations.

    One ``tis-<ensemble>.rst`` input file is written per entry in
    ``settings['ensemble']``, and the command for running it is
    logged. No simulation is executed.

    Parameters
    ----------
    _ : object
        Not used. Present so that the function has the same call
        signature as the other runners.
    settings : dict
        The simulation settings. ``settings['ensemble']`` is iterated
        to obtain the settings of each path ensemble and
        ``settings['simulation']['zero_ensemble']`` selects the
        numbering of the first ensemble. The ``simulation`` section of
        every ensemble is updated in place (``zero_ensemble = False``
        and ``task = 'tis'``).
    progress : boolean, optional
        Not used.

    Returns
    -------
    out : boolean
        Always True.

    """
    _ = progress
    logger.progress(
        'Input settings requests: TIS for multiple path ensembles.')
    logger.progress(
        'Will create input files for the TIS simulations and exit')
    i_ens = 0
    for i, ens_settings in enumerate(settings['ensemble']):
        i_ens += 1
        # Without a zero ensemble, the numbering skips one index.
        if i == 0 and not settings['simulation']['zero_ensemble']:
            i_ens += 1
        ens_settings['simulation']['zero_ensemble'] = False
        ens_settings['simulation']['task'] = 'tis'
        ensf = generate_ensemble_name(i_ens)
        logger.progress(f'Creating input for TIS ensemble: {i_ens} ')
        infile = f'tis-{ensf}.rst'
        logger.progress(f'Create file: "{infile}"')
        exe_dir_file = os.path.join(ens_settings['engine']['exe_path'],
                                    infile)
        # Ensure generated TIS input files include a human-friendly
        # orderparameter name without mutating the caller's settings:
        # remember exactly what was present so that it can be put back,
        # including the case where the whole section was missing.
        had_section = 'orderparameter' in ens_settings
        op_section = ens_settings.setdefault('orderparameter', {})
        had_name_key = 'name' in op_section
        original_name = op_section.get('name')
        need_restore = original_name is None
        if need_restore:
            op_section['name'] = 'Order Parameter'
        try:
            write_settings_file(ens_settings, exe_dir_file, backup=False)
        finally:
            if need_restore:
                if not had_section:
                    # The section itself was only added temporarily.
                    ens_settings.pop('orderparameter', None)
                elif had_name_key:
                    ens_settings['orderparameter']['name'] = original_name
                else:
                    # Remove the temporary key we added.
                    ens_settings['orderparameter'].pop('name', None)
        logger.progress('Command for executing:')
        logger.progress(f'pyretisrun -i {infile} -p -f {ensf}.log')
    return True
def run_generic_simulation(sim, sim_settings, progress=False):
    """Run a generic PyRETIS simulation.

    This is the fall-back runner for tasks without a dedicated runner:
    the simulation is just stepped until its generator is exhausted.
    Simulations made up of several simulations tied together are NOT
    handled here.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings.
    progress : boolean, optional
        If True, a progress bar is displayed, otherwise the results
        are printed to the screen.

    Returns
    -------
    out : boolean
        Always True.

    """
    logger.progress('Running simulation')
    wrap = use_tqdm(progress)
    sim.set_up_output(sim_settings, progress=progress)
    steps = wrap(sim.run(), desc='Step')
    for _ in steps:
        pass
    # Write the final restart file:
    sim.write_restart(now=True)
    return True
# Maps the (lower-cased) ``[simulation] task`` to the function that
# executes it.
_RUNNERS = {
    'md-flux': run_md_flux_simulation,
    'md-nve': run_md_simulation,
    'explore': explore_simulation,
    'md': run_md_simulation,
    'make-tis-files': make_tis_files,
    'tis': run_path_simulation,
    'retis': run_path_simulation,
    'pptis': run_path_simulation,
    'repptis': run_path_simulation,
}

# The input-TOML task values that select the infinite-swapping
# (replica-exchange) sampler instead of the in-process simulation flow.
_INFINITE_SWAPPING_TASKS = frozenset(
    ('infinite_swapping', 'infswap', 'infretis')
)

# Native path-sampling tasks that, as of the Stage C collapse, are run
# through the infinite-swapping scheduler at n_workers=1 by default
# (their native config is translated by native_to_infswap_config). Only
# ``retis`` is routed: the infinite-swapping scheduler is a RETIS-family
# replica-exchange sampler (it always builds the [0^-]/[0^+]/[i^+]
# structure), so it subsumes RETIS one-to-one. Plain ``tis`` does NOT
# route -- a TIS config can be a single [i^+] ensemble with no [0^-] and
# nothing to exchange, so it has no faithful scheduler analog; it keeps
# the in-process loop, together with explore / pptis / repptis. Set
# PYRETIS_NATIVE_LOOP=1 to force the legacy in-process loop for retis too
# (debugging / native-vs-scheduler reference comparison).
_NATIVE_SCHEDULER_TASKS = frozenset(('retis',))
_NATIVE_LOOP_ENV = 'PYRETIS_NATIVE_LOOP'

# Engine classes the scheduler route is VALIDATED against: the pyretis
# in-process MD integrators (pyretis.engines.internal -- engine_type
# 'internal'). The scheduler can in principle drive the external engines
# (GROMACS / LAMMPS / CP2K / OpenMM) too -- that is what the inf flow was
# built for -- but the scheduler+external integration is NOT yet verified
# for the native route: routing external-engine task=retis crashed the
# GROMACS/LAMMPS/OpenMM suites in test-heavy. Until that path is validated
# (and its heavy, supervised goldens regenerated), external-engine retis
# keeps the native in-process loop. Names are matched case-insensitively
# with separators stripped (so "velocity-verlet" -> "velocityverlet").
_SCHEDULER_INTERNAL_ENGINES = frozenset((
    'langevin',
    'verlet',
    'velocityverlet',
    'randomwalk',
))
def is_infinite_swapping_config(inputfile):
    """Tell whether the input selects the infinite-swapping sampler.

    The infinite-swapping (replica-exchange) sampler is selected
    either explicitly, by a ``[simulation] task`` listed in
    :data:`_INFINITE_SWAPPING_TASKS`, or implicitly by the presence of
    a top-level ``[runner]`` section. Only ``.toml`` inputs are
    inspected; everything else is left to the in-process simulation
    flow.

    Parameters
    ----------
    inputfile : string
        Path to the input file.

    Returns
    -------
    boolean
        True if the infinite-swapping scheduler should run this input.

    """
    is_toml = os.path.splitext(inputfile)[1].lower() == '.toml'
    # main() calls this before set_up_simulation. A missing .toml can
    # not be inspected, and opening it would raise a bare
    # FileNotFoundError, so we answer False: main() then routes to
    # set_up_simulation, whose descriptive "Input file NOT found!"
    # ValueError is raised inside the error-logging try/except -- the
    # same path a missing .rst takes.
    if not (is_toml and os.path.isfile(inputfile)):
        return False
    try:
        import tomllib as toml_reader
    except ImportError:
        import tomli as toml_reader
    with open(inputfile, 'rb') as handle:
        parsed = toml_reader.load(handle)
    selected = str(parsed.get('simulation', {}).get('task', '')).lower()
    return selected in _INFINITE_SWAPPING_TASKS or 'runner' in parsed
def is_native_retis_config(inputfile, tasks=('retis',)):
    """Tell whether the input is a native TOML for one of ``tasks``.

    Used by the opt-in native-via-coordinator route. Inputs which
    already select the infinite-swapping sampler (see
    :func:`is_infinite_swapping_config`) are rejected, and only
    ``.toml`` inputs are considered. ``tasks`` lets the caller widen
    the accepted native task set; the default keeps it retis-only.

    Parameters
    ----------
    inputfile : string
        Path to the input file.
    tasks : tuple of str
        The native task names that may take the opt-in scheduler
        route.

    Returns
    -------
    boolean
        True for an existing ``.toml`` whose lower-cased
        ``[simulation] task`` is in ``tasks`` and which does not
        already select the infinite-swapping sampler.

    """
    extension = os.path.splitext(inputfile)[1]
    if extension.lower() != '.toml' or not os.path.isfile(inputfile):
        return False
    if is_infinite_swapping_config(inputfile):
        return False
    try:
        import tomllib as toml_reader
    except ImportError:
        import tomli as toml_reader
    with open(inputfile, 'rb') as handle:
        parsed = toml_reader.load(handle)
    native_task = str(parsed.get('simulation', {}).get('task', ''))
    return native_task.lower() in tasks
def scheduler_supported_features(config):
    """Check if the scheduler route covers a native retis config.

    The checks are done in the order given here and the first failing
    one is reported:

    1. ``[engine] class`` must, after lower-casing and removal of
       ``-``, ``_`` and spaces, be one of
       :data:`_SCHEDULER_INTERNAL_ENGINES`.
    2. Every entry of ``[tis] shooting_moves`` (when given) must be
       one of ``sh``, ``wt``, ``wf`` or ``ss``.
    3. If ``[tis] mirror_freq`` or ``[tis] target_freq`` is set, the
       resolved number of workers must not exceed 1. The number is
       read from the ``PYRETIS_NATIVE_WORKERS`` environment variable,
       falling back to ``[runner] workers`` and then to 1.
    4. ``[initial-path] method`` must be ``kick``, ``restart`` or
       empty (a missing method counts as ``kick``).
    5. ``[simulation] restart`` must not be set.

    Parameters
    ----------
    config : dict
        The parsed native TOML configuration.

    Returns
    -------
    (boolean, string)
        ``(True, '')`` when all checks pass, otherwise
        ``(False, reason)`` naming the first unsupported feature.

    """
    engine_class = str(config.get('engine', {}).get('class', ''))
    engine_key = engine_class.lower()
    for separator in ('-', '_', ' '):
        engine_key = engine_key.replace(separator, '')
    if engine_key not in _SCHEDULER_INTERNAL_ENGINES:
        return False, f"non-internal engine '{engine_class}'"

    tis_section = config.get('tis', {})
    requested_moves = tis_section.get('shooting_moves')
    if requested_moves is not None:
        # sh / wt / wf / ss route through the scheduler; any other
        # shooting move is still a scheduler port gap.
        rejected = sorted(
            {str(move).lower() for move in requested_moves}
            - {'sh', 'wt', 'wf', 'ss'}
        )
        if rejected:
            return False, ('unsupported shooting move(s): '
                           + ', '.join(rejected))

    if tis_section.get('mirror_freq') or tis_section.get('target_freq'):
        # Mirror and target swap are ported (scheduler ``mr``/``ts``
        # moves) but persist a global order-function mutation on
        # accept, so they are only consistent at a single worker.
        # The worker count is resolved as: env override, then a
        # [runner] section, then the default of 1.
        fallback = config.get('runner', {}).get('workers', 1)
        n_workers = int(os.environ.get('PYRETIS_NATIVE_WORKERS', fallback))
        if n_workers > 1:
            return False, ('mirror/target swap moves require n_workers=1 '
                           f'(got {n_workers})')

    # ``restart`` resumes the scheduler from its own restart.toml;
    # any other method (e.g. ``load``) is still a port gap.
    init_method = str(
        config.get('initial-path', {}).get('method', 'kick')
    ).lower()
    if init_method not in ('kick', '', 'restart'):
        return False, 'initial-path method = ' + init_method

    if config.get('simulation', {}).get('restart'):
        return False, 'simulation restart continuation'
    return True, ''
def is_native_scheduler_config(inputfile):
    """Tell whether a native TOML input routes to the scheduler.

    The answer is True only when all of the following hold: the input
    is an existing ``.toml`` file, it does not already select the
    infinite-swapping sampler (:func:`is_infinite_swapping_config`),
    its lower-cased ``[simulation] task`` is listed in
    :data:`_NATIVE_SCHEDULER_TASKS`, and
    :func:`scheduler_supported_features` accepts the configuration.
    When the last check fails, the reason is logged at info level.

    Parameters
    ----------
    inputfile : string
        Path to the input file.

    Returns
    -------
    boolean
        True for a native retis ``.toml`` that should run through the
        scheduler.

    """
    extension = os.path.splitext(inputfile)[1]
    if extension.lower() != '.toml' or not os.path.isfile(inputfile):
        return False
    if is_infinite_swapping_config(inputfile):
        return False
    try:
        import tomllib as toml_reader
    except ImportError:
        import tomli as toml_reader
    with open(inputfile, 'rb') as handle:
        parsed = toml_reader.load(handle)
    native_task = str(parsed.get('simulation', {}).get('task', '')).lower()
    if native_task not in _NATIVE_SCHEDULER_TASKS:
        return False
    supported, reason = scheduler_supported_features(parsed)
    if supported:
        return True
    logger.info('Native retis config uses a scheduler port gap (%s); '
                'keeping the native in-process loop.', reason)
    return False
def force_native_loop():
    """Tell whether the legacy in-process loop is forced.

    The environment variable named by :data:`_NATIVE_LOOP_ENV` is
    read. Any value other than the empty string and ``"0"`` counts as
    set; an unset variable counts as not set.

    Returns
    -------
    boolean
        True when the legacy in-process loop should be forced.

    """
    flag = os.environ.get(_NATIVE_LOOP_ENV, '')
    disabled = flag in ('', '0')
    return not disabled
def run_infinite_swapping(inputfile):
    """Run an infinite-swapping (replica-exchange) input.

    The configuration is built with
    ``pyretis.simulation.setup.setup_config`` and handed over to
    ``pyretis.simulation.scheduler.scheduler``. Both are imported when
    this function is called.

    Parameters
    ----------
    inputfile : string
        Path to the infinite-swapping input TOML.

    """
    from pyretis.simulation.scheduler import scheduler
    from pyretis.simulation.setup import setup_config

    config = setup_config(inputfile)
    if config is not None:
        scheduler(config)
        return
    # setup_config returns None when there is nothing to run (e.g. a
    # restart whose cstep has already reached the target, or a missing
    # active trajectory). Exit cleanly rather than crashing scheduler().
    logger.progress('Nothing to run (restart already complete or no '
                    'active path); exiting.')
# Name of the opt-in environment flag: when it is set to a truthy
# value, main() also routes a native ``task = "retis"`` or
# ``task = "repptis"`` TOML config through the infinite-swapping
# scheduler (the config is translated to the coordinator's config)
# instead of the native in-process loop.
_NATIVE_VIA_INFSWAP_ENV = 'PYRETIS_NATIVE_VIA_INFSWAP'
def native_via_infswap_requested():
    """Tell whether the opt-in native-via-coordinator flag is set.

    The environment variable named by :data:`_NATIVE_VIA_INFSWAP_ENV`
    is read. Any value other than the empty string and ``"0"`` counts
    as set, so ``PYRETIS_NATIVE_VIA_INFSWAP=1`` enables the route.

    Returns
    -------
    boolean
        True when the opt-in route should be taken.

    """
    flag = os.environ.get(_NATIVE_VIA_INFSWAP_ENV, '')
    disabled = flag in ('', '0')
    return not disabled
def _apply_restart_steps(restart_file, steps):
    """Set the step target in a scheduler ``restart.toml``.

    On a continuation the native config carries the new (usually
    higher) total step target. The scheduler stops once ``cstep``
    reaches ``[simulation] steps``, so a resumed restart must adopt
    the new target or it would have nothing left to run. The restart
    TOML is read, ``[simulation] steps`` is updated and the file is
    written back; every other key is left untouched.

    The new content is first written to a scratch file next to the
    restart file and then moved into place with :func:`os.replace`,
    so an interruption while writing can not leave a truncated
    ``restart.toml`` behind.

    Parameters
    ----------
    restart_file : string
        Path to the scheduler ``restart.toml``.
    steps : int
        The new total step target to write into
        ``[simulation] steps``.

    """
    import tomli_w
    # Same reader selection as the other TOML readers in this module:
    # the standard library parser when available, tomli otherwise.
    try:
        import tomllib as toml_reader
    except ImportError:
        import tomli as toml_reader

    with open(restart_file, 'rb') as handle:
        restart = toml_reader.load(handle)
    restart.setdefault('simulation', {})['steps'] = int(steps)

    scratch_file = f'{restart_file}.tmp'
    try:
        with open(scratch_file, 'wb') as handle:
            tomli_w.dump(restart, handle)
        os.replace(scratch_file, restart_file)
    finally:
        # After a successful replace the scratch file is gone; it is
        # only left behind when writing or replacing failed.
        if os.path.exists(scratch_file):
            os.remove(scratch_file)
def run_native_via_infswap(inputfile, runpath):
    """Run a native config through the infinite-swapping coordinator.

    The native configuration is translated to the coordinator's
    config dictionary with
    :func:`pyretis.inout.native_compat.native_to_infswap_config`.
    Then one of two things happens:

    * ``[initial-path] method = "restart"`` and ``restart.toml``
      exists in ``runpath``: the step target of that file is updated
      (:func:`_apply_restart_steps`) and the scheduler resumes from
      it.
    * otherwise: a stale ``restart.toml`` is removed, the load
      directory is generated by
      :func:`pyretis.inout.native_compat.generate_load_dir_via_kick`
      when the method is ``kick`` (also the fall-back for a
      ``restart`` without restart file), the translated config is
      written to ``infswap.toml`` in ``runpath`` and passed to
      :func:`run_infinite_swapping`.

    Parameters
    ----------
    inputfile : string
        Path to the native input TOML.
    runpath : string
        The directory the simulation runs from (where the translated
        ``infswap.toml`` is written and ``restart.toml`` is looked
        for).

    """
    import tomli_w
    from pyretis.inout.settings import parse_settings_file
    from pyretis.inout.native_compat import (
        native_to_infswap_config,
        generate_load_dir_via_kick,
    )

    native_config = parse_settings_file(inputfile)
    config = native_to_infswap_config(native_config)
    initial_path = native_config.get('initial-path', {})
    method = str(initial_path.get('method', 'kick')).lower()

    # --- Continuation / fresh-start handling --------------------------
    # The scheduler persists its full state (cstep, frac, per-worker RNG)
    # in ``./restart.toml``. A native config requesting a restart
    # continuation therefore RESUMES from that file rather than
    # re-translating a fresh infswap.toml -- which the inf setup would
    # reject, since it forbids a (differently named) run file alongside a
    # restart.toml. setup_config reads ``restart.toml`` as both the input
    # and the restart file, so ``inp == re_inp`` and the guard passes;
    # exe_dir == cwd (see main), so the bare name resolves in the run dir.
    restart_file = os.path.join(runpath, 'restart.toml')
    if method == 'restart':
        if os.path.isfile(restart_file):
            # Carry the native step target into the restart so a
            # continuation to a higher cycle count actually advances
            # (the scheduler stops once cstep reaches steps).
            _apply_restart_steps(restart_file,
                                 config['simulation']['steps'])
            logger.progress('Resuming the scheduler from %s.',
                            restart_file)
            run_infinite_swapping('restart.toml')
            return
        logger.progress('initial-path method=restart but no %s found; '
                        'starting fresh via kick.', restart_file)
        method = 'kick'

    # Fresh start: discard any stale restart.toml left by an earlier
    # scheduler run so the inf setup guard does not reject the freshly
    # translated infswap.toml (this route decides kick-vs-resume here,
    # not through that guard).
    if os.path.isfile(restart_file):
        logger.progress('Discarding stale %s for a fresh scheduler run.',
                        restart_file)
        os.remove(restart_file)

    if method == 'kick':
        logger.progress('Generating initial paths via native kick '
                        'initiation for the coordinator load directory.')
        generate_load_dir_via_kick(native_config, runpath)

    translated = os.path.join(runpath, 'infswap.toml')
    with open(translated, 'wb') as handle:
        tomli_w.dump(config, handle)
    logger.progress('Translated native config written to: %s', translated)
    run_infinite_swapping(translated)
def set_up_simulation(inputfile, runpath):
    """Read the input settings and create the simulation.

    Parameters
    ----------
    inputfile : string
        The input file which defines the simulation.
    runpath : string
        The base path we are running the simulation from. It is
        stored as ``exe_path`` in the ``simulation`` and ``engine``
        sections of the settings and of every ensemble.

    Returns
    -------
    runner : callable
        The function which executes the simulation: the entry of
        :data:`_RUNNERS` for the task, or
        :func:`run_generic_simulation` for any other task.
    sim : object like :py:class:`.Simulation`
        The simulation defined by the input file.
    sim_settings : dict
        The input settings read from the input file.

    Raises
    ------
    ValueError
        If the input file does not exist.

    """
    if not os.path.isfile(inputfile):
        raise ValueError(f'Input file "{inputfile}" NOT found!')

    logger.progress('Reading input settings from: %s', inputfile)
    logger.progress('Setting up simulation')
    sim_settings = parse_settings_file(inputfile)
    # NB this is not transmitted to the ensembles, so they are
    # updated explicitly as well.
    sections = ('simulation', 'engine')
    for section in sections:
        sim_settings[section]['exe_path'] = runpath
    for ensemble in sim_settings.get('ensemble', []):
        for section in sections:
            ensemble[section]['exe_path'] = runpath

    logger.info('Set up and create simulation.')
    sim = create_simulation(sim_settings)
    task = sim_settings['simulation']['task'].lower()
    logger.progress('Setup for simulation "%s" is done.', task)
    return _RUNNERS.get(task, run_generic_simulation), sim, sim_settings
def store_simulation_settings(settings, indir, backup, ext='.rst'):
    """Write the parsed input settings to ``out<ext>`` in ``indir``.

    Parameters
    ----------
    settings : dict
        The simulation settings.
    indir : string
        The directory which contains the input script; the settings
        dump is written here.
    backup : boolean
        If True, an existing settings file will be backed up.
    ext : string
        Extension for the regenerated settings dump. The caller passes
        the extension of the input file, so a ``.toml`` run writes
        ``out.toml`` and a ``.rst`` run writes ``out.rst``. Defaults
        to ``.rst`` for backwards compatibility.

    """
    target = os.path.join(indir, 'out{}'.format(ext))
    logger.info('Full simulation settings written to: %s', target)
    write_settings_file(settings, target, backup=backup)
def remove_exit_file(exit_file):
    """Remove the EXIT file after a completed soft exit.

    A file which does not exist is silently ignored, any other
    failure is logged as a warning and a successful removal is logged
    at info level.

    Parameters
    ----------
    exit_file : string
        The path to the EXIT file.

    """
    try:
        os.remove(exit_file)
    except OSError as error:
        # Nothing to report when there was no file to begin with.
        if not isinstance(error, FileNotFoundError):
            logger.warning('Could not remove EXIT file "%s": %s',
                           exit_file, error)
        return
    logger.info('Removed EXIT file: %s', exit_file)
def soft_exit_ignore(turn_keyboard_interruption_off=True, exe_dir=None):
    """Switch the handling of keyboard interruptions (SIGINT).

    Parameters
    ----------
    turn_keyboard_interruption_off : boolean
        If True, a handler is installed which, instead of interrupting
        the program, creates the file 'EXIT' to request a soft exit.
        If False, the default Python SIGINT handler is installed.
    exe_dir : string, optional
        The directory in which the 'EXIT' file is created. When not
        given, the working directory at the time of this call is used.

    Returns
    -------
    out : object
        The SIGINT handler that was installed before this call, as
        returned by :func:`signal.signal`.

    """
    if not turn_keyboard_interruption_off:
        return signal.signal(signal.SIGINT, signal.default_int_handler)

    # Resolve the fall-back directory now: joining None with 'EXIT'
    # would raise a TypeError inside the handler when the signal
    # arrives.
    exit_dir = os.getcwd() if exe_dir is None else exe_dir

    def soft_exit_handler(signum, frame):  # pragma: no cover
        """Handle with a keyboard interruption signal."""
        # pylint: disable=unused-argument
        logger.progress('Attempting soft exit - terminating soon...')
        pathlib.Path(os.path.join(exit_dir, 'EXIT')).touch(exist_ok=True)

    return signal.signal(signal.SIGINT, soft_exit_handler)
def _report_execution_error(error, log_level):
    """Log a stopped execution and send its traceback to the log only.

    Three error messages are logged (the exception class name with
    its arguments, a note that execution stopped and a pointer to the
    log). The traceback of the exception currently being handled is
    then logged while the first handler of the logger is temporarily
    raised above ``CRITICAL``, so that handler does not emit it.

    Parameters
    ----------
    error : Exception
        The exception currently being handled.
    log_level : integer
        The active log level.

    Returns
    -------
    reraise : boolean
        True when ``log_level`` is ``DEBUG`` or below, in which case
        the caller should re-raise so that the error is not silently
        swallowed.

    """
    logger.error('"%s: %s".', error.__class__.__name__, error.args)
    for message in (
            'ERROR - execution stopped.',
            'Please see the LOG for the error message and traceback.'):
        logger.error(message)
    # Print the traceback to the log-file, but not to the screen:
    # silence the first handler while the traceback is logged.
    console = logger.handlers[0]
    saved_level = console.level
    console.setLevel(logging.CRITICAL + 1)
    logger.error(traceback.format_exc())
    console.setLevel(saved_level)
    return log_level <= logging.DEBUG
def _run_scheduler(runner, log_level):
    """Run a scheduler entry point with a clean SIGTERM/Ctrl-C unwind.

    HPC schedulers (e.g. SLURM at walltime), ``kill`` and ``scancel``
    send SIGTERM, whose default action terminates the interpreter
    without running atexit/finally handlers -- orphaning the worker
    pool (and any live engine subprocesses). While ``runner`` executes,
    SIGTERM is converted into the same clean unwind as Ctrl-C so the
    pool is released. restart.toml is written atomically (os.replace),
    so interrupting a run cannot corrupt the restart state.

    Parameters
    ----------
    runner : callable
        A callable taking no arguments which runs the scheduler.
    log_level : integer
        The active log level, forwarded to
        ``_report_execution_error`` to decide if an error is re-raised.

    Returns
    -------
    out : integer
        The exit status: ``0`` when ``runner`` completed, ``130`` when
        it was interrupted (Ctrl-C or SIGTERM) and ``1`` when it failed
        with an error that was reported but not re-raised.

    """
    def _sigterm_handler(signum, frame):  # pragma: no cover
        # pylint: disable=unused-argument
        # One-shot: ignore any further SIGTERM during the unwind /
        # cleanup below, so a second signal (``timeout`` can SIGTERM a
        # routed run again while it logs "stopped cleanly" or tears the
        # pool down) cannot re-enter and raise a second
        # KeyboardInterrupt that escapes the except handler as an
        # uncaught traceback. The finally clause restores old_sigterm.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        raise KeyboardInterrupt

    old_sigterm = signal.signal(signal.SIGTERM, _sigterm_handler)
    try:
        runner()
    except KeyboardInterrupt:
        logger.progress('Termination signal received; '
                        'stopped the scheduler cleanly.')
        bye_bye_world()
        return 130
    except Exception as error:  # pylint: disable=broad-exception-caught
        if _report_execution_error(error, log_level):
            raise
        bye_bye_world()
        return 1
    finally:
        # signal.signal() returns None when the previous handler was not
        # installed from Python; None is not accepted as a handler, so
        # fall back to the default action in that case.
        signal.signal(
            signal.SIGTERM,
            signal.SIG_DFL if old_sigterm is None else old_sigterm,
        )
    logger.success('\nSimulation done\n')
    bye_bye_world()
    return 0


def main(infile, indir, exe_dir, progress, log_level):
    """Execute PyRETIS.

    Parameters
    ----------
    infile : string
        The input file to open with settings for PyRETIS.
    indir : string
        The folder containing the settings file.
    exe_dir : string
        The directory we are working from.
    progress : boolean
        Determines if we should use a progress bar or not.
    log_level : integer
        Determines if we should display the error traceback or not.

    Returns
    -------
    out : integer
        The exit status: ``0`` on success, ``1`` when the execution was
        stopped by an error or by an existing EXIT file, and ``130``
        when a scheduler run was interrupted.

    """
    # Stage C: native ``task = "tis"`` / ``"retis"`` TOML inputs run
    # through the infinite-swapping scheduler at n_workers=1 by default
    # (their config is translated by native_to_infswap_config). The
    # legacy in-process loop can be forced with PYRETIS_NATIVE_LOOP=1,
    # and the older PYRETIS_NATIVE_VIA_INFSWAP=1 opt-in still selects this
    # route for a native ``retis`` config (now a no-op since it is the
    # default). explore / pptis / repptis keep the in-process loop below.
    # NOTE(review): the opt-in check below also passes 'repptis' in
    # ``tasks``, which seems to disagree with the sentence above --
    # confirm which one is intended.
    route_to_scheduler = (
        is_native_scheduler_config(infile) and not force_native_loop()
    ) or (native_via_infswap_requested()
          and is_native_retis_config(infile, tasks=('retis', 'repptis')))
    if route_to_scheduler:
        logger.progress('Routing the native TIS/RETIS config through the '
                        'infinite-swapping scheduler at n_workers=1.')
        return _run_scheduler(
            lambda: run_native_via_infswap(infile, exe_dir), log_level
        )

    # Single-entry-point dispatch: an infinite-swapping input is run by
    # its own scheduler (replica-exchange worker pool), not the
    # in-process simulation flow below.
    if is_infinite_swapping_config(infile):
        logger.progress('Input selects the infinite-swapping sampler; '
                        'running the replica-exchange scheduler.')
        return _run_scheduler(
            lambda: run_infinite_swapping(infile), log_level
        )

    # Refuse to start while an EXIT file is present in the working
    # directory.
    exit_file = os.path.join(exe_dir, 'EXIT')
    if os.path.isfile(exit_file):
        logger.progress(
            'Exit file found - Remove it before executing PyRETIS.')
        logger.error('* %s file found *', exit_file)
        logger.error('Remove the file to execute PyRETIS')
        bye_bye_world()
        return 1

    # The settings are written back using the extension of the input
    # file; an input without an extension is treated as '.rst'.
    in_ext = os.path.splitext(infile)[1].lower() or '.rst'
    simulation = None
    settings = {}
    exit_status = 0
    try:
        run, simulation, settings = set_up_simulation(infile, exe_dir)
        store_simulation_settings(settings, indir, True, ext=in_ext)
        # Run the simulation:
        soft_exit_ignore(turn_keyboard_interruption_off=True,
                         exe_dir=exe_dir)
        run(simulation, settings, progress=progress)
        soft_exit_ignore(turn_keyboard_interruption_off=False,
                         exe_dir=exe_dir)
    except Exception as error:  # pylint: disable=broad-exception-caught
        exit_status = 1
        if _report_execution_error(error, log_level):
            raise
    finally:
        # Write out the simulation settings as they were parsed and
        # add some additional info:
        if simulation is not None:
            end = getattr(simulation, 'cycle', {'step': None})['step']
            if end is not None:
                settings['simulation']['endcycle'] = end
                logtxt = f'Execution ended at step {end}'
                logger.progress(logtxt)
                logger.success('\nSimulation done\n')
            store_simulation_settings(settings, indir, False, ext=in_ext)
        # The EXIT file is only cleaned up after a successful run.
        if exit_status == 0:
            remove_exit_file(exit_file)
        bye_bye_world()
    return exit_status
def entry_point():  # pragma: no cover
    """entry_point - The entry point for the pip install of pyretisrun.

    The command line is parsed, a file handler is attached to the
    logger for the requested log file, and :py:func:`main` is executed.
    The process then exits with the status returned by ``main``.

    An unknown ``--log_level`` name (or one that does not name an
    integer level of the ``logging`` module) falls back to ``INFO``.
    """
    colorama.init(autoreset=True)
    parser = argparse.ArgumentParser(description=PROGRAM_NAME)
    parser.add_argument('-i', '--input',
                        help=f'Location of {PROGRAM_NAME} input file',
                        required=True)
    parser.add_argument('-V', '--version', action='version',
                        version=f'{PROGRAM_NAME} {VERSION}')
    parser.add_argument('-f', '--log_file',
                        help='Specify log file to write',
                        required=False,
                        default=f'{PROGRAM_NAME.lower()}.log')
    parser.add_argument('-l', '--log_level',
                        help='Specify log level for log file',
                        required=False,
                        default='INFO')
    parser.add_argument('-p', '--progress', action='store_true',
                        help=('Display a progress meter instead of text '
                              'output for the simulation'))
    args_dict = vars(parser.parse_args())

    input_file = args_dict['input']
    # Store directories:
    cwd_dir = os.getcwd()
    input_dir = os.path.dirname(input_file)
    if not os.path.isdir(input_dir):
        # E.g. a bare file name, for which the directory part is empty.
        input_dir = cwd_dir

    # Define a file logger:
    create_backup(args_dict['log_file'])
    fileh = logging.FileHandler(args_dict['log_file'], mode='a')
    log_levl = getattr(logging, args_dict['log_level'].upper(),
                       logging.INFO)
    if not isinstance(log_levl, int):
        # The name matched a non-level attribute of the logging module
        # (e.g. ``BASIC_FORMAT``); treat it as an unknown level.
        log_levl = logging.INFO
    fileh.setLevel(log_levl)
    fileh.setFormatter(get_log_formatter(log_levl))
    logger.addHandler(fileh)
    # Here, we just check the python version. PyRETIS should anyway
    # fail before this for python2.
    check_python_version()

    hello_world(input_file, cwd_dir, args_dict['log_file'])
    sys.exit(main(input_file, input_dir, cwd_dir,
                  args_dict['progress'], log_levl))
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':  # pragma: no cover
    entry_point()