# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Shared Monte-Carlo path-sampling move primitives.
This module holds the engine-agnostic MC move primitives that several
parts of |pyretis| need without pulling in the whole native TIS/RETIS
loop: the shooting move and its helpers, and the time-reversal move.
They were factored out of :py:mod:`pyretis.core.tis` so that consumers
which only need the primitives -- notably the initial-path *kick*
machinery (:py:mod:`pyretis.initiation.initiate_kick`, used by both the
native loop and the scheduler's native-route input shim) -- do not depend
on ``tis.py``. ``tis.py`` re-imports these names, so existing
``from pyretis.core.tis import shoot, time_reversal`` users keep working.
Important
---------
The move bodies are kept byte-for-byte identical to their previous
``tis.py`` definitions: the random-number draw order, acceptance criteria
and sub-path construction are unchanged, so results are reproducible
across the move.
"""
import logging
from pyretis.core.montecarlo import metropolis_accept_reject
from pyretis.core.path import paste_paths, INITIAL_MOVES
from pyretis.core.velocity import (is_aimless_velocity_setting,
make_velocity_settings)
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
__all__ = ['shoot', 'time_reversal']
[docs]def _order_value(order):
"""Return the first order parameter component (scalar or vector)."""
try:
return order[0]
except (TypeError, IndexError):
return order
[docs]def time_reversal(path, order_function, interfaces, start_condition):
"""Perform a time-reversal move.
Parameters
----------
path : object like :py:class:`.PathBase`
This is the input path which will be used for generating a
new path.
order_function : object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
interfaces : list/tuple of floats
These are the interface positions of the form
``[left, middle, right]``.
start_condition : string
The starting condition, 'L'eft or 'R'ight.
Returns
-------
out[0] : boolean
True if the path can be accepted.
out[1] : object like :py:class:`.PathBase`
The time reversed path.
out[2] : string
Status of the path, this is one of the strings defined in
`.path._STATUS`.
"""
new_path = path.reverse(order_function)
start, _, _, _ = new_path.check_interfaces(interfaces)
# Explicitly set how this was generated. A time reversal reuses the same
# phase points, so an initialization path stays an initialization path
# (it keeps its flag and remains subject to the flush / warm-up).
if path.get_move() in INITIAL_MOVES:
new_path.set_move(path.get_move())
else:
new_path.generated = ('tr', 0, 0, 0)
if start in set(start_condition):
accept = True
status = 'ACC'
else:
accept = False
status = 'BWI' # Backward trajectory end at wrong interface.
new_path.status = status
return accept, new_path, status
def prepare_shooting_point(path, ensemble, tis_settings):
"""Select and modify velocities for a shooting move.
This method will randomly select a shooting point from a given
path and modify its velocities.
Parameters
----------
path : object like :py:class:`.PathBase`
This is the input path which will be used for generating a
new path.
ensemble : dictionary of objects
It contains:
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
* `interfaces`: list/tuple of floats
These are the interface positions of the form
``[left, middle, right]``.
tis_settings : dict
This contains the settings for TIS. Here, we use the
settings which dictates how we modify the velocities.
* `sigma_v`: velocity width. Negative values select aimless shooting.
Returns
-------
out[0] : object like :py:class:`.System`
The shooting point with modified velocities.
out[1] : integer
The index of the shooting point in the original path.
out[2] : float
The change in kinetic energy when modifying the velocities.
"""
if path.length < 3:
ens_name = ensemble['path_ensemble'].ensemble_name
if path.get_move() in INITIAL_MOVES:
msg = (
f'Path of length {path.length} was loaded for ensemble '
f'{ens_name} but contains no interior frames to shoot from. '
'The loaded file does not provide a suitable initial path for '
'this ensemble.\n'
'Suggestion: add "load_and_kick = True" in the [initial-path] '
'section of the input file so PyRETIS will kick a frame from '
'the loaded path to generate a valid initial path.'
)
else:
msg = (
f'Path of length {path.length} was generated for ensemble '
f'{ens_name} but contains no interior frames to shoot from. '
'A shootable path requires at least 3 frames.\n'
'Suggestions to resolve this:\n'
' - Reduce the timestep\n'
' - Decrease the number of subcycles\n'
' - Move the interfaces further apart'
)
raise ValueError(msg)
# Select the shooting point with the ENSEMBLE's rgen, not the path's
# own ``self.rgen``. A path's rgen can be aliased across ensembles
# (``empty_path``/``copy`` share it, and ``retis_swap_zero`` builds the
# [0-] and [0+] paths from one source path), so two ensembles could
# draw their shooting point from the same generator object -- a draw in
# one then advances the other, and continue-vs-restart equivalence
# breaks once they are restored as separate objects. The per-ensemble
# rgen is distinct and restart-faithful; this also matches the
# infinite-swapping path, which passes the rgen in explicitly (A3.1a).
shooting_point, idx = path.get_shooting_point(
criteria=tis_settings.get('shooting_move', 'rnd'),
interfaces=ensemble.get('interfaces'),
rgen=ensemble['rgen'])
engine = ensemble['engine']
orderp = shooting_point.order
logger.info('Shooting from order parameter/index: %f, %d', orderp[0], idx)
# Copy the shooting point, so that we can modify velocities without
# altering the original path:
shooting_copy = shooting_point.copy()
ensemble['system'] = shooting_copy
engine.order_function = ensemble['order_function']
# Modify the velocities:
dek, _, = engine.modify_velocities(
shooting_copy, make_velocity_settings(tis_settings),
ensemble['rgen'])
orderp = engine.calculate_order(shooting_copy)
shooting_copy.order = orderp
return shooting_copy, idx, dek
def check_kick(shooting_point, interfaces, trial_path, rgen, dek,
tis_settings):
"""Check the modification of the shooting point.
After generating velocities for a shooting point, we
do some additional checking to see if the shooting point is
acceptable.
Parameters
----------
shooting_point : object like :py:class:`.System`
The shooting point with modified velocities.
interfaces : list of floats
The interfaces used for TIS, in the format
``[left, middle, right]``.
trial_path : object like :py:class:`.PathBase`
The path we are currently generating.
rgen : object like :py:class:`.RandomGenerator`
This is the random generator that will be used to check if
we accept the shooting point based on the change in kinetic
energy.
dek : float
The change in kinetic energy when modifying the velocities.
tis_settings : dict
This contains the settings for TIS.
Returns
-------
out : boolean
True if the kick was OK, False otherwise.
"""
# 1) Check if the kick was too violent:
left, _, right = interfaces
if 'exp' in tis_settings.get('shooting_move', {}):
return True
if not left <= _order_value(shooting_point.order) < right:
# Shooting point was velocity dependent and was kicked outside
# of boundaries when modifying velocities.
trial_path.append(shooting_point)
trial_path.status = 'KOB'
return False
# 2) If the kick is not aimless, we check if we reject it or not:
if not is_aimless_velocity_setting(tis_settings):
accept_kick = metropolis_accept_reject(rgen, shooting_point, dek)
# If one wish to implement a bias call, this can be done here.
if not accept_kick:
trial_path.append(shooting_point)
trial_path.status = 'MCR' # Momenta Change Rejection.
return False
return True
def shoot_backwards(path_back, trial_path, ensemble,
tis_settings, start_cond):
"""Shoot in the backward time direction.
Parameters
----------
path_back : object like :py:class:`.PathBase`
The path we will fill with phase points from the propagation.
trial_path : object like :py:class:`.PathBase`
The current trial path generated by the shooting.
ensemble : dictionary of objects
It contains:
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `interfaces`: list/tuple of floats
These are the interface positions of the form
``[left, middle, right]``.
* `system`: object like :py:class:`.System`
The system that originates the path.
tis_settings : dict
This contains the settings for TIS.
start_cond : string
The starting condition for the current ensemble, 'L'eft or
'R'ight.
Returns
-------
out : boolean
True if the backward path was generated successfully, False
otherwise.
"""
logger.debug('Propagating backwards for the shooting move.')
path_back.time_origin = trial_path.time_origin
engine = ensemble['engine']
engine.order_function = ensemble['order_function']
success_back, _ = engine.propagate(path_back, ensemble,
ensemble['system'], reverse=True)
if not success_back:
# Something went wrong, most probably the path length was exceeded.
trial_path.status = 'BTL' # BTL = backward trajectory too long.
# Add the failed path to trial path for analysis:
trial_path += path_back
if path_back.length >= tis_settings['maxlength'] - 1:
# BTX is backward trajectory longer than maximum memory.
trial_path.status = 'BTX'
return False
# Backward seems OK so far, check if the ending point is correct:
left, _, right = ensemble['interfaces']
if path_back.get_end_point(left, right) not in set(start_cond):
# Nope, backward trajectory end at wrong interface.
trial_path += path_back # Store path for analysis.
trial_path.status = 'BWI'
return False
return True
[docs]def shoot(ensemble, tis_settings, start_cond, shooting_point=None):
"""Perform a shooting-move.
This function will perform the shooting move from a randomly
selected time-slice.
Parameters
----------
ensemble: dict
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is the path ensemble to perform the TIS step for.
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
* `interfaces`: list of floats
These are the interface positions on form
[left, middle, right].
tis_settings : dict
This contains the settings for TIS:
* `sigma_v`: velocity width. Negative values select aimless shooting.
* `allowmaxlength`: boolean, should paths be allowed to reach
maximum length?
* `maxlength`: integer, maximum allowed length of paths.
start_cond : string or tuple of strings
The starting condition for the current ensemble, 'L'eft or
'R'ight. ('L', 'R'), the default option, implies no directional
difference.
shooting_point: object like :py:class:`.System`, optional
If given, it is the shooting point from which the path is generated.
Returns
-------
out[0] : boolean
True if the path can be accepted.
out[1] : object like :py:class:`.PathBase`
Returns the generated path.
out[2] : string
Status of the path, this is one of the strings defined in
:py:const:`.path._STATUS`.
"""
path_ensemble = ensemble['path_ensemble']
path = path_ensemble.last_path
interfaces = ensemble['interfaces']
trial_path = path.empty_path() # The trial path we will generate.
if shooting_point is None:
shooting_point, idx, dek = prepare_shooting_point(
path, ensemble, tis_settings
)
kick = check_kick(shooting_point, interfaces, trial_path,
ensemble['rgen'], dek, tis_settings)
else:
kick = True
idx = getattr(shooting_point, 'idx', 0)
# Store info about this point, just in case we have to return
# before completing a full new path:
trial_path.generated = ('sh', _order_value(shooting_point.order), idx, 0)
trial_path.time_origin = path.time_origin + idx
# We now check if the kick was OK or not:
if not kick:
return False, trial_path, trial_path.status
# OK: kick was either aimless or it was accepted by Metropolis
# we should now generate trajectories, but first check how long
# it should be (if the path comes from a sparse load, it is assumed to
# not respect the detail balance anyway):
if path.get_move() == 'ld' or tis_settings['allowmaxlength']:
maxlen = tis_settings['maxlength']
else:
# Draw the random max length from the ENSEMBLE rgen, not the path's
# own (aliasable) rgen -- see the note above the get_shooting_point
# call.
maxlen = min(int((path.length - 2) / ensemble['rgen'].rand()[0]) + 2,
tis_settings['maxlength'])
# Since the forward path must be at least one step, the maximum
# length for the backward path is maxlen-1.
# Generate the backward path:
path_back = path.empty_path(maxlen=maxlen - 1)
# Set ensemble state to the selected shooting point:
ensemble['system'] = shooting_point.copy()
if not shoot_backwards(path_back, trial_path, ensemble,
tis_settings, start_cond):
return False, trial_path, trial_path.status
# Everything seems fine, now propagate forward.
# Note that the length of the forward path is adjusted to
# account for the fact that it shares a point with the backward
# path (i.e. the shooting point). The duplicate point is just
# counted once when the paths are merged by the method
# `paste_paths` by setting `overlap=True` (which indicates that
# the forward and backward paths share a point).
path_forw = path.empty_path(maxlen=maxlen - path_back.length + 1)
logger.debug('Propagating forwards for shooting move...')
# Set ensemble state to the selected shooting point:
# change the system state.
ensemble['system'] = shooting_point.copy()
ensemble['engine'].order_function = ensemble['order_function']
success_forw, _ = ensemble['engine'].propagate(path_forw, ensemble,
ensemble['system'],
reverse=False)
path_forw.time_origin = trial_path.time_origin
# Now, the forward propagation could have failed by exceeding the
# maximum length for the forward path. However, it could also fail
# when we paste together so that the length is larger than the
# allowed maximum. We paste first and ask later:
trial_path = paste_paths(path_back, path_forw, overlap=True,
maxlen=tis_settings['maxlength'])
# Also update information about the shooting:
trial_path.generated = ('sh', _order_value(shooting_point.order), idx,
path_back.length - 1)
trial_path.weight = 1.
if not success_forw:
trial_path.status = 'FTL'
# If we reached this point, the backward path was successful,
# but the forward was not. For the case where the forward was
# also successful, the length of the trial path cannot exceed
# the maximum length given in the TIS settings. Thus we only
# need to check this here, i.e. when given that the backward
# was successful and the forward not:
if trial_path.length == tis_settings['maxlength']:
trial_path.status = 'FTX' # exceeds "memory".
# Deal with the rejections for path properties.
# Make sure we did not hit the left interface on {0-} if this is disallowed
# Cases:
# Permeability=true: starting/ending at L is allowed in [0-]
# RETIS: [0-] is the only ensemble that allows paths starting R
# (RE)PPTIS: starting/ending at L and R is allowed for body ensembles.
# exception: [0^{+-}'] is not allowed to start AND end at R.
# As this ensemble has must_cross_M=True, and L=M,
# This is automatically taken care of.
elif ('L' not in set(path_ensemble.start_condition) and
'L' in trial_path.check_interfaces(interfaces)[:2]):
trial_path.status = '0-L'
# Last check - Did we cross the middle interface?
# if we are using PPTIS or REPPTIS
elif getattr(path_ensemble, 'must_cross_M', False) and \
(not trial_path.check_interfaces(interfaces)[-1][1] or
not (trial_path.check_interfaces(interfaces)[-1][0] or
trial_path.check_interfaces(interfaces)[-1][2])):
# not for [0-]
# detect if: minorder < middle interf <= maxorder
# or
# if we do cross middle interface, the
# path still has to come from left or right
# so we need cross[0] or cross[2] to be true
trial_path.status = 'NCR'
# if we are not using PPTIS or REPPTIS
# Special treatment if paths can start everywhere
elif set(('R', 'L')) != set(path_ensemble.start_condition) and \
not trial_path.check_interfaces(interfaces)[-1][1]:
# No, we did not cross the middle interface:
trial_path.status = 'NCR'
else: # If nothing went wrong, then...
trial_path.status = 'ACC'
return True, trial_path, trial_path.status
return False, trial_path, trial_path.status