# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Local and global crossing probabilities for partial path TIS (REPPTIS).
A REPPTIS run samples short partial paths, each classified by where it
enters and leaves the ensemble: ``LML`` / ``LMR`` / ``RML`` / ``RMR``
(Left/Right entry, Middle, Left/Right exit). From the
weighted counts of these path types this module computes:
* the **local crossing probabilities** of one ensemble
(:py:func:`local_crossing_probabilities`) -- the per-ensemble
``p_mm`` (LML) / ``p_mp`` (LMR) / ``p_pm`` (RML) / ``p_pp`` (RMR)
that feed the REPPTIS Markov state model
(:py:mod:`pyretis.analysis.repptis_msm`), and
* the **global crossing probability** across all ensembles
(:py:func:`global_crossing_probabilities_from_local`) via the
closed-form recursion, an alternative to the MSM that does not need
the transition-time vector.
This is a faithful port of the local/global crossing-probability part
of the ``tistools`` ``repptis_analysis`` module (the analysis of
Vervust, Wils, Safaei, Zhang and Ghysels, J. Chem. Theory Comput.
2026); the path-type weighting is preserved, only adapted to consume
the ``lmrs`` / ``weights`` arrays that
:py:func:`pyretis.analysis.path_analysis.analyse_repptis_ensemble`
already produces, rather than the ``tistools`` reader objects.
References
----------
.. [vervust2026loc] W. Vervust, E. Wils, S. Safaei, D. T. Zhang and
A. Ghysels, "Estimating full path lengths and kinetics from partial
path transition interface sampling simulations", J. Chem. Theory
Comput. (2026), https://doi.org/10.1021/acs.jctc.5c01498.
"""
import logging
import numpy as np
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
__all__ = [
'local_crossing_probabilities',
'global_crossing_probabilities_from_local',
'tau_before_middle',
'tau_after_middle',
'tau_total',
'path_type_taus',
]
#: The four REPPTIS partial-path types, by (entry, middle, exit) side.
PATH_TYPES = ('RMR', 'RML', 'LMR', 'LML')
[docs]def local_crossing_probabilities(lmrs, weights, tr=False):
"""Return the local crossing probabilities of one REPPTIS ensemble.
The accepted partial paths of an ``[i^+-]`` / ``[0^+-']`` ensemble
are classified by type (``RMR`` / ``RML`` / ``LMR`` / ``LML``) and
weighted by their Monte-Carlo weights. The local crossing
probability of a type is its weight divided by the total weight of
paths entering from the **same** side: paths entering from the left
(``LMR`` + ``LML``) and from the right (``RMR`` + ``RML``) are
normalised separately. This routine is **not** for the ``[0^-']``
ensemble.
Parameters
----------
lmrs : sequence of string
One ``"LMR"``-style type tag per accepted path (the joined
left/middle/right interface flags). Tags that are not one of
:py:data:`PATH_TYPES` (partial ``*`` tags) contribute zero
weight.
weights : sequence of int or float
The Monte-Carlo weight of each accepted path (same length and
order as ``lmrs``).
tr : boolean, optional
If True, apply infinite time-reversal reweighting: double the
``RMR`` and ``LML`` weights and symmetrise ``RML`` / ``LMR``.
Default is False.
Returns
-------
p : dict
Local crossing probabilities keyed by path type (``RMR`` /
``RML`` / ``LMR`` / ``LML``), plus the milestoning arrival
probabilities ``2R`` / ``2L``. A probability is ``nan`` when its
normalising weight is zero.
"""
lmrs = np.asarray(lmrs)
weights = np.asarray(weights, dtype=float)
if lmrs.shape[0] != weights.shape[0]:
raise ValueError(
"lmrs and weights must have the same length "
f"({lmrs.shape[0]} != {weights.shape[0]})."
)
# Total weight of each path type.
w_path = {}
for pathtype in PATH_TYPES:
if weights.size == 0:
w_path[pathtype] = 0.0
else:
w_path[pathtype] = float(np.sum(weights[lmrs == pathtype]))
if tr: # infinite time-reversal reweighting
w_path['RMR'] = 2 * w_path['RMR']
w_path['LML'] = 2 * w_path['LML']
symmetric = w_path['RML'] + w_path['LMR']
w_path['RML'] = symmetric
w_path['LMR'] = symmetric
# Normalise by the weight of paths entering from the same side.
w_right = w_path['RMR'] + w_path['RML']
w_left = w_path['LMR'] + w_path['LML']
p = {}
for pathtype, norm in zip(PATH_TYPES, (w_right, w_right,
w_left, w_left)):
p[pathtype] = w_path[pathtype] / norm if norm != 0 else np.nan
# Milestoning: probability of arriving at the right / left interface.
w_to_right = w_path['LMR'] + w_path['RMR']
w_to_left = w_path['LML'] + w_path['RML']
w_total = w_to_right + w_to_left
p['2R'] = w_to_right / w_total if w_total != 0 else np.nan
p['2L'] = w_to_left / w_total if w_total != 0 else np.nan
return p
[docs]def global_crossing_probabilities_from_local(
p_minus_plus, p_minus_minus, p_plus_plus, p_plus_minus):
r"""Return global crossing probabilities from the local ones.
Combine the per-ensemble local crossing probabilities into the
global crossing probabilities using the closed-form REPPTIS
recursion
.. math::
P^+_j = \frac{p^{LMR}_{j-1} P^+_{j-1}}
{p^{LMR}_{j-1} + p^{LML}_{j-1} P^-_{j-1}}, \quad
P^-_j = \frac{p^{RML}_{j-1} P^-_{j-1}}
{p^{LMR}_{j-1} + p^{LML}_{j-1} P^-_{j-1}},
with :math:`P^+_0 = P^-_0 = 1`. ``P_cross`` is the corresponding
TIS probability of crossing interface ``i + 1``.
Parameters
----------
p_minus_plus : sequence of float
Per-ensemble local probability of type ``LMR`` (forward
crossing).
p_minus_minus : sequence of float
Per-ensemble local probability of type ``LML`` (return left).
p_plus_plus : sequence of float
Per-ensemble local probability of type ``RMR`` (return right).
p_plus_minus : sequence of float
Per-ensemble local probability of type ``RML`` (cross left).
Returns
-------
p_min : list of float
Per ensemble ``i``, the probability of reaching state A before
interface ``i + 1`` given a crossing of ``i``.
p_plus : list of float
Per ensemble ``i``, the probability of reaching interface
``i + 1`` before A given a crossing of ``i``.
p_cross : list of float
Per ensemble ``i``, the TIS probability of crossing ``i + 1``.
Notes
-----
If any input contains ``nan``, or a denominator vanishes, the
function returns ``[nan, nan, nan]`` -- matching the upstream
behaviour, so a degenerate ensemble does not silently produce a
spurious finite rate.
"""
if (np.isnan(p_minus_plus).any() or np.isnan(p_minus_minus).any()
or np.isnan(p_plus_plus).any() or np.isnan(p_plus_minus).any()):
return [np.nan, np.nan, np.nan]
p_plus = [1.0]
p_min = [1.0]
p_cross = [1.0, p_minus_plus[0]]
for i, (pmp, pmm, _, ppm) in enumerate(zip(
p_minus_plus, p_minus_minus, p_plus_plus, p_plus_minus)):
if i == 0: # the first ensemble is an initial condition
continue
denominator = pmp + pmm * p_min[-1]
if denominator == 0 or np.isnan(pmp) or np.isnan(pmm * p_min[-1]):
return [np.nan, np.nan, np.nan]
p_plus.append((pmp * p_plus[-1]) / denominator)
p_min.append((ppm * p_min[-1]) / denominator)
p_cross.append(p_minus_plus[0] * p_plus[-1])
return p_min, p_plus, p_cross
[docs]def tau_before_middle(orders, ptype, interfaces):
"""Return the steps a path takes to first cross the middle interface.
For a left-entering path (``LMR`` / ``LML``) this is the number of
steps between the first crossing of the left interface
``interfaces[0]`` and the first crossing of the middle interface
``interfaces[1]``; for a right-entering path (``RML`` / ``RMR``) it
is measured from the right interface ``interfaces[2]`` inward.
Parameters
----------
orders : numpy.ndarray
The per-phase-point order parameters of the path; column 0 is
the main order parameter.
ptype : string
The path type (``LMR`` / ``LML`` / ``RML`` / ``RMR``).
interfaces : sequence of float
The ensemble's ``(left, middle, right)`` interfaces.
Returns
-------
int
The number of steps before the middle interface is crossed.
Raises
------
ValueError
If the path type is not recognised.
"""
orders = np.asarray(orders)
if ptype in ('LMR', 'LML'):
first = np.where(orders[:, 0] >= interfaces[0])[0][0]
middle = np.where(orders[:, 0] >= interfaces[1])[0][0]
return middle - first
if ptype in ('RML', 'RMR'):
first = np.where(orders[:, 0] <= interfaces[2])[0][0]
middle = np.where(orders[:, 0] <= interfaces[1])[0][0]
return middle - first
raise ValueError(f"Unknown path type {ptype}")
[docs]def tau_after_middle(orders, ptype, interfaces):
"""Return the steps a path takes after its last middle crossing.
The mirror of :py:func:`tau_before_middle`, measured from the end of
the path inward: for a path that exits left (``LML`` / ``RML``) from
the left interface, for a path that exits right (``LMR`` / ``RMR``)
from the right interface.
Parameters
----------
orders : numpy.ndarray
The per-phase-point order parameters of the path.
ptype : string
The path type (``LMR`` / ``LML`` / ``RML`` / ``RMR``).
interfaces : sequence of float
The ensemble's ``(left, middle, right)`` interfaces.
Returns
-------
int
The number of steps after the last middle crossing.
Raises
------
ValueError
If the path type is not recognised.
"""
orders = np.asarray(orders)
reverse = orders[::-1, 0]
if ptype in ('LML', 'RML'):
last = np.where(reverse >= interfaces[0])[0][0]
middle = np.where(reverse >= interfaces[1])[0][0]
return middle - last
if ptype in ('LMR', 'RMR'):
last = np.where(reverse <= interfaces[2])[0][0]
middle = np.where(reverse <= interfaces[1])[0][0]
return middle - last
raise ValueError(f"Unknown path type {ptype}")
[docs]def tau_total(orders, ptype, interfaces):
"""Return the path length between its first and last boundary crossing.
The total time the path spends inside the ensemble, excluding the
leading segment before the entry crossing and the trailing segment
after the exit crossing. Equals ``tau_before_middle`` +
``tau_after_middle`` + the middle (recrossing) time.
Parameters
----------
orders : numpy.ndarray
The per-phase-point order parameters of the path.
ptype : string
The path type (``LMR`` / ``LML`` / ``RML`` / ``RMR``).
interfaces : sequence of float
The ensemble's ``(left, middle, right)`` interfaces.
Returns
-------
int
The number of interior steps of the path.
Raises
------
ValueError
If the path type is not recognised.
"""
orders = np.asarray(orders)
if ptype in ('LMR', 'LML'):
start = np.where(orders[:, 0] >= interfaces[0])[0][0]
elif ptype in ('RML', 'RMR'):
start = np.where(orders[:, 0] <= interfaces[2])[0][0]
else:
raise ValueError(f"Unknown path type {ptype}")
reverse = orders[::-1, 0]
if ptype in ('LML', 'RML'):
end = np.where(reverse >= interfaces[0])[0][0]
elif ptype in ('LMR', 'RMR'):
end = np.where(reverse <= interfaces[2])[0][0]
else:
raise ValueError(f"Unknown path type {ptype}")
return len(orders) - start - end
[docs]def path_type_taus(orders_list, lmrs, weights, interfaces):
"""Return the weighted-average transition times per path type.
For every accepted path the total interior time
(:py:func:`tau_total`), the time before the middle crossing
(:py:func:`tau_before_middle`) and the time after it
(:py:func:`tau_after_middle`) are computed and weight-averaged per
path type. The middle (recrossing) time ``taum`` is the remainder
``tau - tau1 - tau2``. These per-type, per-ensemble averages feed the
REPPTIS MSM mean-first-passage-time vector
(:py:func:`pyretis.analysis.repptis_msm.construct_tau_vector`).
Parameters
----------
orders_list : sequence of numpy.ndarray
One order-parameter trajectory per accepted path.
lmrs : sequence of string
The path type of each accepted path (same length/order).
weights : sequence of int or float
The Monte-Carlo weight of each accepted path.
interfaces : sequence of float
The ensemble's ``(left, middle, right)`` interfaces.
Returns
-------
dict
Maps each path type (``LML`` / ``LMR`` / ``RML`` / ``RMR``) to a
dict with keys ``tau`` / ``tau1`` / ``tau2`` / ``taum`` (the
weighted averages, or ``nan`` when the type is unsampled).
"""
lmrs = np.asarray(lmrs)
weights = np.asarray(weights, dtype=float)
taus = {pathtype: {'tau': np.nan, 'tau1': np.nan, 'tau2': np.nan,
'taum': np.nan}
for pathtype in ('LML', 'LMR', 'RML', 'RMR')}
for pathtype in ('LML', 'LMR', 'RML', 'RMR'):
mask = lmrs == pathtype
total_weight = float(np.sum(weights[mask])) if mask.any() else 0.0
if total_weight == 0.0:
continue
tau_vals, tau1_vals, tau2_vals, type_weights = [], [], [], []
for keep, orders, weight in zip(mask, orders_list, weights):
if not keep:
continue
tau_vals.append(tau_total(orders, pathtype, interfaces))
tau1_vals.append(tau_before_middle(orders, pathtype, interfaces))
tau2_vals.append(tau_after_middle(orders, pathtype, interfaces))
type_weights.append(weight)
type_weights = np.asarray(type_weights, dtype=float)
tau = float(np.average(tau_vals, weights=type_weights))
tau1 = float(np.average(tau1_vals, weights=type_weights))
tau2 = float(np.average(tau2_vals, weights=type_weights))
taus[pathtype] = {'tau': tau, 'tau1': tau1, 'tau2': tau2,
'taum': tau - tau1 - tau2}
return taus