# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""This file contains functions used in TIS.
This module defines the functions needed to perform TIS simulations
(algorithms implemented as described by van Erp et al. [TIS]_ ) with
advanced shooting moves, such as Stone Skipping and Web Throwing
(algorithms implemented as described by Riccardi et al. [SS+WT]_ ).
Methods defined here
~~~~~~~~~~~~~~~~~~~~
make_tis_step (:py:func:`.make_tis_step`)
A method that will perform a single TIS step.
make_tis_step_ensemble (:py:func:`.make_tis_step_ensemble`)
A method to perform a TIS step for a path ensemble. It will handle
adding of the path to a path ensemble object.
shoot (:py:func:`.shoot`)
A method that will perform a shooting move.
select_shoot (:py:func:`.select_shoot`)
A method that randomly selects the shooting point.
wire_fencing (:py:func:`.wire_fencing`)
A method that will do a wire fencing shooting move.
stone_skipping (:py:func:`.stone_skipping`)
A method that will do a stone skipping shooting move.
web_throwing (:py:func:`.web_throwing`)
A method that will do a web throwing shooting move.
extender (:py:func:`.extender`)
A method that will extend a path to target interfaces.
time_reversal (:py:func:`.time_reversal`)
A method for performing the time reversal move.
ss_wt_wf_acceptance (:py:func:`.ss_wt_wf_acceptance`)
A method to check the acceptance of a newly generated path
according to super detailed balance rules for Stone Skipping,
Wire Fencing (basic and High Acceptance version) and Web Throwing.
References
~~~~~~~~~~
.. [SS+WT] E. Riccardi, O. Dahlen, T. S. van Erp,
J. Phys. Chem. letters, 8, 18, 4456, (2017),
https://doi.org/10.1021/acs.jpclett.7b01617
.. [TIS] T. S. van Erp, D. Moroni, P. G. Bolhuis,
J. Chem. Phys. 118, 7762 (2003),
https://dx.doi.org/10.1063%2F1.1562614
"""
import logging
from pyretis.core.common import (null_move,
counter,
relative_shoots_select,
segments_counter,
compute_weight,
select_and_trim_a_segment,
crossing_counter,
priority_checker,
crossing_finder,
wirefence_weight_and_pick)
from pyretis.core.path import Path, paste_paths, INITIAL_MOVES
from pyretis.inout.restart import write_ensemble_restart
from pyretis.core.mc_moves import (_order_value,
time_reversal,
prepare_shooting_point,
check_kick,
shoot_backwards,
shoot)
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
__all__ = ['extender',
'make_tis',
'make_tis_step',
'make_tis_step_ensemble',
'paste_paths',
'select_shoot',
'shoot',
'ss_wt_wf_acceptance',
'wire_fencing',
'stone_skipping',
'time_reversal',
'web_throwing']
[docs]def make_tis(ensembles, rgen, settings, cycle):
"""Perform a TIS step in a given path ensemble.
This function will execute the TIS steps. When used in the RETIS method,
three different options can be given:
1) If `relative_shoots` is given in the input settings as a list of
probability for each ensemble to be picked, then an ensemble
will be ranomly picked and TIS performed on. For all the
other ensembles we again have two options based on the given
`settings['nullmoves']`:
a) Do a 'null move' in all other ensembles.
b) Do nothing for all other ensembles.
Performing the null move in an ensemble will simply just accept
the previously accepted path in that ensemble again.
2) If `relative_shoots` is not given in the input settings, TIS moves
will be executed for all path ensembles.
Parameters
----------
ensembles : list of dictionaries of objects
This is a list of the ensembles we are using in the TIS method
with their properties.
They contain:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is used for storing results for the simulation. It
is also used for defining the interfaces for this simulation.
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list
* `order_function`: object like :py:class:`.OrderParameter`
The class used for calculating the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
rgen : object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
settings : dict
This dictionary contains the TIS settings.
cycle : int
The current cycle number.
Yields
------
out : dict
This dictionary contains the results of the TIS move(s).
"""
relative = settings.get('retis', {}).get('relative_shoots', None)
nullmoves = settings.get('retis', {}).get('nullmoves', False)
if relative is not None:
idx, ensemble = relative_shoots_select(ensembles, rgen, relative)
accept, trial, status = make_tis_step_ensemble(
ensemble, settings['ensemble'][idx], cycle
)
result = {
'ensemble_number': idx,
'mc-move': trial.get_move(),
'status': status,
'trial': trial,
'accept': accept
}
yield result
# Do null moves in the other ensembles, if requested:
if nullmoves:
for ensemble in ensembles:
other = ensemble['path_ensemble'].ensemble_number
if other != idx:
accept, trial, status = null_move(ensemble, cycle)
result = {
'ensemble_number': other,
'mc-move': 'nullmove',
'status': status,
'trial': trial,
'accept': accept,
}
if 0 == cycle % settings['ensemble'][idx].get(
'output', {}).get('restart-file', 1):
write_ensemble_restart(ensemble, settings)
yield result
else: # Do TIS in all ensembles:
prio_skip = priority_checker(ensembles, settings)
for i, (ens, e_set) in enumerate(
zip(ensembles, settings['ensemble'], strict=True)):
if prio_skip[i]:
# Here we skip the ensembles with higher cycle numbers.
yield
else:
accept, trial, status = make_tis_step_ensemble(ens,
e_set,
cycle)
result = {
'ensemble_number': ens['path_ensemble'].ensemble_number,
'mc-move': trial.get_move(),
'status': status,
'trial': trial,
'accept': accept
}
yield result
[docs]def make_tis_step_ensemble(ensemble, settings, cycle):
"""Perform a TIS step for a given path ensemble.
This function will run `make_tis_step` for the given path_ensemble.
It will handle adding of the path. This function is intended for
convenience when working with path ensembles. If we are using the
path ensemble ``[0^-]`` then the start condition should be 'R' for
right.
Parameters
----------
ensemble : dictionary of objects
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is used for storing results for the simulation. It
is also used for defining the interfaces for this simulation.
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
settings : dict
This dictionary contains the ensemble settings.
cycle : int
The current cycle number.
Returns
-------
out[0] : boolean
True if the new path can be accepted, False otherwise.
out[1] : object like :py:class:`.PathBase`
The generated path.
out[2] : string
The status of the path.
"""
path_ensemble = ensemble['path_ensemble']
engine = ensemble['engine']
start_cond = path_ensemble.start_condition
logger.info('TIS move in: %s', path_ensemble.ensemble_name)
engine.exe_dir = path_ensemble.directory['generate']
accept, trial, status = make_tis_step(ensemble,
settings['tis'],
start_cond)
# If we are exploring, we accept everything.
if settings['simulation']['task'] == 'explore':
if trial.length > 2:
accept, status = True, 'EXP'
trial.status = 'EXP'
if accept:
logger.info('The move was accepted!')
else:
logger.info('The move was rejected! (%s)', status)
path_ensemble.add_path_data(trial, status, cycle=cycle)
if cycle % settings.get('output', {}).get('restart-file', 1) == 0:
write_ensemble_restart(ensemble, settings)
return accept, trial, status
[docs]def make_tis_step(ensemble, tis_settings, start_cond):
"""Perform a TIS step and generate a new path.
The new path will be generated from the input path, either by
performing a time-reversal move or by a shooting move. This is
determined pseudo-randomly by drawing a random number from a
uniform distribution using the given random generator.
Parameters
----------
ensemble : dictionary of objects
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is the path ensemble to perform the TIS step for.
* `path`: object like :py:class:`.PathBase`
This is the input path which will be used for generating a
new path.
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
* `interfaces`: list of floats
These are the interface positions on form
[left, middle, right].
tis_settings : dict
This dictionary contains the settings for the TIS method. Here we
explicitly use:
* `freq`: float, the frequency of how often we should do time
reversal moves.
* `shooting_move`: string, the label of the shooting move to perform.
start_cond : string
The starting condition for the path. This is determined by the
ensemble we are generating for - it is 'R'ight or 'L'eft.
Returns
-------
out[0] : boolean
True if the new path can be accepted.
out[1] : object like :py:class:`.PathBase`
The generated path.
out[2] : string
The status of the path.
"""
interfaces = ensemble['interfaces']
order_function = ensemble['order_function']
path_ensemble = ensemble['path_ensemble']
rgen = ensemble['rgen']
last_path = path_ensemble.last_path
# Get weights
moves = ['tr', 'mirror', 'targetswap', 'sh']
weights = [tis_settings.get('freq', 0),
tis_settings.get('mirror_freq', 0),
tis_settings.get('target_freq', 0),
0]
# Mirror and target swap only makes sense for ensemble 000
if path_ensemble.ensemble_number != 0:
weights[moves.index('mirror')] = 0
weights[moves.index('targetswap')] = 0
# Add sh weight (make sure it is non-negative)
weights[-1] = max(1-sum(weights), 0)
if sum(weights) > 1:
msg = "move probabilities sum to more than 1. Got the following "
msg += f"probabilities: {list(zip(moves, weights))}"
raise ValueError(msg)
choice = rgen.choice(moves, 1, p=weights)[0]
if choice == 'tr':
logger.info('Performing a time reversal move')
accept, new_path, status = time_reversal(last_path, order_function,
interfaces, start_cond)
elif choice == 'sh':
logger.info('Selecting the shooting move.')
accept, new_path, status = select_shoot(ensemble, tis_settings,
start_cond)
elif choice == 'mirror':
logger.info("Selecting the mirror move")
accept, new_path, status = mirror(ensemble)
elif choice == 'targetswap':
logger.info("Selecting the target swap move")
accept, new_path, status = target_swap(ensemble, tis_settings)
else:
raise RuntimeError(f"Illegal move choice: {choice}")
return accept, new_path, status
[docs]def select_shoot(ensemble, tis_settings, start_cond):
"""Select the shooting move to generate a new path.
The new path will be generated from the input path, either by
performing a normal shooting or web-throwing. This is
determined pseudo-randomly by drawing a random number from a
uniform distribution using the given random generator.
Parameters
----------
ensemble : dictionary of objects
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is the path ensemble to perform the TIS step for.
* `path`: object like :py:class:`.PathBase`
This is the input path which will be used for generating a
new path.
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
* `interfaces`: list of floats
These are the interface positions on form
[left, middle, right].
tis_settings : dict
This dictionary contains the settings for the TIS method. Here we
explicitly use:
* `freq`: float, the frequency of how often we should do time
reversal moves.
* `shooting_move`: string, the label of the shooting move to perform.
start_cond : string
The starting condition for the path. This is determined by the
ensemble we are generating for - it is 'R'ight or 'L'eft.
Returns
-------
out[0] : boolean
True if the new path can be accepted.
out[1] : object like :py:class:`.PathBase`
The generated path.
out[2] : string
The status of the path.
"""
shooting_move = tis_settings.get('shooting_move', 'sh')
if shooting_move == 'wt':
logger.info('Performing a Web Throwing move')
accept, new_path, status = web_throwing(ensemble, tis_settings)
elif shooting_move == 'ss':
logger.info('Performing a Stone Skipping move')
accept, new_path, status = stone_skipping(ensemble, tis_settings,
start_cond)
elif shooting_move == 'wf':
logger.info('Performing a Wire Fencing move')
accept, new_path, status = wire_fencing(ensemble, tis_settings,
start_cond)
elif shooting_move == '00':
logger.info('Performing a null move')
accept, new_path, status = null_move(ensemble, 0)
else:
logger.info('Performing a shooting move')
accept, new_path, status = shoot(ensemble, tis_settings, start_cond)
return accept, new_path, status
def mirror(ensemble):
"""Perform a mirror move.
This function performs a mirror move by mirroring the order function in
place. It then calculates the new order parameter for each phase point in
the path. The generated path is then returned. If the path is an
initialization path (a move in :py:data:`.INITIAL_MOVES`), the newly
generated path keeps that same initialization flag.
This move is always acceptable, so the status is set to 'ACC', and the
bool accept is set to True.
Parameters
----------
ensemble : dictionary of objects
Returns
-------
out[0] : boolean
True if the path can be accepted.
out[1] : object like :py:class:`.PathBase`
The generated path.
out[2] : string
Status of the path,
"""
order_function = ensemble['order_function']
# This also mirrors the ensemble['of'] inplace
order_function.mirror()
path = ensemble['path_ensemble'].last_path
new_path = path.copy()
# Calculate the new orderparameter
for phasepoint in new_path.phasepoints:
phasepoint.order = order_function.calculate(phasepoint)
# A mirror move does not regenerate the dynamics, so an initialization
# path stays an initialization path (it keeps its flag).
if path.get_move() in INITIAL_MOVES:
new_path.set_move(path.get_move())
else:
new_path.generated = ('mr', 0, 0, 0)
# This is an always accept move
accept = True
status = 'ACC'
new_path.status = status
return accept, new_path, status
def get_target_swap_choices(allowed_indices, path, ensemble):
"""Get the valid choices for the target swap move.
Parameters
----------
allowed_indices : list of integers
The allowed indices for the target swap.
path : object like :py:class:`.PathBase`
The path to use for the target swap.
ensemble : dictionary of objects
The ensemble to use for the target swap.
Returns
-------
choices : list of tuples
The choices for the target swap.
tuple = (index, frame)
"""
order_function = ensemble['order_function']
left, _, right = tuple(ensemble['interfaces'])
choices = []
old_idx = order_function.index
for idx in allowed_indices:
# swap out index on the order function
order_function.index = idx
for frame, phasepoint in enumerate(path.phasepoints):
if left <= order_function.calculate(phasepoint)[0] < right:
choices.append((idx, frame))
order_function.index = old_idx
return choices
def target_swap_backward(choice, choices, old_path, new_path, ensemble):
"""Perform the backward part of the target swap move.
This function loops backward from a selected frame (the "trial point")
until it detects a frame where the new target has a position outside a
certain interval. The number of steps taken in this process is recorded as
`n_n`. If there are not enough frames, it propagates backwards to
generate more.
Parameters
----------
choice : tuple
The choice to use for the target swap.
tuple = (index, frame)
choices : list of tuples
The choices for the target swap.
tuple = (index, frame)
old_path : object like :py:class:`.PathBase`
The path to use for the target swap.
new_path : object like :py:class:`.PathBase`
The new path to use for the target swap.
ensemble : dictionary of objects
The ensemble to use for the target swap.
Returns
-------
n_n : integer
The number of valid frames in the new path where a swap could
potentially occur.
n_o : integer
The number of valid frames in the old path where a swap could
potentially occur.
"""
order_function = ensemble['order_function']
engine = ensemble['engine']
old_idx = order_function.index
order_function.index = choice[0]
n_n = 0
first_frame = choice[1]
n_o = 0
# First loop back:
for i in range(choice[1], -1, -1):
if (choice[0], i) not in choices:
break
n_n += 1
first_frame = i
# Prevent double frames if we need to extend
if i != 0:
phasepoint = old_path.phasepoints[i].copy()
phasepoint.order = order_function.calculate(phasepoint)
new_path.append(phasepoint)
if first_frame != 0:
# We know we can append at least 1 more frame that would end up in a
# state
extra_frame = first_frame - 1
logger.debug("Found path starting at frame %s, skipping backward MD",
extra_frame)
phasepoint = old_path.phasepoints[extra_frame].copy()
phasepoint.order = order_function.calculate(phasepoint)
new_path.append(phasepoint)
# Not all old_points are valid
if extra_frame != 0:
n_o -= extra_frame-1
logger.debug("n_old updated from %s to %s",
n_o+(extra_frame-1), n_o)
else:
logger.debug("Need more frames, propagating backwards")
ensemble['system'] = old_path.phasepoints[0].copy()
engine.order_function = ensemble['order_function']
engine.propagate(new_path, ensemble, ensemble['system'], reverse=True)
order_function.idx = old_idx
return n_n, n_o
def target_swap_forward(choice, choices, old_path, new_path, ensemble):
"""Perform the forward part of the target swap move.
This function loops forward from a selected frame (the "trial point") until
it detects a frame where the new target has a position outside a certain
interval. The number of steps taken in this process is recorded as `n_n`.
If there are not enough frames, it propagates forwards to generate more.
Parameters
----------
choice : tuple
The choice to use for the target swap.
tuple = (index, frame)
choices : list of tuples
The choices for the target swap.
tuple = (index, frame)
old_path : object like :py:class:`.PathBase`
The path to use for the target swap.
new_path : object like :py:class:`.PathBase`
The new path to use for the target swap.
ensemble : dictionary of objects
The ensemble to use for the target swap.
Returns
-------
n_n : integer
The number of valid frames in the new path where a swap could
potentially occur.
n_o : integer
The number of valid frames in the old path where a swap could
potentially occur.
"""
order_function = ensemble['order_function']
engine = ensemble['engine']
old_idx = order_function.index
order_function.index = choice[0]
n_n = 0
last_frame = choice[1]
n_o = 0
for i in range(choice[1]+1, old_path.length, 1):
if (choice[0], i) not in choices:
break
n_n += 1
last_frame = i
# Prevent last point if we need to extend
if i != old_path.length-1:
phasepoint = old_path.phasepoints[i].copy()
phasepoint.order = order_function.calculate(phasepoint)
new_path.append(phasepoint)
if last_frame < old_path.length-1:
# We know we can append at least one frame that will result in a valid
# path
extra_frame = last_frame + 1
logger.debug("Found path ending at frame %s, skipping forward MD",
extra_frame)
phasepoint = old_path.phasepoints[extra_frame].copy()
phasepoint.order = order_function.calculate(phasepoint)
new_path.append(phasepoint)
# Not all old_points are valid
# -2 because we already dismissed the last frame
# Need to catch when we append the last frame here as we don't want to
# add that one
if extra_frame != old_path.length-1:
n_o -= (old_path.length-2)-extra_frame
logger.debug("n_old updated from %s to %s",
n_o+((old_path.length-2)-extra_frame), n_o)
else:
logger.debug("Need more frames, propagating forwards")
ensemble['system'] = old_path.phasepoints[-1].copy()
engine.order_function = ensemble['order_function']
engine.propagate(new_path, ensemble, ensemble['system'], reverse=False)
order_function.index = old_idx
return n_n, n_o
def target_swap(ensemble, tis_settings):
"""Perform a target swap move.
This function attempts a target swap move starting from a given index to
allowed indices. It finds a trial point from which a new path is generated.
It records Z(o->n) and Z(n->o), which are counts of valid frames where a
target swap could potentially occur (from old to new, and from new to old,
respectively). These are used to calculate the acceptance probability.
Once a trial point is chosen, it loops forwards and backwards in the
trajectory of the old path (along the new choice index) to construct a new
path. This may include MD integration if necessary. If there is no valid
swap, it returns with status 'TSS'.
Parameters
----------
ensemble : dictionary of objects
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is the path ensemble to perform the target swap move for.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
tis_settings : dict
This dictionary contains the settings for the TIS method. Here we
explicitly use:
* `freq`: float, the frequency of how often we should do time
reversal moves.
* `shooting_move`: string, the label of the shooting move to perform.
Returns
-------
tuple : A tuple containing:
- accept (bool): Whether the move was accepted or not.
- new_path (object like :py:class:`.PathBase`): The new path.
- status (str): The status of the move.
Can be 'ACC', 'TSS', 'BTX', or 'FTX'
Raises
------
ValueError : If the current order parameter index is not part of the target
indices.
"""
# Grab required variables
target_indices = tis_settings['target_indices']
order_f = ensemble['order_function']
old_index = order_f.index
if old_index not in target_indices:
raise ValueError(f"Current orderparameter index: {old_index}, is not "
f"part of the target indices: {target_indices}.")
old_path = ensemble['path_ensemble'].last_path
maxlen = tis_settings['maxlength']
new_path = old_path.empty_path(maxlen=maxlen)
rgen = ensemble['rgen']
allowed_indices = [i for i in target_indices if i != old_index]
logger.debug("Attempt a target swap starting from index %s "
"to the allowed indices: %s",
old_index, allowed_indices)
# 2 Find trial point
choices = get_target_swap_choices(allowed_indices=allowed_indices,
path=old_path,
ensemble=ensemble)
# If there is no valid swap
if len(choices) == 0:
logger.info("Found no valid swapping frames")
accept = False
status = 'TSS'
new_path = old_path.copy()
new_path.generated = ('ts', 0, 0, 0)
new_path.status = status
return accept, new_path, status
choice_idx = rgen.choice(len(choices), 1)[0]
choice = choices[choice_idx]
logger.debug("Choose %s out of %s.", choice, choices)
logger.info("Attempting target swap from index %s to index %s",
old_index, choice[0])
# Record Z(o->n)
z_o_n = len(choices)
# Figure out n_o
n_n = 0
# default n_o is the old path min the 2 frames in the states
n_o = old_path.length-2
# First loop backwards
diff_n_n, diff_n_o = target_swap_backward(choice=choice,
choices=choices,
old_path=old_path,
new_path=new_path,
ensemble=ensemble)
n_n += diff_n_n
n_o += diff_n_o
new_path = new_path.reverse()
# Needed because path.reverse does not clone attributes
new_path.maxlen = maxlen
if new_path.length >= maxlen:
order_f.index = old_index
logger.debug("Backward trajectory to long")
accept = False
status = 'BTX'
new_path.generated = ('ts', 0, 0, 0)
new_path.status = status
return accept, new_path, status
# Loop forward:
diff_n_n, diff_n_o = target_swap_forward(choice=choice,
choices=choices,
old_path=old_path,
new_path=new_path,
ensemble=ensemble)
n_n += diff_n_n
n_o += diff_n_o
if new_path.length >= maxlen:
order_f.index = old_index
logger.debug("Forward trajectory to long")
accept = False
status = 'FTX'
new_path.generated = ('ts', 0, 0, 0)
new_path.status = status
return accept, new_path, status
# Need to calculate Z(n->o)
allowed_indices = [i for i in target_indices if i != choice[0]]
new_choices = get_target_swap_choices(allowed_indices=allowed_indices,
path=new_path,
ensemble=ensemble)
z_n_o = len(new_choices)
logger.debug("Going to calculate acceptance\n"
"Old choices: %s\n"
"n_n: %s, Z_o_n: %s\n"
"New choices: %s\n"
"n_o: %s, Z_n_o: %s\n",
choices,
n_n, z_o_n,
new_choices,
n_o, z_n_o)
acc = min(1, n_o/n_n*z_o_n/z_n_o)
ran_num = rgen.rand()[0]
logger.info("prob = %s, random = %s", acc, ran_num)
if ran_num >= acc:
order_f.index = old_index
logger.debug("Failed acceptance")
accept = False
status = "TSA"
new_path.generated = ('ts', 0, 0, 0)
new_path.status = status
return accept, new_path, status
order_f.index = choice[0]
logger.debug("Accepted target swap")
accept = True
status = "ACC"
new_path.generated = ('ts', 0, 0, 0)
new_path.status = status
return accept, new_path, status
def one_step_crossing(ensemble, interface):
"""Create a path of one step and check the crossing with the interface.
This function will do a single step to try to cross the interface.
Note that a step might involve several substeps, depending on the
input file selection/sampling strategy.
This task is the Achilles` Heel of Stone Skipping. If not wisely
done, a large number of attempts to cross the interface
in one step will be done, destroying the sampling efficiency.
Parameters
----------
ensemble : dictionary of objects
It contains:
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
interface : float
This is the interface position to be crossed.
Returns
-------
out[0] : boolean
True if the path can be accepted.
out[1] : object like :py:class:`.PathBase`
Returns the generated path.
"""
# The trial path we need to generate. Note, 1 step = 2 points
trial_path = Path(rgen=ensemble['rgen'], maxlen=2)
interfaces = [-float("inf"), float("inf"), float("inf")]
sub_ensemble = {'interfaces': interfaces,
'system': ensemble['system'],
'order_function': ensemble['order_function']}
ensemble['engine'].order_function = sub_ensemble['order_function']
ensemble['engine'].propagate(
trial_path, sub_ensemble, sub_ensemble['system'])
if crossing_counter(trial_path, interface) == 0:
return False, trial_path
return True, trial_path
[docs]def wire_fencing(ensemble, tis_settings, start_cond):
"""Perform a wire_fencing move.
This function will perform the non-famous wire fencing move
from an initial path.
Parameters
----------
ensemble: dict
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is the path ensemble to perform the TIS step for.
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
* `interfaces`: list of floats
These are the interface positions on form
[left, middle, right].
tis_settings : dict
This contains the settings for TIS. Keys used here:
* `sigma_v`: velocity width. Negative values select aimless shooting.
* `allowmaxlength`: boolean, should paths be allowed to reach
maximum length?
* `maxlength`: integer, maximum allowed length of paths.
* `high_accept`: boolean, the option for High Acceptance WF.
start_cond : string
The starting condition for the current ensemble, 'L'eft or
'R'ight.
Returns
-------
out[0] : boolean
True if the path can be accepted.
out[1] : object like :py:class:`.PathBase`
Returns the generated path.
out[2] : string
Status of the path, this is one of the strings defined in
:py:const:`.path._STATUS`.
"""
trial_path = ensemble['path_ensemble'].last_path
engine = ensemble['engine']
intf_cap = tis_settings.get('interface_cap', ensemble['interfaces'][2])
# Wire fencing needs middle != cap interface. For the [0^-] ensemble
# middle == right == lambda_0, making WF inapplicable.
if ensemble['interfaces'][1] == intf_cap:
logger.info('Wire fencing not applicable (middle == cap '
'interface = %s). Falling back to standard shooting.',
ensemble['interfaces'][1])
return shoot(ensemble, tis_settings, start_cond)
wf_int = [ensemble['interfaces'][1], ensemble['interfaces'][1], intf_cap]
n_frames, new_segment = wirefence_weight_and_pick(trial_path, wf_int[0],
wf_int[2],
return_seg=True)
if n_frames == 0:
logger.warning('Wire fencing move not usable. N frames of Path = 0 '
'between interfaces %s and %s.', wf_int[0], wf_int[-1])
return False, trial_path, 'NSG'
sub_ens = {'interfaces': wf_int, 'engine': engine,
'rgen': ensemble['rgen'],
'order_function': ensemble['order_function'],
'path_ensemble': ensemble['path_ensemble']}
sub_settings = tis_settings.copy()
sub_settings['allowmaxlength'] = True
succ_seg = 0
for i in range(tis_settings['n_jumps']):
logger.debug('Trying a new web with Wire Fencing, jump %i', i)
# Select the shooting point:
sh_pt, _, _ = prepare_shooting_point(new_segment, sub_ens,
sub_settings)
engine.dump_phasepoint(sh_pt, str(counter()) + '_wf_shoot')
success, trial_seg, status = shoot(sub_ens,
sub_settings,
('L', 'R'),
sh_pt)
start, end, _, _ = trial_seg.check_interfaces(wf_int)
logger.info('Jump %s, len %s, status %s, intf: %s %s',
i, trial_seg.length, status, start, end)
if not success:
# This handles R to R (start_cond = L) paths. Counter + 1, no ups.
logger.debug('Wire Fencing Fail.')
else:
logger.debug('Acceptable Wire Fence link.')
succ_seg += 1
new_segment = trial_seg.copy()
if succ_seg == 0:
# No usable segments were generated.
trial_path.status = 'NSG'
success = False
else:
success, trial_path, _ = extender(new_segment, ensemble,
tis_settings, start_cond)
if success:
success, trial_path = ss_wt_wf_acceptance(trial_path, ensemble,
tis_settings,
start_cond)
trial_path.generated = ('wf', _order_value(sh_pt.order), succ_seg,
trial_path.length)
logger.debug('WF move %s', trial_path.status)
if not success:
return False, trial_path, trial_path.status
# This might get triggered when accepting 0-L paths.
left, _, right = ensemble['interfaces']
if start_cond != trial_path.get_start_point(left, right):
raise RuntimeError('WF: Path has an implausible start.')
trial_path.status = 'ACC'
return True, trial_path, trial_path.status
[docs]def ss_wt_wf_acceptance(trial_path, ensemble, tis_settings,
start_cond='L'):
"""Weights, possibly reverses and accept/rejects generated SS/WT/WF paths.
Parameters
----------
trial_path : object like :py:class:`.PathBase`
This is the new path that will obtain weights, and might be reversed
and accepted.
ensemble : dict
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is the path ensemble to perform the TIS step for.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
* `interfaces` : list/tuple of floats
These are the interface positions of the form
``[left, middle, right]``.
tis_settings : dict
This contains the settings for TIS. KEys used here;
* `high_accept` : boolean, the option for High Acceptance SS/WF.
* `shooting_move` : string, the label of the shooting move to perform.
start_cond : string, optional
The starting condition for the current ensemble, 'L'eft or 'R'ight.
Returns
-------
out[0] : boolean
True if the path can be accepted.
out[1] : object like :py:class:`.PathBase`
Returns the weighed and possibly reversed path.
"""
intf = list(ensemble['interfaces'])
move = tis_settings['shooting_move']
if move == 'wt' or not tis_settings.get('high_accept', False):
trial_path.weight = 1.
else:
if move == 'wf':
intf[2] = tis_settings.get('interface_cap', intf[2])
trial_path.weight = compute_weight(trial_path, intf, move)
if start_cond != trial_path.get_start_point(intf[0], intf[2]):
trial_path = trial_path.reverse(ensemble['order_function'])
success = ss_wt_wf_metropolis_acc(trial_path, ensemble, tis_settings,
start_cond)
return success, trial_path
[docs]def stone_skipping(ensemble, tis_settings, start_cond):
"""Perform a stone_skipping move.
This function will perform the famous stone skipping move
from an initial path.
Parameters
----------
ensemble: dict
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is the path ensemble to perform the TIS step for.
* `path`: object like :py:class:`.PathBase`
This is the input path which will be used for generating a
new path.
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
* `interfaces`: list of floats
These are the interface positions on form
[left, middle, right].
tis_settings : dict
This contains the settings for TIS. Keys used here:
* `sigma_v`: velocity width. Negative values select aimless shooting.
* `allowmaxlength`: boolean, should paths be allowed to reach
maximum length?
* `maxlength`: integer, maximum allowed length of paths.
* `high_accept`: boolean, the option for High Acceptance SS.
start_cond : string
The starting condition for the current ensemble, 'L'eft or
'R'ight.
Returns
-------
out[0] : boolean
True if the path can be accepted.
out[1] : object like :py:class:`.PathBase`
Returns the generated path.
out[2] : string
Status of the path, this is one of the strings defined in
:py:const:`.path._STATUS`.
"""
path_old = ensemble['path_ensemble'].last_path
intf = ensemble['interfaces']
ph_pt1, ph_pt2 = crossing_finder(path_old, intf[1])
if ph_pt1 is None and ph_pt2 is None:
return False, path_old, 'NCR'
sub_ens = {'interfaces': [intf[1], intf[1], intf[2]],
'order_function': ensemble['order_function']}
osc_try = 0 # One step crossing attempt counter
success = False
for i in range(tis_settings['n_jumps']):
logger.debug('Trying a new stone skipping move, jump %i', i)
# Here we choose between the two
# possible shooting points that describe a crossing.
sh_pt = ph_pt1 if ensemble['rgen'].rand() >= 0.5 else ph_pt2
ensemble['engine'].dump_phasepoint(sh_pt, str(counter()) + '_ss_shoot')
# To continue, we must be sure that the new path
# CROSSES the interface in ONLY ONE step.
# Generate paths until it succeed. That is
# what makes this version of the SS move useless for large systems.
for j in range(tis_settings['maxlength']):
# This function can become actually fun to work on.
# e.g. have a 50% chance to give random v for each particle
# Modify the velocities:
# todo modify_v could just use system directly
logger.debug("jump %s, try %s, start: %s", i, j,
_order_value(sh_pt.order))
ensemble['system'] = sh_pt.copy()
ensemble['engine'].modify_velocities(ensemble['system'],
tis_settings,
ensemble['rgen'])
# A path of two frames is going to be generated.
success, path = one_step_crossing(ensemble, intf[1])
osc_try += 1
if osc_try > 5 * tis_settings['n_jumps'] and \
path_old.get_move() in INITIAL_MOVES:
logger.info('Performing a shooting move before the use of ss')
success, trial_path, status = shoot(ensemble,
tis_settings,
start_cond, sh_pt)
trial_path.set_move('is')
return success, trial_path, status
if success:
break
else: # In case we reached maxlength in jumps attempts.
success = False
path.status = 'NSS'
trial_path = path
break
# Depending on the shooting point (before or after the interface),
# a backward path or a continuation has to be generated.
new_segment = path.empty_path(maxlen=tis_settings['maxlength'] - 1)
if path.get_end_point(intf[1], intf[2]) == start_cond:
path = path.reverse(ensemble['order_function'])
sub_ens['system'] = path.phasepoints[1].copy()
ensemble['engine'].order_function = sub_ens['order_function']
success, _ = ensemble['engine'].propagate(new_segment, sub_ens,
sub_ens['system'])
new_segment.phasepoints.insert(0, path.phasepoints[0].copy())
if not success:
new_segment.status = 'XSS'
trial_path = new_segment
break
ph_pt1, ph_pt2 = crossing_finder(new_segment, intf[1], last_frame=True)
logger.debug('SS web: %s, one step crossing tries: %s', success, osc_try)
if success:
if ensemble['rgen'].rand() < 0.5:
new_segment = new_segment.reverse(ensemble['order_function'])
success, trial_path, _ = extender(new_segment, ensemble,
tis_settings, start_cond)
if success:
success, trial_path = ss_wt_wf_acceptance(trial_path, ensemble,
tis_settings,
start_cond)
trial_path.generated = ('ss', _order_value(sh_pt.order), osc_try,
trial_path.length)
logger.debug('SS move: %s', trial_path.status)
if not success:
return False, trial_path, trial_path.status
# This might get triggered when accepting 0-L paths.
if start_cond != trial_path.get_start_point(intf[0], intf[2]):
raise RuntimeError('SS: Path has an implausible start.')
trial_path.status = 'ACC'
return True, trial_path, trial_path.status
def ss_wt_wf_metropolis_acc(path_new, ensemble,
tis_settings, start_cond='L'):
"""Accept or reject the path_new.
Super detailed balance rule is used in the original version
and in the High Acceptance one for SS and WF.
In the regular version, P acc = min (1, Cold/Cnew), where
for Stone Skipping C is crossing, for Web Throwing C is segment,
for Wire Fencing C is number of phasepoint between ensemble and
right interface.
In the High Acceptance version, P acc = 1 for SS and Wf. It
also allows to accept paths that go from B to A, by reversing them.
NB. To respect super detailed balance, the weights have to be changed
accordingly. This is done elsewhere.
Parameters
----------
path_new : object like :py:class:`.PathBase`
This is the new path that might get accepted.
ensemble : dict
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is the path ensemble to perform the TIS step for.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
* `interfaces` : list/tuple of floats
These are the interface positions of the form
``[left, middle, right]``.
tis_settings : dict
This contains the settings for TIS. Keys used here:
* `high_accept` : boolean, the option for High Acceptance SS/WF.
start_cond : string, optional
The starting condition for the current ensemble, 'L'eft or
'R'ight.
Returns
-------
out[0] : boolean
True if the path can be accepted.
"""
interfaces = ensemble['interfaces']
path_old = ensemble['path_ensemble'].last_path
if tis_settings.get('shooting_move') == 'wt':
sour_int = tis_settings['interface_sour']
cr_old = segments_counter(path_old, sour_int, interfaces[1])
cr_new = segments_counter(path_new, sour_int, interfaces[1])
if ensemble['rgen'].rand() >= min(1.0, cr_old / cr_new):
path_new.status = 'WTA'
return False
else:
if not tis_settings.get('high_accept', False):
if tis_settings.get('shooting_move') == 'ss':
cr_old = crossing_counter(path_old, interfaces[1])
cr_new = crossing_counter(path_new, interfaces[1])
if ensemble['rgen'].rand() >= min(1.0, cr_old / cr_new):
path_new.status = 'SSA'
return False
elif tis_settings.get('shooting_move') == 'wf':
wf_cap = tis_settings.get('interface_cap', interfaces[2])
cr_old, _ = wirefence_weight_and_pick(path_old, interfaces[1],
wf_cap)
cr_new, _ = wirefence_weight_and_pick(path_new, interfaces[1],
wf_cap)
if ensemble['rgen'].rand() >= min(1.0, cr_old / cr_new):
path_new.status = 'WFA'
return False
if start_cond != path_new.get_start_point(interfaces[0], interfaces[2]):
path_new.status = 'BWI'
return False
path_new.status = 'ACC'
return True
[docs]def web_throwing(ensemble, tis_set, start_cond='L'):
"""Perform a web_throwing move.
This function performs the great web throwing move from an initial path.
Parameters
----------
ensemble : dict
It contains:
* `path_ensemble`: object like :py:class:`.PathEnsemble`
This is the path ensemble to perform the TIS step for.
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list.
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `rgen`: object like :py:class:`.RandomGenerator`
This is the random generator that will be used.
* `interfaces`: list of floats
These are the interface positions on form
[left, middle, right].
tis_set : dict
This contains the settings for TIS. Keys used here:
* `sigma_v`: velocity width. Negative values select aimless shooting.
* `allowmaxlength`: boolean, should paths be allowed to reach
maximum length?
* `maxlength`: integer, maximum allowed length of paths.
start_cond : string, optional
The starting condition for the current ensemble, 'L'eft or
'R'ight.
Returns
-------
out[0] : boolean
True if the path can be accepted.
out[1] : object like :py:class:`.PathBase`
Returns the generated path.
out[2] : string
Status of the path, this is one of the strings defined in
:py:const:`.path._STATUS`.
"""
path_old = ensemble['path_ensemble'].last_path
interfaces = ensemble['interfaces']
sour = tis_set['interface_sour']
if not interfaces[0] < sour <= interfaces[1]:
raise ValueError('SOUR interface is not correctly positioned')
ccnt = segments_counter(path_old, sour, interfaces[1])
if ccnt == 0:
return False, path_old, 'NSG'
seg_i = int(ensemble['rgen'].rand()[0] * ccnt)
wt_int = [sour, sour, ensemble['interfaces'][1]]
source_seg = select_and_trim_a_segment(path_old, sour, wt_int[2], seg_i)
sub_ens = {'interfaces': wt_int,
'order_function': ensemble['order_function']}
shoots, save_acc = [0], 0
key = ensemble['rgen'].rand() >= 0.5 # Start from a random side
for _ in range(tis_set['n_jumps']):
if ensemble['rgen'].rand() >= 0.5:
shoots[-1] += 1 # One more on the Same side
else:
shoots.append(1) # A move in the other side
for n_virtual in shoots:
key = not key # Change side, key controls also path reverse
for _ in range(n_virtual):
if key:
pre_shooting_point = source_seg.phasepoints[-1]
shooting_point = source_seg.phasepoints[-2]
else:
pre_shooting_point = source_seg.phasepoints[0]
shooting_point = source_seg.phasepoints[1]
prefix = str(counter())
ensemble['engine'].dump_phasepoint(pre_shooting_point,
prefix + '_wt_pre_shoot')
ensemble['engine'].dump_phasepoint(shooting_point,
prefix + '_wt_shoot')
new_seg = path_old.empty_path(maxlen=tis_set['maxlength'])
new_seg.append(pre_shooting_point)
logger.debug('Trying a new web')
sub_ens['system'] = shooting_point.copy()
ensemble['engine'].order_function = sub_ens['order_function']
ensemble['engine'].propagate(new_seg, sub_ens,
sub_ens['system'], reverse=key)
start = new_seg.get_start_point(wt_int[0], wt_int[-1])
end = new_seg.get_end_point(wt_int[0], wt_int[-1])
logger.debug('WT web starts %s, ends %s, reverse %s',
start, end, key)
if segments_counter(new_seg, sour, wt_int[2], reverse=key) == 1:
logger.debug('Web successful')
source_seg = new_seg.reverse(ensemble['order_function'],
rev_v=False) if key else new_seg
source_seg.status = 'ACC'
save_acc += 1
break
logger.debug('WT segments accepted: %s', save_acc)
accept, trial_path, _ = extender(source_seg, ensemble, tis_set, start_cond)
trial_path.generated = ('wt',
_order_value(source_seg.phasepoints[1].order),
save_acc, trial_path.length)
# Also Check that we did not get a B to A or a B to B path.
if accept:
accept, trial_path = ss_wt_wf_acceptance(trial_path, ensemble, tis_set)
logger.debug('WT move: %s', trial_path.status)
# Set the path flags. If no segment was actually regenerated, an
# initialization path stays an initialization path (keeps its flag).
if accept and path_old.get_move() in INITIAL_MOVES and save_acc == 0:
trial_path.set_move(path_old.get_move())
return accept, trial_path, trial_path.status
[docs]def extender(source_seg, ensemble, tis_set, start_cond=('R', 'L')):
"""Extend a path to the given interfaces.
This function will perform the web throwing move from an initial path.
Parameters
----------
source_seg : object like :py:class:`.PathBase`
This is the input path which will be prolonged.
ensemble : dictionary of objects
It contains:
* `order_function`: object like :py:class:`.OrderParameter`
The class used for obtaining the order parameter(s).
* `engine`: object like :py:class:`.EngineBase`
The engine to use for propagating a path.
* `system`: object like :py:class:`.System`
System is used here since we need access to the temperature
and to the particle list.
* `interfaces`: list of floats
These are the interface positions on form
[left, middle, right].
tis_set : dict
This contains the settings for TIS. Keys used here:
* `sigma_v`: velocity width. Negative values select aimless shooting.
* `allowmaxlength`: boolean, should paths be allowed to reach
maximum length?
* `maxlength`: integer, maximum allowed length of paths.
* `high_accept`: boolean, the option for High Acceptance WF and SS.
start_cond : string or tuple of strings, optional
The starting condition for the current ensemble, 'L'eft or
'R'ight. ('L', 'R'), the default option, implies no directional
difference.
Returns
-------
out[0] : boolean
True if the path can be accepted.
out[1] : object like :py:class:`.PathBase`
Returns the generated path.
out[2] : string
Status of the path, this is one of the strings defined in
:py:const:`.path._STATUS`.
"""
interfaces = ensemble['interfaces']
ensemble['system'] = source_seg.phasepoints[0].copy()
maxlength = tis_set['maxlength']
# Extender
if interfaces[0] <= _order_value(ensemble['system'].order) < \
interfaces[-1]:
# Allocate ``back_segment`` only as long as the budget the source
# segment has not already consumed, so the merged path cannot
# exceed ``maxlength``. Clamp to >=1 for very long source segments.
back_budget = max(maxlength - source_seg.length + 1, 1)
back_segment = source_seg.empty_path(maxlen=back_budget)
logger.debug('Trying to extend backwards')
source_seg_copy = source_seg.copy()
backward_ok = shoot_backwards(back_segment, source_seg_copy, ensemble,
tis_set, start_cond)
if not backward_ok and not tis_set.get('high_accept', False):
# Metropolis acceptance (native-only) rejects a failed
# backward extension outright, keeping its own status.
return False, source_seg_copy, source_seg_copy.status
trial_path = paste_paths(back_segment, source_seg, overlap=True,
maxlen=maxlength)
# Canonical (high-acceptance) labelling, matching the
# ``_tis_inf.extender``: a backward extension that exhausts the
# length budget is BTX (backward too long), not the catch-all FTX.
if not backward_ok and trial_path.length >= trial_path.maxlen:
trial_path.status = 'BTX'
return False, trial_path, trial_path.status
else:
trial_path = source_seg.copy()
ensemble['system'] = trial_path.phasepoints[-1].copy()
if interfaces[0] <= _order_value(ensemble['system'].order) < \
interfaces[-1]:
# Same accounting trick going forward.
forth_budget = max(maxlength - trial_path.length + 1, 1)
forth_segment = source_seg.empty_path(maxlen=forth_budget)
ensemble['engine'].order_function = ensemble['order_function']
success_forw, _ = ensemble['engine'].propagate(
forth_segment, ensemble, ensemble['system'])
trial_path.phasepoints = trial_path.phasepoints[:-1] + \
forth_segment.phasepoints
# Forward propagation failure surfaces as FTX even before the
# final length check, as in the canonical ``_tis_inf.extender``,
# which captures the engine return and rejects explicitly.
# Without this an engine-side failure would be silently
# masked whenever the accumulated trial happens to sit below
# ``maxlength``.
if not success_forw:
trial_path.status = 'FTX'
return False, trial_path, trial_path.status
# Reject only paths that *exceed* ``maxlength`` (strict ``>``). A
# path landing exactly at ``maxlength`` after a *successful* forward
# propagation completed by crossing an interface and is valid -- the
# previous ``>=`` check falsely rejected such complete paths as FTX,
# diverging from the canonical ``_tis_inf.extender`` (which has no
# catch-all at all). In real flow ``paste_paths`` and the forward
# budget cap the merged path at ``maxlength``, so this strict guard
# never fires on a reachable state; it only protects against an
# over-length source segment that cannot arise from the bounded
# WF/SS shoots.
if trial_path.length > maxlength:
trial_path.status = 'FTX' # exceeds "memory".
return False, trial_path, trial_path.status
trial_path.status = 'ACC'
return True, trial_path, trial_path.status