#!/usr/bin/env python3
# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""pyretisrun - An application for running PyRETIS simulations.
This script is a part of the PyRETIS library and can be used for
running simulations from an input script.
usage: pyretisrun.py [-h] -i INPUT [-V] [-f LOG_FILE] [-l LOG_LEVEL] [-p]
PyRETIS
optional arguments:
-h, --help show this help message and exit
-i INPUT, --input INPUT
Location of PyRETIS input file
-V, --version show program's version number and exit
-f LOG_FILE, --log_file LOG_FILE
Specify log file to write
-l LOG_LEVEL, --log_level LOG_LEVEL
Specify log level for log file
-p, --progress Display a progress meter instead of text output for
the simulation
More information about running PyRETIS can be found at: www.pyretis.org
"""
# pylint: disable=invalid-name
import argparse
import datetime
import logging
import os
import pathlib
import signal
import sys
import traceback
import warnings
# Surface PyRETIS-owned DeprecationWarnings (e.g. the rst input
# deprecation) to CLI users. Python hides DeprecationWarning by
# default unless it comes from __main__; this filter scopes the
# override to pyretis modules only, so warnings from other libraries
# stay at the user's default behaviour.
# NOTE(review): the filter is installed before the third-party and
# PyRETIS imports below, which is presumably why those imports carry
# ``noqa: E402`` markers -- confirm before reordering.
warnings.filterwarnings('default', category=DeprecationWarning,
                        module=r'pyretis(\..*)?')
# Other libraries:
import tqdm # noqa: E402 For a progress bar
import colorama # noqa: E402 For coloring text
# PyRETIS library imports:
from pyretis import __version__ as VERSION # noqa: E402
from pyretis.info import PROGRAM_NAME, URL, CITE, LOGO # noqa: E402
from pyretis.core.pathensemble import generate_ensemble_name # noqa: E402
from pyretis.setup import create_simulation # noqa: E402
from pyretis.inout.screen import REFERENCE # noqa: E402
from pyretis.inout.common import ( # noqa: E402
check_python_version,
create_backup,
)
from pyretis.inout.formats.formatter import ( # noqa: E402
get_log_formatter,
setup_console_logging,
)
from pyretis.inout.settings import ( # noqa: E402
parse_settings_file,
write_settings_file
)
# strftime layout (day.month.year hour:minute:second) used for the
# start-of-execution and end-of-execution messages.
_DATE_FMT = '%d.%m.%Y %H:%M:%S'
# Module-wide logger as returned by setup_console_logging(). The
# functions in this file call its ``banner`` / ``progress`` / ``success``
# methods in addition to the standard logging calls.
logger = setup_console_logging()
def use_tqdm(progress):
    """Select the callable used to wrap the simulation iterator.

    Parameters
    ----------
    progress : boolean
        When True, the real :py:class:`tqdm.tqdm` is handed back.
        Otherwise a transparent stand-in is returned.

    Returns
    -------
    out : callable like :py:class:`tqdm.tqdm`
        Either ``tqdm.tqdm`` itself, or a dummy accepting the same call
        signature which returns the wrapped iterable untouched (taken
        from the first positional argument, else from the ``iterable``
        keyword, else None).

    """
    def _passthrough(*args, **kwargs):
        """Return an iterator to replace tqdm."""
        return args[0] if args else kwargs.get('iterable', None)

    return tqdm.tqdm if progress else _passthrough
def hello_world(infile, rundir, logfile):
    """Show the start-up banner and the locations used by this run.

    The logo, program version, start time and Python version are sent
    to ``logger.banner``; the run directory, input file and log file
    are sent to ``logger.progress``.

    Parameters
    ----------
    infile : string
        Location of the input file.
    rundir : string
        Directory the simulation is running in.
    logfile : string
        The output log file.

    """
    timestart = datetime.datetime.now().strftime(_DATE_FMT)
    # sys.version starts with the bare version number.
    pyversion = sys.version.split()[0]
    logger.banner('\n'.join([LOGO]))
    banner_lines = (
        ('%s version: %s', (PROGRAM_NAME, VERSION)),
        ('Start of execution: %s', (timestart,)),
        ('Python version: %s', (pyversion,)),
    )
    for template, values in banner_lines:
        logger.banner(template, *values)
    location_lines = (
        ('\nRunning in directory: %s', rundir),
        ('Input file: %s', infile),
        ('Log file: %s', logfile),
    )
    for template, value in location_lines:
        logger.progress(template, value)
def bye_bye_world():
    """Log the end-of-execution time, the references and the URL."""
    finished = datetime.datetime.now().strftime(_DATE_FMT)
    logger.progress(f'End of {PROGRAM_NAME} execution: {finished}')
    # Reference block: a heading, a dashed underline of the same length
    # and then every non-empty line of the citation text.
    heading = f'{PROGRAM_NAME} references:'
    citation_lines = [line for line in CITE.split('\n') if line]
    block = '\n'.join([heading, '-' * len(heading), *citation_lines])
    logger.log(REFERENCE, '\n%s', block)
    logger.log(REFERENCE, str(URL))
def run_md_flux_simulation(sim, sim_settings, progress=False):
    """Drive an MD-FLUX simulation to completion.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings; ``['simulation']['exe_path']`` is
        copied onto the engine before the run.
    progress : boolean, optional
        If True a progress bar is shown, otherwise results are printed
        to the screen.

    Returns
    -------
    out : boolean
        Always True.

    """
    logger.progress('Starting MD-Flux simulation')
    wrap = use_tqdm(progress)
    sim.engine.exe_dir = sim_settings['simulation']['exe_path']
    sim.set_up_output(sim_settings, progress=progress)
    steps = sim.run()
    bar_options = {
        'initial': sim.cycle['startcycle'],
        'total': sim.cycle['endcycle'],
        'desc': 'MD-flux',
    }
    # The iterator is consumed only for its side effects.
    for _ in wrap(steps, **bar_options):
        pass
    # Always leave a restart file describing the final state.
    sim.write_restart(now=True)
    return True
def run_md_simulation(sim, sim_settings, progress=False):
    """Drive a plain MD simulation to completion.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings; ``['simulation']['exe_path']`` is
        copied onto the engine before the run.
    progress : boolean, optional
        If True a progress bar is shown, otherwise results are printed
        to the screen.

    Returns
    -------
    out : boolean
        Always True.

    """
    logger.progress('Starting MD simulation')
    wrap = use_tqdm(progress)
    sim.engine.exe_dir = sim_settings['simulation']['exe_path']
    sim.set_up_output(sim_settings, progress=progress)
    steps = sim.run()
    bar_options = {
        'initial': sim.cycle['startcycle'],
        'total': sim.cycle['endcycle'],
        'desc': 'MD step',
    }
    # The iterator is consumed only for its side effects.
    for _ in wrap(steps, **bar_options):
        pass
    # Always leave a restart file describing the final state.
    sim.write_restart(now=True)
    return True
def explore_simulation(sim, sim_settings, progress=False):
    """Run an exploration simulation with PyRETIS.

    The ``tis`` section of every ensemble is overridden (``freq = 0``
    and ``allowmaxlength = True``) before the simulation is initiated.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings. The ``tis`` sections of the entries in
        ``sim_settings['ensemble']`` are modified in place.
    progress : boolean, optional
        If True a progress bar is shown, otherwise results are printed
        to the screen.

    Returns
    -------
    out : boolean
        False if the initiation stopped, True once the run completed.

    """
    sim.set_up_output(sim_settings, progress=progress)
    logger.progress('Load frames for free energy landscape exploration')
    # Force the settings exploration relies on, whatever the input said.
    for ens_settings in sim_settings.get('ensemble', []):
        ens_settings['tis']['freq'] = 0
        ens_settings['tis']['allowmaxlength'] = True
    # Initialisation happens here:
    initiated = sim.initiate(sim_settings)
    if not initiated:
        logger.progress('Initiation stopped, will exit now.')
        return False
    sim.write_restart(now=True)
    logger.progress('Initiation done. Exploring now.')
    wrap = use_tqdm(progress)
    desc = f'{sim_settings["simulation"]["task"]} Simulation'
    steps = sim.run()
    bar_options = {
        'initial': sim.cycle['startcycle'],
        'total': sim.cycle['endcycle'],
        'desc': desc,
    }
    for _ in wrap(steps, **bar_options):
        pass
    # Always leave restart files describing the final state.
    sim.write_restart(now=True)
    return True
def run_path_simulation(sim, sim_settings, progress=False):
    """Initialise and run a path-sampling simulation with PyRETIS.

    This runner is registered for the ``tis``, ``retis``, ``pptis`` and
    ``repptis`` tasks.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings.
    progress : boolean, optional
        If True a progress bar is shown, otherwise results are printed
        to the screen.

    Returns
    -------
    out : boolean
        False if the initiation stopped, True once the run completed.

    """
    def _boxed(text):
        """Frame ``text`` between two rules of '=' of the same width."""
        rule = '=' * len(text)
        return f'\n{rule}\n{text}\n{rule}'

    sim.set_up_output(sim_settings, progress=progress)
    task = sim_settings['simulation']['task']
    logger.banner(_boxed(f' Initialising {task} simulation.'))
    logger.progress('\nInitialising path ensembles:')
    # Initialisation happens here:
    initiated = sim.initiate(sim_settings)
    if not initiated:
        logger.progress('Initiation stopped, will exit now.')
        return False
    sim.write_restart(now=True)
    logger.progress('\nInitiation done.')
    logger.banner(_boxed(f' Starting {task} simulation.'))
    wrap = use_tqdm(progress)
    # The task is read again here, after the initiation.
    desc = '{} Simulation. '.format(sim_settings['simulation']['task'])
    steps = sim.run()
    bar_options = {
        'initial': sim.cycle['startcycle'],
        'total': sim.cycle['endcycle'],
        'desc': desc,
    }
    for _ in wrap(steps, **bar_options):
        pass
    # Always leave restart files describing the final state.
    sim.write_restart(now=True)
    return True
def make_tis_files(_, settings, progress=False):
    """Create input files for single TIS simulations.

    One ``tis-<ensemble>.rst`` file is written per entry in
    ``settings['ensemble']``, together with the command that would run
    it. No simulation is executed.

    Parameters
    ----------
    _ : object
        Unused; present so the signature matches the other runners,
        which receive the simulation object first.
    settings : dict
        The simulation settings. ``settings['ensemble']`` is iterated
        and ``settings['simulation']['zero_ensemble']`` is read. For
        each ensemble, ``['simulation']['zero_ensemble']`` and
        ``['simulation']['task']`` are overwritten in place; the
        ``orderparameter`` section is left exactly as it was found.
    progress : boolean, optional
        Unused; accepted for signature compatibility with the other
        runners.

    Returns
    -------
    out : boolean
        Always True.

    """
    # pylint: disable=unused-argument
    logger.progress(
        'Input settings requests: TIS for multiple path ensembles.'
    )
    logger.progress(
        'Will create input files for the TIS simulations and exit'
    )
    i_ens = 0
    for i, ens_settings in enumerate(settings['ensemble']):
        i_ens += 1
        if i == 0 and not settings['simulation']['zero_ensemble']:
            i_ens += 1
        ens_settings['simulation']['zero_ensemble'] = False
        ens_settings['simulation']['task'] = 'tis'
        ensf = generate_ensemble_name(i_ens)
        logger.progress(f'Creating input for TIS ensemble: {i_ens} ')
        infile = f'tis-{ensf}.rst'
        logger.progress(f'Create file: "{infile}"')
        exe_dir_file = os.path.join(ens_settings['engine']['exe_path'],
                                    infile)
        # Ensure generated TIS input files include a human-friendly
        # orderparameter name without mutating the caller's settings:
        # remember exactly what was present so it can be put back.
        had_section = 'orderparameter' in ens_settings
        op_section = ens_settings.get('orderparameter', {})
        had_name_key = 'name' in op_section
        original_name = op_section.get('name')
        need_restore = original_name is None
        if need_restore:
            ens_settings.setdefault('orderparameter', {})
            ens_settings['orderparameter']['name'] = 'Order Parameter'
        try:
            write_settings_file(ens_settings, exe_dir_file, backup=False)
        finally:
            if need_restore:
                if not had_section:
                    # The whole section was temporary: drop it so no
                    # empty ``orderparameter`` dict is left behind.
                    ens_settings.pop('orderparameter', None)
                elif had_name_key:
                    ens_settings['orderparameter']['name'] = original_name
                else:
                    # Remove only the temporary key we added.
                    ens_settings['orderparameter'].pop('name', None)
        logger.progress('Command for executing:')
        logger.progress(f'pyretisrun -i {infile} -p -f {ensf}.log')
    return True
def run_generic_simulation(sim, sim_settings, progress=False):
    """Run a generic PyRETIS simulation.

    Generic simulations simply complete a given number of steps.
    Composite simulations (several simulations tied together) are NOT
    handled here.

    Parameters
    ----------
    sim : object like :py:class:`.Simulation`
        The simulation to run.
    sim_settings : dict
        The simulation settings.
    progress : boolean, optional
        If True a progress bar is shown, otherwise results are printed
        to the screen.

    Returns
    -------
    out : boolean
        Always True.

    """
    logger.progress('Running simulation')
    wrap = use_tqdm(progress)
    sim.set_up_output(sim_settings, progress=progress)
    steps = wrap(sim.run(), desc='Step')
    # The iterator is consumed only for its side effects.
    for _ in steps:
        pass
    # Always leave a restart file describing the final state.
    sim.write_restart(now=True)
    return True
# Dispatch table from the lower-cased ``[simulation] task`` to the
# function that runs it. set_up_simulation() falls back to
# run_generic_simulation for any task not listed here.
_RUNNERS = {
    'md-flux': run_md_flux_simulation,
    'md-nve': run_md_simulation,
    'explore': explore_simulation,
    'md': run_md_simulation,
    'make-tis-files': make_tis_files,
    'tis': run_path_simulation,
    'retis': run_path_simulation,
    'pptis': run_path_simulation,
    'repptis': run_path_simulation,
}
# Values of ``[simulation] task`` in the input TOML that select the
# infinite-swapping (replica-exchange) sampler instead of the
# in-process simulation flow.
_INFINITE_SWAPPING_TASKS = frozenset(
    ('infinite_swapping', 'infswap', 'infretis')
)
# Native path-sampling tasks which, as of the Stage C collapse, run
# through the infinite-swapping scheduler at n_workers=1 by default
# (the native config is translated by native_to_infswap_config).
# Only ``retis`` is routed: the scheduler is a RETIS-family
# replica-exchange sampler that always builds the [0^-]/[0^+]/[i^+]
# structure, so it subsumes RETIS one-to-one. Plain ``tis`` is NOT
# routed -- a TIS config can be a single [i^+] ensemble with no [0^-]
# and nothing to exchange, so there is no faithful scheduler analog; it
# keeps the in-process loop, as do explore / pptis / repptis.
# Setting PYRETIS_NATIVE_LOOP=1 forces the legacy in-process loop for
# retis as well (debugging / native-vs-scheduler reference comparison).
_NATIVE_SCHEDULER_TASKS = frozenset(('retis',))
_NATIVE_LOOP_ENV = 'PYRETIS_NATIVE_LOOP'
# Engine classes the scheduler route is VALIDATED against: the pyretis
# in-process MD integrators (pyretis.engines.internal -- engine_type
# 'internal'). In principle the scheduler can also drive the external
# engines (GROMACS / LAMMPS / CP2K / OpenMM) -- that is what the inf
# flow was built for -- but scheduler+external integration is NOT yet
# verified for the native route: routing external-engine task=retis
# crashed the GROMACS/LAMMPS/OpenMM suites in test-heavy. Until that
# path is validated (and its heavy, supervised goldens regenerated),
# external-engine retis keeps the native in-process loop. Names are
# compared case-insensitively with separators stripped, so
# "velocity-verlet" becomes "velocityverlet".
_SCHEDULER_INTERNAL_ENGINES = frozenset(
    ('langevin', 'verlet', 'velocityverlet', 'randomwalk')
)
def is_infinite_swapping_config(inputfile):
    """Tell whether the input selects the infinite-swapping sampler.

    The infinite-swapping (replica-exchange) sampler is chosen either
    explicitly, through a ``[simulation] task`` listed in
    :data:`_INFINITE_SWAPPING_TASKS`, or implicitly, by the presence of
    the ``[runner]`` worker-pool section that only that sampler's input
    schema uses. Only ``.toml`` inputs are inspected; everything else is
    left to the in-process simulation flow.

    Parameters
    ----------
    inputfile : string
        Path to the input file.

    Returns
    -------
    boolean
        True if the infinite-swapping scheduler should run this input.

    """
    _, extension = os.path.splitext(inputfile)
    # A missing file cannot be inspected, and open() below would raise a
    # bare FileNotFoundError. main() calls this before
    # set_up_simulation, so answering False lets the descriptive
    # "Input file NOT found!" ValueError be raised inside the
    # error-logging try/except -- the same path a missing .rst takes.
    if extension.lower() != '.toml' or not os.path.isfile(inputfile):
        return False
    try:
        import tomllib as toml_reader
    except ImportError:
        import tomli as toml_reader
    with open(inputfile, 'rb') as handle:
        parsed = toml_reader.load(handle)
    requested = str(parsed.get('simulation', {}).get('task', '')).lower()
    return requested in _INFINITE_SWAPPING_TASKS or 'runner' in parsed
def is_native_retis_config(inputfile, tasks=('retis',)):
    """Tell whether the input is a native TOML whose task is in ``tasks``.

    Used only by the opt-in native-via-coordinator route: it must NOT
    fire for inputs that already select the infinite-swapping sampler
    (those carry a ``[runner]`` section or an infinite-swapping task).
    Only ``.toml`` inputs are considered. ``tasks`` lets the caller
    widen the accepted native task set -- the ∞REPPTIS opt-in passes
    ``('retis', 'repptis')`` so that a native ``task = "repptis"``
    config can be routed through the scheduler for the
    native-vs-∞REPPTIS comparison, while the default stays retis-only.

    Parameters
    ----------
    inputfile : string
        Path to the input file.
    tasks : tuple of str
        The native task names that may take the opt-in scheduler route.

    Returns
    -------
    boolean
        True for a native ``.toml`` (task in ``tasks``) that does not
        already select the coordinator.

    """
    is_toml = os.path.splitext(inputfile)[1].lower() == '.toml'
    if not (is_toml and os.path.isfile(inputfile)):
        return False
    # Inputs that already pick the infinite-swapping sampler are not
    # "native" for the purpose of this route.
    if is_infinite_swapping_config(inputfile):
        return False
    try:
        import tomllib as toml_reader
    except ImportError:
        import tomli as toml_reader
    with open(inputfile, 'rb') as handle:
        parsed = toml_reader.load(handle)
    requested = str(parsed.get('simulation', {}).get('task', '')).lower()
    return requested in tasks
def scheduler_supported_features(config):
    """Return ``(supported, reason)`` for routing a native retis config.

    The infinite-swapping scheduler at ``n_workers = 1`` faithfully
    reproduces the native RETIS loop for the kick- (or restart-)
    initialised internal-engine RETIS family with the ``sh``/``wt``/
    ``wf``/``ss`` shooting moves (``wf``/``ss`` via the WHAM ``Cxy/HA``
    unweighting on the native-output route). Several native features
    are still scheduler PORT GAPS and must keep the in-process loop
    until they are ported. The checks run in this order and the first
    gap found is reported:

    - a non-internal ``[engine]`` (GROMACS / LAMMPS / CP2K / OpenMM /
      ...): only the in-process pyretis MD integrators
      (:data:`_SCHEDULER_INTERNAL_ENGINES`) are validated through the
      scheduler.
    - shooting moves other than ``sh``/``wt``/``wf``/``ss``.
    - the permeability ``mirror`` (``mirror_freq``) and ``target`` swap
      (``target_freq``) moves ARE ported (the scheduler ``mr``/``ts``
      moves), but only at ``n_workers = 1`` since both persist a global
      order-function mutation on accept; a >1-worker config keeps the
      native loop.
    - an ``[initial-path] method`` other than ``kick`` / ``restart``
      (e.g. ``load``, a pre-existing load folder): the scheduler route
      kick-initialises or resumes from its own ``restart.toml`` but
      does not honour a foreign load folder.
    - a ``[simulation] restart`` continuation.

    Parameters
    ----------
    config : dict
        The parsed native TOML configuration.

    Returns
    -------
    (boolean, string)
        ``(True, '')`` when the scheduler faithfully covers the config;
        ``(False, reason)`` naming the first unsupported feature.

    """
    engine_class = str(config.get('engine', {}).get('class', ''))
    # Compare case-insensitively with '-', '_' and ' ' removed.
    engine_key = ''.join(
        char for char in engine_class.lower() if char not in '-_ '
    )
    if engine_key not in _SCHEDULER_INTERNAL_ENGINES:
        return False, f"non-internal engine '{engine_class}'"

    tis_section = config.get('tis', {})
    shooting_moves = tis_section.get('shooting_moves')
    if shooting_moves is not None:
        # sh / wt / wf / ss route faithfully through the scheduler: wt
        # and the indicator-cv moves directly, wf and ss via the WHAM
        # Cxy/HA unweighting on the native-output route (Weight =
        # compute_weight / frac). Anything else is still a port gap.
        requested = {str(move).lower() for move in shooting_moves}
        rejected = sorted(requested.difference(('sh', 'wt', 'wf', 'ss')))
        if rejected:
            return False, ('unsupported shooting move(s): '
                           + ', '.join(rejected))

    if tis_section.get('mirror_freq') or tis_section.get('target_freq'):
        # Mirror and target swap are ported (scheduler ``mr``/``ts``
        # moves) but PERSIST a global order-function mutation on accept
        # (the mirror flag / the target index), so they are only
        # consistent at a single worker. The worker count is resolved
        # exactly as native_compat does (env override, then a [runner]
        # section, then the default of 1); keep the native loop if it
        # would be >1.
        configured_workers = config.get('runner', {}).get('workers', 1)
        resolved_workers = int(
            os.environ.get('PYRETIS_NATIVE_WORKERS', configured_workers)
        )
        if resolved_workers > 1:
            return False, ('mirror/target swap moves require n_workers=1 '
                           f'(got {resolved_workers})')

    initial_path = config.get('initial-path', {})
    init_method = str(initial_path.get('method', 'kick')).lower()
    # ``restart`` resumes the scheduler from its own restart.toml (the
    # coordinator persists cstep / frac / RNG there); ``load`` (a
    # pre-existing load folder) is still a port gap.
    if init_method not in ('kick', '', 'restart'):
        return False, 'initial-path method = ' + init_method

    if config.get('simulation', {}).get('restart'):
        return False, 'simulation restart continuation'
    return True, ''
def is_native_scheduler_config(inputfile):
    """Tell whether a native ``retis`` TOML routes to the scheduler.

    As of the Stage C collapse, native ``task = "retis"`` ``.toml``
    inputs are run through the infinite-swapping scheduler at
    ``n_workers = 1`` (their config is translated by
    :func:`pyretis.inout.native_compat.native_to_infswap_config`).
    Inputs that already select the infinite-swapping sampler, and the
    native ``tis`` / ``explore`` / ``pptis`` / ``repptis`` tasks, are
    excluded -- they keep their existing routes (see
    :data:`_NATIVE_SCHEDULER_TASKS` for why plain TIS is not routed).
    A retis config that uses a scheduler PORT GAP (see
    :func:`scheduler_supported_features`) also keeps the native loop;
    the reason is logged at info level. Only ``.toml`` inputs are
    considered (a ``.rst`` input keeps the in-process loop).

    Parameters
    ----------
    inputfile : string
        Path to the input file.

    Returns
    -------
    boolean
        True for a native ``.toml`` whose task is in
        :data:`_NATIVE_SCHEDULER_TASKS` and which should run through
        the scheduler.

    """
    is_toml = os.path.splitext(inputfile)[1].lower() == '.toml'
    if not (is_toml and os.path.isfile(inputfile)):
        return False
    if is_infinite_swapping_config(inputfile):
        return False
    try:
        import tomllib as toml_reader
    except ImportError:
        import tomli as toml_reader
    with open(inputfile, 'rb') as handle:
        parsed = toml_reader.load(handle)
    requested = str(parsed.get('simulation', {}).get('task', '')).lower()
    if requested not in _NATIVE_SCHEDULER_TASKS:
        return False
    supported, reason = scheduler_supported_features(parsed)
    if supported:
        return True
    logger.info('Native retis config uses a scheduler port gap (%s); '
                'keeping the native in-process loop.', reason)
    return False
def force_native_loop():
    """Tell whether the legacy in-process loop is forced.

    Reads :data:`_NATIVE_LOOP_ENV` (``PYRETIS_NATIVE_LOOP``). Any value
    other than the empty string and ``"0"`` counts as set. When set, a
    native config that would otherwise be routed to the scheduler keeps
    the in-process loop instead -- a reversible escape hatch for
    debugging and native-vs-scheduler reference comparison.

    Returns
    -------
    boolean
        True when the legacy in-process loop should be forced.

    """
    flag = os.environ.get(_NATIVE_LOOP_ENV, '')
    disabled = flag in ('', '0')
    return not disabled
def run_infinite_swapping(inputfile):
    """Run an infinite-swapping (replica-exchange) input via its scheduler.

    This is the programmatic entry point of the infinite-swapping
    sampler (config loader + scheduler), run from the current working
    directory. ``pyretisrun`` calls it for inputs that select infinite
    swapping, so it is the single way to drive that sampler.

    Parameters
    ----------
    inputfile : string
        Path to the infinite-swapping input TOML.

    """
    from pyretis.simulation.scheduler import scheduler
    from pyretis.simulation.setup import setup_config
    run_config = setup_config(inputfile)
    if run_config is None:
        # setup_config returns None when there is nothing to run (e.g.
        # a restart whose cstep has already reached the target, or a
        # missing active trajectory). Exit cleanly rather than crashing
        # scheduler().
        logger.progress('Nothing to run (restart already complete or no '
                        'active path); exiting.')
    else:
        scheduler(run_config)
# Opt-in environment flag: when set to a truthy value, a native
# ``task = "retis"`` config is translated to the coordinator's config and
# run through the infinite-swapping scheduler instead of the native
# in-process loop. main() also accepts ``task = "repptis"`` on this
# opt-in route. Independently of this flag, main() routes a supported
# native retis ``.toml`` to the scheduler unless PYRETIS_NATIVE_LOOP
# forces the in-process loop.
_NATIVE_VIA_INFSWAP_ENV = 'PYRETIS_NATIVE_VIA_INFSWAP'
def native_via_infswap_requested():
    """Tell whether the opt-in native-via-coordinator flag is set.

    Reads :data:`_NATIVE_VIA_INFSWAP_ENV`. The flag counts as set for
    any value other than the empty string and ``"0"``, so that
    ``PYRETIS_NATIVE_VIA_INFSWAP=1`` (the documented form) enables it.

    Returns
    -------
    boolean
        True when the opt-in route should be taken.

    """
    flag = os.environ.get(_NATIVE_VIA_INFSWAP_ENV, '')
    disabled = flag in ('', '0')
    return not disabled
def _apply_restart_steps(restart_file, steps):
    """Set the step target in a scheduler ``restart.toml``.

    On a continuation the native config carries the new (usually
    higher) total step target. The scheduler stops once ``cstep``
    reaches ``[simulation] steps``, so a resumed restart must adopt the
    new target or it would have nothing left to run. The restart TOML
    is read, ``[simulation] steps`` is updated, and the file is
    rewritten; every other key (the persisted ``current`` state -- RNG,
    frac, active paths -- and all settings) is left untouched.

    The new content is first written to ``<restart_file>.tmp`` and then
    moved over the original with :func:`os.replace`, so a failed or
    interrupted dump leaves the existing restart file intact instead of
    truncated.

    Parameters
    ----------
    restart_file : string
        Path to the scheduler ``restart.toml``.
    steps : int
        The new total step target to write into ``[simulation] steps``.

    """
    # Same reader selection as the other TOML readers in this module:
    # the standard-library parser when available, tomli otherwise.
    try:
        import tomllib as toml_reader
    except ImportError:
        import tomli as toml_reader
    import tomli_w
    with open(restart_file, 'rb') as handle:
        restart = toml_reader.load(handle)
    restart.setdefault('simulation', {})['steps'] = int(steps)
    tmp_file = f'{restart_file}.tmp'
    try:
        with open(tmp_file, 'wb') as handle:
            tomli_w.dump(restart, handle)
        os.replace(tmp_file, restart_file)
    finally:
        # After a successful replace the temp file is gone; this only
        # cleans up a partial file when the dump or the replace failed.
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
def run_native_via_infswap(inputfile, runpath):
    """Run a native RETIS config through the infinite-swapping coordinator.

    This is the compatibility route. The native configuration is
    translated to the coordinator's config dictionary
    (:func:`pyretis.inout.native_compat.native_to_infswap_config`), the
    coordinator's ``load_dir`` is generated from the native kick
    initiation when requested
    (:func:`pyretis.inout.native_compat.generate_load_dir_via_kick`),
    the translated ``infswap.toml`` is written, and the result is
    handed to the unchanged scheduler via
    :func:`run_infinite_swapping`.

    Parameters
    ----------
    inputfile : string
        Path to the native RETIS input TOML.
    runpath : string
        The directory the simulation runs from (where ``load`` and the
        translated ``infswap.toml`` are written).

    """
    import tomli_w
    from pyretis.inout.settings import parse_settings_file
    from pyretis.inout.native_compat import (
        native_to_infswap_config,
        generate_load_dir_via_kick,
    )
    native_config = parse_settings_file(inputfile)
    config = native_to_infswap_config(native_config)
    initial_path = native_config.get('initial-path', {})
    method = str(initial_path.get('method', 'kick')).lower()
    # --- Continuation / fresh-start handling --------------------------
    # The scheduler persists its full state (cstep, frac, per-worker
    # RNG) in ``./restart.toml``. A native config requesting a restart
    # continuation therefore RESUMES from that file rather than
    # re-translating a fresh infswap.toml -- which the inf setup would
    # reject, since it forbids a (differently named) run file alongside
    # a restart.toml. setup_config reads ``restart.toml`` as both the
    # input and the restart file, so ``inp == re_inp`` and the guard
    # passes; exe_dir == cwd (see main), so the bare name resolves in
    # the run dir.
    restart_file = os.path.join(runpath, 'restart.toml')
    if method == 'restart':
        if os.path.isfile(restart_file):
            # Carry the native step target into the restart so that a
            # continuation to a higher cycle count actually advances
            # (the scheduler stops once cstep reaches steps).
            _apply_restart_steps(restart_file,
                                 config['simulation']['steps'])
            logger.progress('Resuming the scheduler from %s.',
                            restart_file)
            run_infinite_swapping('restart.toml')
            return
        logger.progress('initial-path method=restart but no %s found; '
                        'starting fresh via kick.', restart_file)
        method = 'kick'
    # Fresh start: discard any stale restart.toml left by an earlier
    # scheduler run so the inf setup guard does not reject the freshly
    # translated infswap.toml (this route decides kick-vs-resume here,
    # not through that guard).
    if os.path.isfile(restart_file):
        logger.progress('Discarding stale %s for a fresh scheduler run.',
                        restart_file)
        os.remove(restart_file)
    if method == 'kick':
        logger.progress('Generating initial paths via native kick '
                        'initiation for the coordinator load directory.')
        generate_load_dir_via_kick(native_config, runpath)
    translated = os.path.join(runpath, 'infswap.toml')
    with open(translated, 'wb') as handle:
        tomli_w.dump(config, handle)
    logger.progress('Translated native config written to: %s', translated)
    run_infinite_swapping(translated)
def set_up_simulation(inputfile, runpath):
    """Parse the input, create the simulation and pick its runner.

    Parameters
    ----------
    inputfile : string
        The input file which defines the simulation.
    runpath : string
        The base path the simulation is run from. It is stored as
        ``exe_path`` in the ``simulation`` and ``engine`` sections, both
        at the top level and for every ensemble.

    Returns
    -------
    runner : callable
        The function that executes the simulation; taken from
        :data:`_RUNNERS`, with :func:`run_generic_simulation` as the
        fallback for unlisted tasks.
    sim : object like :py:class:`.Simulation`
        The simulation defined by the input file.
    sim_settings : dict
        The input settings read from the input file.

    Raises
    ------
    ValueError
        If ``inputfile`` is not an existing file.

    """
    if not os.path.isfile(inputfile):
        raise ValueError(f'Input file "{inputfile}" NOT found!')
    logger.progress('Reading input settings from: %s', inputfile)
    logger.progress('Setting up simulation')
    sim_settings = parse_settings_file(inputfile)
    sections = ('simulation', 'engine')
    # NB this is not transmitted to the ensembles, hence the explicit
    # loop over them below.
    for section in sections:
        sim_settings[section]['exe_path'] = runpath
    for ensemble in sim_settings.get('ensemble', []):
        for section in sections:
            ensemble[section]['exe_path'] = runpath
    logger.info('Set up and create simulation.')
    sim = create_simulation(sim_settings)
    task = sim_settings['simulation']['task'].lower()
    logger.progress('Setup for simulation "%s" is done.', task)
    return _RUNNERS.get(task, run_generic_simulation), sim, sim_settings
def store_simulation_settings(settings, indir, backup, ext='.rst'):
    """Dump the parsed input settings to ``out<ext>`` in ``indir``.

    Parameters
    ----------
    settings : dict
        The simulation settings.
    indir : string
        The directory which contains the input script; the dump is
        written there.
    backup : boolean
        If True, an existing settings file will be backed up.
    ext : string
        Extension for the regenerated settings dump. It matches the
        input file extension, so a ``.toml`` run writes ``out.toml``
        and a ``.rst`` run writes ``out.rst``. Defaults to ``.rst``
        for backwards compatibility.

    """
    destination = os.path.join(indir, f'out{ext}')
    # The message is emitted before the file is actually written.
    logger.info('Full simulation settings written to: %s', destination)
    write_settings_file(settings, destination, backup=backup)
def remove_exit_file(exit_file):
    """Delete the EXIT file once a soft exit has completed.

    A file that is already gone is ignored silently. Any other
    ``OSError`` is logged as a warning rather than raised.

    Parameters
    ----------
    exit_file : string
        Path of the EXIT file to remove.

    """
    try:
        os.remove(exit_file)
    except FileNotFoundError:
        # Nothing to clean up.
        pass
    except OSError as error:
        logger.warning('Could not remove EXIT file "%s": %s', exit_file,
                       error)
    else:
        logger.info('Removed EXIT file: %s', exit_file)
def soft_exit_ignore(turn_keyboard_interruption_off=True, exe_dir=None):
    """Install or remove the soft-exit handler for SIGINT.

    Parameters
    ----------
    turn_keyboard_interruption_off : boolean
        If True, a handler is installed which, instead of exiting the
        program, creates the file 'EXIT' to stop PyRETIS. If False, the
        default ``KeyboardInterrupt`` behaviour is restored.
    exe_dir : string, optional
        The directory where the EXIT file is expected. When omitted,
        the working directory at the time of this call is used, so the
        handler never has to join a path onto ``None``.

    Returns
    -------
    out : object
        The previous SIGINT handler, as returned by
        :py:func:`signal.signal`.

    """
    if not turn_keyboard_interruption_off:
        return signal.signal(signal.SIGINT, signal.default_int_handler)

    # Resolve the directory now: with the default ``exe_dir=None`` the
    # handler would otherwise fail in os.path.join when the signal
    # arrives.
    exit_dir = os.getcwd() if exe_dir is None else exe_dir

    def soft_exit_handler(signum, frame):  # pragma: no cover
        """Handle with a keyboard interruption signal."""
        # pylint: disable=unused-argument
        logger.progress('Attempting soft exit - terminating soon...')
        pathlib.Path(os.path.join(exit_dir, 'EXIT')).touch(exist_ok=True)

    return signal.signal(signal.SIGINT, soft_exit_handler)
def _report_execution_error(error, log_level):
    """Report a stopped execution; send the traceback to the log only.

    Shared by both CLI flows (the native in-process simulation and the
    infinite-swapping scheduler) so that a failure is reported the same
    way regardless of which path ran. The friendly one-line message
    goes to the screen; the full traceback goes to the log file only.

    Parameters
    ----------
    error : Exception
        The exception currently being handled.
    log_level : integer
        The active log level. At ``DEBUG`` or below the caller should
        re-raise so that the traceback also reaches the screen.

    Returns
    -------
    reraise : boolean
        ``True`` when the caller should re-raise (debug mode), so the
        error is never silently swallowed.

    """
    summary = (
        ('"%s: %s".', (error.__class__.__name__, error.args)),
        ('ERROR - execution stopped.', ()),
        ('Please see the LOG for the error message and traceback.', ()),
    )
    for template, values in summary:
        logger.error(template, *values)
    # Mute the first handler while the traceback is logged, so that it
    # reaches the log file but not the screen.
    # NOTE(review): assumes logger.handlers[0] is the screen handler.
    console = logger.handlers[0]
    saved_level = console.level
    console.setLevel(logging.CRITICAL + 1)
    logger.error(traceback.format_exc())
    console.setLevel(saved_level)
    return log_level <= logging.DEBUG
def _run_with_sigterm_unwind(runner, log_level):
    """Run a scheduler entry point with SIGTERM mapped to a clean unwind.

    Both scheduler routes of :py:func:`main` share this scaffolding.

    Parameters
    ----------
    runner : callable
        A zero-argument callable which executes the scheduler run.
    log_level : integer
        The active log level, handed to ``_report_execution_error``
        which decides if a reported error should be re-raised.

    Returns
    -------
    out : integer
        ``0`` when the run completed, ``130`` when it was interrupted
        (a ``KeyboardInterrupt``, which is also what SIGTERM is turned
        into here) and ``1`` when it failed with an exception that was
        reported but not re-raised.

    """
    # HPC schedulers (e.g. SLURM at walltime), ``kill`` and ``scancel``
    # send SIGTERM, whose default action terminates the interpreter
    # without running atexit/finally handlers -- orphaning the worker
    # pool (and any live engine subprocesses). Convert SIGTERM into the
    # same clean unwind as Ctrl-C so the pool is released. restart.toml
    # is written atomically (os.replace), so interrupting a run cannot
    # corrupt the restart state.
    def _sigterm_handler(signum, frame):  # pragma: no cover
        # pylint: disable=unused-argument
        # One-shot: ignore any further SIGTERM during the unwind /
        # cleanup below, so a second signal (``timeout`` can SIGTERM a
        # routed run again while it logs "stopped cleanly" or tears the
        # pool down) cannot re-enter and raise a second
        # KeyboardInterrupt that escapes the except handler as an
        # uncaught traceback. The finally clause restores the handler.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        raise KeyboardInterrupt

    old_sigterm = signal.signal(signal.SIGTERM, _sigterm_handler)
    try:
        runner()
    except KeyboardInterrupt:
        logger.progress('Termination signal received; '
                        'stopped the scheduler cleanly.')
        bye_bye_world()
        return 130
    except Exception as error:  # pylint: disable=broad-exception-caught
        if _report_execution_error(error, log_level):
            raise
        bye_bye_world()
        return 1
    finally:
        # ``signal.signal`` returns None when the previous handler was
        # not installed from Python. None is not accepted as a handler,
        # so fall back to the default action instead of raising a
        # TypeError here (which would mask the outcome of the run).
        restore = signal.SIG_DFL if old_sigterm is None else old_sigterm
        signal.signal(signal.SIGTERM, restore)
    logger.success('\nSimulation done\n')
    bye_bye_world()
    return 0


def main(infile, indir, exe_dir, progress, log_level):
    """Execute PyRETIS.

    Parameters
    ----------
    infile : string
        The input file to open with settings for PyRETIS.
    indir : string
        The folder containing the settings file.
    exe_dir : string
        The directory we are working from.
    progress : boolean
        Determines if we should use a progress bar or not.
    log_level : integer
        The active log level. It is passed to
        ``_report_execution_error`` and, when that function asks for
        it, a reported error is re-raised.

    Returns
    -------
    out : integer
        The exit status: ``0`` on success, ``1`` on an error (or when
        an ``EXIT`` file blocks the run) and ``130`` when a scheduler
        run was interrupted.

    """
    # Stage C: native ``task = "tis"`` / ``"retis"`` TOML inputs run
    # through the infinite-swapping scheduler at n_workers=1 by default
    # (their config is translated by native_to_infswap_config). The
    # legacy in-process loop can be forced with PYRETIS_NATIVE_LOOP=1,
    # and the older PYRETIS_NATIVE_VIA_INFSWAP=1 opt-in still selects this
    # route for a native ``retis`` config (now a no-op since it is the
    # default). explore / pptis / repptis keep the in-process loop below.
    # NOTE(review): the opt-in test below also passes 'repptis' in
    # ``tasks`` although the comment above says repptis stays on the
    # in-process loop -- confirm which of the two is intended.
    route_to_scheduler = (
        is_native_scheduler_config(infile) and not force_native_loop()
    ) or (
        native_via_infswap_requested()
        and is_native_retis_config(infile, tasks=('retis', 'repptis'))
    )
    if route_to_scheduler:
        logger.progress('Routing the native TIS/RETIS config through the '
                        'infinite-swapping scheduler at n_workers=1.')
        return _run_with_sigterm_unwind(
            lambda: run_native_via_infswap(infile, exe_dir), log_level
        )

    # Single-entry-point dispatch: an infinite-swapping input is run by
    # its own scheduler (replica-exchange worker pool), not the
    # in-process simulation flow below.
    if is_infinite_swapping_config(infile):
        logger.progress('Input selects the infinite-swapping sampler; '
                        'running the replica-exchange scheduler.')
        return _run_with_sigterm_unwind(
            lambda: run_infinite_swapping(infile), log_level
        )

    # A left-over EXIT file blocks the in-process run.
    exit_file = os.path.join(exe_dir, 'EXIT')
    if os.path.isfile(exit_file):
        logger.progress(
            'Exit file found - Remove it before executing PyRETIS.')
        logger.error('* %s file found *', exit_file)
        logger.error('Remove the file to execute PyRETIS')
        bye_bye_world()
        return 1

    simulation = None
    settings = {}
    exit_status = 0
    # Inputs without a file extension are treated as ``.rst``.
    in_ext = os.path.splitext(infile)[1].lower() or '.rst'
    try:
        run, simulation, settings = set_up_simulation(infile, exe_dir)
        store_simulation_settings(settings, indir, True, ext=in_ext)
        # Run the simulation:
        soft_exit_ignore(turn_keyboard_interruption_off=True,
                         exe_dir=exe_dir)
        run(simulation, settings, progress=progress)
        soft_exit_ignore(turn_keyboard_interruption_off=False,
                         exe_dir=exe_dir)
    except Exception as error:  # pylint: disable=broad-exception-caught
        exit_status = 1
        if _report_execution_error(error, log_level):
            raise
    finally:
        # Write out the simulation settings as they were parsed and
        # add some additional info:
        # NOTE(review): the nesting of the statements in this clause
        # was reconstructed from a listing without indentation --
        # confirm it against the repository.
        if simulation is not None:
            end = getattr(simulation, 'cycle', {'step': None})['step']
            if end is not None:
                settings['simulation']['endcycle'] = end
                logtxt = f'Execution ended at step {end}'
                logger.progress(logtxt)
            logger.success('\nSimulation done\n')
            store_simulation_settings(settings, indir, False, ext=in_ext)
        if exit_status == 0:
            remove_exit_file(exit_file)
        bye_bye_world()
    # Deliberately after the ``finally`` clause: returning from inside
    # it would swallow the exception re-raised above.
    return exit_status
def entry_point():  # pragma: no cover
    """entry_point - The entry point for the pip install of pyretisrun.

    The command line is parsed, a file handler is attached to the
    logger and :py:func:`main` is executed. The process exits with
    the status returned by :py:func:`main`.

    """
    colorama.init(autoreset=True)
    parser = argparse.ArgumentParser(description=PROGRAM_NAME)
    parser.add_argument('-i', '--input',
                        help=f'Location of {PROGRAM_NAME} input file',
                        required=True)
    parser.add_argument('-V', '--version', action='version',
                        version=f'{PROGRAM_NAME} {VERSION}')
    parser.add_argument('-f', '--log_file',
                        help='Specify log file to write',
                        required=False,
                        default=f'{PROGRAM_NAME.lower()}.log')
    parser.add_argument('-l', '--log_level',
                        help='Specify log level for log file',
                        required=False,
                        default='INFO')
    parser.add_argument('-p', '--progress', action='store_true',
                        help=('Display a progress meter instead of text '
                              'output for the simulation'))
    args_dict = vars(parser.parse_args())

    input_file = args_dict['input']
    # Store directories:
    cwd_dir = os.getcwd()
    input_dir = os.path.dirname(input_file)
    if not os.path.isdir(input_dir):
        # Also covers a bare file name, where dirname is empty.
        input_dir = os.getcwd()

    # Define a file logger:
    create_backup(args_dict['log_file'])
    fileh = logging.FileHandler(args_dict['log_file'], mode='a')
    log_levl = getattr(logging, args_dict['log_level'].upper(),
                       logging.INFO)
    if not isinstance(log_levl, int):
        # The lookup can hit a module attribute that is not a level
        # (e.g. ``BASIC_FORMAT``, a string). Such a value breaks both
        # ``setLevel`` and the integer comparison done on the level
        # later on, so use the same fallback as for unknown names.
        log_levl = logging.INFO
    fileh.setLevel(log_levl)
    fileh.setFormatter(get_log_formatter(log_levl))
    logger.addHandler(fileh)

    # Here, we just check the python version. PyRETIS should anyway
    # fail before this for python2.
    check_python_version()

    hello_world(input_file, cwd_dir, args_dict['log_file'])
    sys.exit(main(input_file, input_dir, cwd_dir,
                  args_dict['progress'], log_levl))
# Run the command line application when executed as a script.
if __name__ == '__main__':  # pragma: no cover
    entry_point()