Source code for pyretis.core.mc_moves

# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Shared Monte-Carlo path-sampling move primitives.

This module holds the engine-agnostic MC move primitives that several
parts of |pyretis| need without pulling in the whole native TIS/RETIS
loop: the shooting move and its helpers, and the time-reversal move.
They were factored out of :py:mod:`pyretis.core.tis` so that consumers
which only need the primitives -- notably the initial-path *kick*
machinery (:py:mod:`pyretis.initiation.initiate_kick`, used by both the
native loop and the scheduler's native-route input shim) -- do not depend
on ``tis.py``. ``tis.py`` re-imports these names, so existing
``from pyretis.core.tis import shoot, time_reversal`` users keep working.

Important
---------
The move bodies are kept byte-for-byte identical to their previous
``tis.py`` definitions: the random-number draw order, acceptance criteria
and sub-path construction are unchanged, so results are reproducible
across the move.
"""
import logging

from pyretis.core.montecarlo import metropolis_accept_reject
from pyretis.core.path import paste_paths, INITIAL_MOVES
from pyretis.core.velocity import (is_aimless_velocity_setting,
                                   make_velocity_settings)

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())

__all__ = ['shoot', 'time_reversal']


[docs]def _order_value(order): """Return the first order parameter component (scalar or vector).""" try: return order[0] except (TypeError, IndexError): return order
[docs]def time_reversal(path, order_function, interfaces, start_condition): """Perform a time-reversal move. Parameters ---------- path : object like :py:class:`.PathBase` This is the input path which will be used for generating a new path. order_function : object like :py:class:`.OrderParameter` The class used for obtaining the order parameter(s). interfaces : list/tuple of floats These are the interface positions of the form ``[left, middle, right]``. start_condition : string The starting condition, 'L'eft or 'R'ight. Returns ------- out[0] : boolean True if the path can be accepted. out[1] : object like :py:class:`.PathBase` The time reversed path. out[2] : string Status of the path, this is one of the strings defined in `.path._STATUS`. """ new_path = path.reverse(order_function) start, _, _, _ = new_path.check_interfaces(interfaces) # Explicitly set how this was generated. A time reversal reuses the same # phase points, so an initialization path stays an initialization path # (it keeps its flag and remains subject to the flush / warm-up). if path.get_move() in INITIAL_MOVES: new_path.set_move(path.get_move()) else: new_path.generated = ('tr', 0, 0, 0) if start in set(start_condition): accept = True status = 'ACC' else: accept = False status = 'BWI' # Backward trajectory end at wrong interface. new_path.status = status return accept, new_path, status
def prepare_shooting_point(path, ensemble, tis_settings): """Select and modify velocities for a shooting move. This method will randomly select a shooting point from a given path and modify its velocities. Parameters ---------- path : object like :py:class:`.PathBase` This is the input path which will be used for generating a new path. ensemble : dictionary of objects It contains: * `order_function`: object like :py:class:`.OrderParameter` The class used for obtaining the order parameter(s). * `engine`: object like :py:class:`.EngineBase` The engine to use for propagating a path. * `rgen`: object like :py:class:`.RandomGenerator` This is the random generator that will be used. * `interfaces`: list/tuple of floats These are the interface positions of the form ``[left, middle, right]``. tis_settings : dict This contains the settings for TIS. Here, we use the settings which dictates how we modify the velocities. * `sigma_v`: velocity width. Negative values select aimless shooting. Returns ------- out[0] : object like :py:class:`.System` The shooting point with modified velocities. out[1] : integer The index of the shooting point in the original path. out[2] : float The change in kinetic energy when modifying the velocities. """ if path.length < 3: ens_name = ensemble['path_ensemble'].ensemble_name if path.get_move() in INITIAL_MOVES: msg = ( f'Path of length {path.length} was loaded for ensemble ' f'{ens_name} but contains no interior frames to shoot from. ' 'The loaded file does not provide a suitable initial path for ' 'this ensemble.\n' 'Suggestion: add "load_and_kick = True" in the [initial-path] ' 'section of the input file so PyRETIS will kick a frame from ' 'the loaded path to generate a valid initial path.' ) else: msg = ( f'Path of length {path.length} was generated for ensemble ' f'{ens_name} but contains no interior frames to shoot from. ' 'A shootable path requires at least 3 frames.\n' 'Suggestions to resolve this:\n' ' - Reduce the timestep\n' ' - Decrease the number of subcycles\n' ' - Move the interfaces further apart' ) raise ValueError(msg) # Select the shooting point with the ENSEMBLE's rgen, not the path's # own ``self.rgen``. A path's rgen can be aliased across ensembles # (``empty_path``/``copy`` share it, and ``retis_swap_zero`` builds the # [0-] and [0+] paths from one source path), so two ensembles could # draw their shooting point from the same generator object -- a draw in # one then advances the other, and continue-vs-restart equivalence # breaks once they are restored as separate objects. The per-ensemble # rgen is distinct and restart-faithful; this also matches the # infinite-swapping path, which passes the rgen in explicitly (A3.1a). shooting_point, idx = path.get_shooting_point( criteria=tis_settings.get('shooting_move', 'rnd'), interfaces=ensemble.get('interfaces'), rgen=ensemble['rgen']) engine = ensemble['engine'] orderp = shooting_point.order logger.info('Shooting from order parameter/index: %f, %d', orderp[0], idx) # Copy the shooting point, so that we can modify velocities without # altering the original path: shooting_copy = shooting_point.copy() ensemble['system'] = shooting_copy engine.order_function = ensemble['order_function'] # Modify the velocities: dek, _, = engine.modify_velocities( shooting_copy, make_velocity_settings(tis_settings), ensemble['rgen']) orderp = engine.calculate_order(shooting_copy) shooting_copy.order = orderp return shooting_copy, idx, dek def check_kick(shooting_point, interfaces, trial_path, rgen, dek, tis_settings): """Check the modification of the shooting point. After generating velocities for a shooting point, we do some additional checking to see if the shooting point is acceptable. Parameters ---------- shooting_point : object like :py:class:`.System` The shooting point with modified velocities. interfaces : list of floats The interfaces used for TIS, in the format ``[left, middle, right]``. trial_path : object like :py:class:`.PathBase` The path we are currently generating. rgen : object like :py:class:`.RandomGenerator` This is the random generator that will be used to check if we accept the shooting point based on the change in kinetic energy. dek : float The change in kinetic energy when modifying the velocities. tis_settings : dict This contains the settings for TIS. Returns ------- out : boolean True if the kick was OK, False otherwise. """ # 1) Check if the kick was too violent: left, _, right = interfaces if 'exp' in tis_settings.get('shooting_move', {}): return True if not left <= _order_value(shooting_point.order) < right: # Shooting point was velocity dependent and was kicked outside # of boundaries when modifying velocities. trial_path.append(shooting_point) trial_path.status = 'KOB' return False # 2) If the kick is not aimless, we check if we reject it or not: if not is_aimless_velocity_setting(tis_settings): accept_kick = metropolis_accept_reject(rgen, shooting_point, dek) # If one wish to implement a bias call, this can be done here. if not accept_kick: trial_path.append(shooting_point) trial_path.status = 'MCR' # Momenta Change Rejection. return False return True def shoot_backwards(path_back, trial_path, ensemble, tis_settings, start_cond): """Shoot in the backward time direction. Parameters ---------- path_back : object like :py:class:`.PathBase` The path we will fill with phase points from the propagation. trial_path : object like :py:class:`.PathBase` The current trial path generated by the shooting. ensemble : dictionary of objects It contains: * `order_function`: object like :py:class:`.OrderParameter` The class used for obtaining the order parameter(s). * `engine`: object like :py:class:`.EngineBase` The engine to use for propagating a path. * `interfaces`: list/tuple of floats These are the interface positions of the form ``[left, middle, right]``. * `system`: object like :py:class:`.System` The system that originates the path. tis_settings : dict This contains the settings for TIS. start_cond : string The starting condition for the current ensemble, 'L'eft or 'R'ight. Returns ------- out : boolean True if the backward path was generated successfully, False otherwise. """ logger.debug('Propagating backwards for the shooting move.') path_back.time_origin = trial_path.time_origin engine = ensemble['engine'] engine.order_function = ensemble['order_function'] success_back, _ = engine.propagate(path_back, ensemble, ensemble['system'], reverse=True) if not success_back: # Something went wrong, most probably the path length was exceeded. trial_path.status = 'BTL' # BTL = backward trajectory too long. # Add the failed path to trial path for analysis: trial_path += path_back if path_back.length >= tis_settings['maxlength'] - 1: # BTX is backward trajectory longer than maximum memory. trial_path.status = 'BTX' return False # Backward seems OK so far, check if the ending point is correct: left, _, right = ensemble['interfaces'] if path_back.get_end_point(left, right) not in set(start_cond): # Nope, backward trajectory end at wrong interface. trial_path += path_back # Store path for analysis. trial_path.status = 'BWI' return False return True
[docs]def shoot(ensemble, tis_settings, start_cond, shooting_point=None): """Perform a shooting-move. This function will perform the shooting move from a randomly selected time-slice. Parameters ---------- ensemble: dict It contains: * `path_ensemble`: object like :py:class:`.PathEnsemble` This is the path ensemble to perform the TIS step for. * `system`: object like :py:class:`.System` System is used here since we need access to the temperature and to the particle list. * `order_function`: object like :py:class:`.OrderParameter` The class used for obtaining the order parameter(s). * `engine`: object like :py:class:`.EngineBase` The engine to use for propagating a path. * `rgen`: object like :py:class:`.RandomGenerator` This is the random generator that will be used. * `interfaces`: list of floats These are the interface positions on form [left, middle, right]. tis_settings : dict This contains the settings for TIS: * `sigma_v`: velocity width. Negative values select aimless shooting. * `allowmaxlength`: boolean, should paths be allowed to reach maximum length? * `maxlength`: integer, maximum allowed length of paths. start_cond : string or tuple of strings The starting condition for the current ensemble, 'L'eft or 'R'ight. ('L', 'R'), the default option, implies no directional difference. shooting_point: object like :py:class:`.System`, optional If given, it is the shooting point from which the path is generated. Returns ------- out[0] : boolean True if the path can be accepted. out[1] : object like :py:class:`.PathBase` Returns the generated path. out[2] : string Status of the path, this is one of the strings defined in :py:const:`.path._STATUS`. """ path_ensemble = ensemble['path_ensemble'] path = path_ensemble.last_path interfaces = ensemble['interfaces'] trial_path = path.empty_path() # The trial path we will generate. if shooting_point is None: shooting_point, idx, dek = prepare_shooting_point( path, ensemble, tis_settings ) kick = check_kick(shooting_point, interfaces, trial_path, ensemble['rgen'], dek, tis_settings) else: kick = True idx = getattr(shooting_point, 'idx', 0) # Store info about this point, just in case we have to return # before completing a full new path: trial_path.generated = ('sh', _order_value(shooting_point.order), idx, 0) trial_path.time_origin = path.time_origin + idx # We now check if the kick was OK or not: if not kick: return False, trial_path, trial_path.status # OK: kick was either aimless or it was accepted by Metropolis # we should now generate trajectories, but first check how long # it should be (if the path comes from a sparse load, it is assumed to # not respect the detail balance anyway): if path.get_move() == 'ld' or tis_settings['allowmaxlength']: maxlen = tis_settings['maxlength'] else: # Draw the random max length from the ENSEMBLE rgen, not the path's # own (aliasable) rgen -- see the note above the get_shooting_point # call. maxlen = min(int((path.length - 2) / ensemble['rgen'].rand()[0]) + 2, tis_settings['maxlength']) # Since the forward path must be at least one step, the maximum # length for the backward path is maxlen-1. # Generate the backward path: path_back = path.empty_path(maxlen=maxlen - 1) # Set ensemble state to the selected shooting point: ensemble['system'] = shooting_point.copy() if not shoot_backwards(path_back, trial_path, ensemble, tis_settings, start_cond): return False, trial_path, trial_path.status # Everything seems fine, now propagate forward. # Note that the length of the forward path is adjusted to # account for the fact that it shares a point with the backward # path (i.e. the shooting point). The duplicate point is just # counted once when the paths are merged by the method # `paste_paths` by setting `overlap=True` (which indicates that # the forward and backward paths share a point). path_forw = path.empty_path(maxlen=maxlen - path_back.length + 1) logger.debug('Propagating forwards for shooting move...') # Set ensemble state to the selected shooting point: # change the system state. ensemble['system'] = shooting_point.copy() ensemble['engine'].order_function = ensemble['order_function'] success_forw, _ = ensemble['engine'].propagate(path_forw, ensemble, ensemble['system'], reverse=False) path_forw.time_origin = trial_path.time_origin # Now, the forward propagation could have failed by exceeding the # maximum length for the forward path. However, it could also fail # when we paste together so that the length is larger than the # allowed maximum. We paste first and ask later: trial_path = paste_paths(path_back, path_forw, overlap=True, maxlen=tis_settings['maxlength']) # Also update information about the shooting: trial_path.generated = ('sh', _order_value(shooting_point.order), idx, path_back.length - 1) trial_path.weight = 1. if not success_forw: trial_path.status = 'FTL' # If we reached this point, the backward path was successful, # but the forward was not. For the case where the forward was # also successful, the length of the trial path cannot exceed # the maximum length given in the TIS settings. Thus we only # need to check this here, i.e. when given that the backward # was successful and the forward not: if trial_path.length == tis_settings['maxlength']: trial_path.status = 'FTX' # exceeds "memory". # Deal with the rejections for path properties. # Make sure we did not hit the left interface on {0-} if this is disallowed # Cases: # Permeability=true: starting/ending at L is allowed in [0-] # RETIS: [0-] is the only ensemble that allows paths starting R # (RE)PPTIS: starting/ending at L and R is allowed for body ensembles. # exception: [0^{+-}'] is not allowed to start AND end at R. # As this ensemble has must_cross_M=True, and L=M, # This is automatically taken care of. elif ('L' not in set(path_ensemble.start_condition) and 'L' in trial_path.check_interfaces(interfaces)[:2]): trial_path.status = '0-L' # Last check - Did we cross the middle interface? # if we are using PPTIS or REPPTIS elif getattr(path_ensemble, 'must_cross_M', False) and \ (not trial_path.check_interfaces(interfaces)[-1][1] or not (trial_path.check_interfaces(interfaces)[-1][0] or trial_path.check_interfaces(interfaces)[-1][2])): # not for [0-] # detect if: minorder < middle interf <= maxorder # or # if we do cross middle interface, the # path still has to come from left or right # so we need cross[0] or cross[2] to be true trial_path.status = 'NCR' # if we are not using PPTIS or REPPTIS # Special treatment if paths can start everywhere elif set(('R', 'L')) != set(path_ensemble.start_condition) and \ not trial_path.check_interfaces(interfaces)[-1][1]: # No, we did not cross the middle interface: trial_path.status = 'NCR' else: # If nothing went wrong, then... trial_path.status = 'ACC' return True, trial_path, trial_path.status return False, trial_path, trial_path.status