Source code for pyretis.analysis.repptis_analysis

# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Local and global crossing probabilities for partial path TIS (REPPTIS).

A REPPTIS run samples short partial paths, each classified by where it
enters and leaves the ensemble: ``LML`` / ``LMR`` / ``RML`` / ``RMR``
(Left/Right entry, Middle, Left/Right exit). From the
weighted counts of these path types this module computes:

* the **local crossing probabilities** of one ensemble
  (:py:func:`local_crossing_probabilities`) -- the per-ensemble
  ``p_mm`` (LML) / ``p_mp`` (LMR) / ``p_pm`` (RML) / ``p_pp`` (RMR)
  that feed the REPPTIS Markov state model
  (:py:mod:`pyretis.analysis.repptis_msm`), and
* the **global crossing probability** across all ensembles
  (:py:func:`global_crossing_probabilities_from_local`) via the
  closed-form recursion, an alternative to the MSM that does not need
  the transition-time vector.

This is a faithful port of the local/global crossing-probability part
of the ``tistools`` ``repptis_analysis`` module (the analysis of
Vervust, Wils, Safaei, Zhang and Ghysels, J. Chem. Theory Comput.
2026); the path-type weighting is preserved, only adapted to consume
the ``lmrs`` / ``weights`` arrays that
:py:func:`pyretis.analysis.path_analysis.analyse_repptis_ensemble`
already produces, rather than the ``tistools`` reader objects.

References
----------
.. [vervust2026loc] W. Vervust, E. Wils, S. Safaei, D. T. Zhang and
   A. Ghysels, "Estimating full path lengths and kinetics from partial
   path transition interface sampling simulations", J. Chem. Theory
   Comput. (2026), https://doi.org/10.1021/acs.jctc.5c01498.

"""
import logging

import numpy as np

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())

__all__ = [
    'local_crossing_probabilities',
    'global_crossing_probabilities_from_local',
    'tau_before_middle',
    'tau_after_middle',
    'tau_total',
    'path_type_taus',
]

#: The four REPPTIS partial-path types, by (entry, middle, exit) side.
PATH_TYPES = ('RMR', 'RML', 'LMR', 'LML')


[docs]def local_crossing_probabilities(lmrs, weights, tr=False): """Return the local crossing probabilities of one REPPTIS ensemble. The accepted partial paths of an ``[i^+-]`` / ``[0^+-']`` ensemble are classified by type (``RMR`` / ``RML`` / ``LMR`` / ``LML``) and weighted by their Monte-Carlo weights. The local crossing probability of a type is its weight divided by the total weight of paths entering from the **same** side: paths entering from the left (``LMR`` + ``LML``) and from the right (``RMR`` + ``RML``) are normalised separately. This routine is **not** for the ``[0^-']`` ensemble. Parameters ---------- lmrs : sequence of string One ``"LMR"``-style type tag per accepted path (the joined left/middle/right interface flags). Tags that are not one of :py:data:`PATH_TYPES` (partial ``*`` tags) contribute zero weight. weights : sequence of int or float The Monte-Carlo weight of each accepted path (same length and order as ``lmrs``). tr : boolean, optional If True, apply infinite time-reversal reweighting: double the ``RMR`` and ``LML`` weights and symmetrise ``RML`` / ``LMR``. Default is False. Returns ------- p : dict Local crossing probabilities keyed by path type (``RMR`` / ``RML`` / ``LMR`` / ``LML``), plus the milestoning arrival probabilities ``2R`` / ``2L``. A probability is ``nan`` when its normalising weight is zero. """ lmrs = np.asarray(lmrs) weights = np.asarray(weights, dtype=float) if lmrs.shape[0] != weights.shape[0]: raise ValueError( "lmrs and weights must have the same length " f"({lmrs.shape[0]} != {weights.shape[0]})." ) # Total weight of each path type. w_path = {} for pathtype in PATH_TYPES: if weights.size == 0: w_path[pathtype] = 0.0 else: w_path[pathtype] = float(np.sum(weights[lmrs == pathtype])) if tr: # infinite time-reversal reweighting w_path['RMR'] = 2 * w_path['RMR'] w_path['LML'] = 2 * w_path['LML'] symmetric = w_path['RML'] + w_path['LMR'] w_path['RML'] = symmetric w_path['LMR'] = symmetric # Normalise by the weight of paths entering from the same side. w_right = w_path['RMR'] + w_path['RML'] w_left = w_path['LMR'] + w_path['LML'] p = {} for pathtype, norm in zip(PATH_TYPES, (w_right, w_right, w_left, w_left)): p[pathtype] = w_path[pathtype] / norm if norm != 0 else np.nan # Milestoning: probability of arriving at the right / left interface. w_to_right = w_path['LMR'] + w_path['RMR'] w_to_left = w_path['LML'] + w_path['RML'] w_total = w_to_right + w_to_left p['2R'] = w_to_right / w_total if w_total != 0 else np.nan p['2L'] = w_to_left / w_total if w_total != 0 else np.nan return p
[docs]def global_crossing_probabilities_from_local( p_minus_plus, p_minus_minus, p_plus_plus, p_plus_minus): r"""Return global crossing probabilities from the local ones. Combine the per-ensemble local crossing probabilities into the global crossing probabilities using the closed-form REPPTIS recursion .. math:: P^+_j = \frac{p^{LMR}_{j-1} P^+_{j-1}} {p^{LMR}_{j-1} + p^{LML}_{j-1} P^-_{j-1}}, \quad P^-_j = \frac{p^{RML}_{j-1} P^-_{j-1}} {p^{LMR}_{j-1} + p^{LML}_{j-1} P^-_{j-1}}, with :math:`P^+_0 = P^-_0 = 1`. ``P_cross`` is the corresponding TIS probability of crossing interface ``i + 1``. Parameters ---------- p_minus_plus : sequence of float Per-ensemble local probability of type ``LMR`` (forward crossing). p_minus_minus : sequence of float Per-ensemble local probability of type ``LML`` (return left). p_plus_plus : sequence of float Per-ensemble local probability of type ``RMR`` (return right). p_plus_minus : sequence of float Per-ensemble local probability of type ``RML`` (cross left). Returns ------- p_min : list of float Per ensemble ``i``, the probability of reaching state A before interface ``i + 1`` given a crossing of ``i``. p_plus : list of float Per ensemble ``i``, the probability of reaching interface ``i + 1`` before A given a crossing of ``i``. p_cross : list of float Per ensemble ``i``, the TIS probability of crossing ``i + 1``. Notes ----- If any input contains ``nan``, or a denominator vanishes, the function returns ``[nan, nan, nan]`` -- matching the upstream behaviour, so a degenerate ensemble does not silently produce a spurious finite rate. """ if (np.isnan(p_minus_plus).any() or np.isnan(p_minus_minus).any() or np.isnan(p_plus_plus).any() or np.isnan(p_plus_minus).any()): return [np.nan, np.nan, np.nan] p_plus = [1.0] p_min = [1.0] p_cross = [1.0, p_minus_plus[0]] for i, (pmp, pmm, _, ppm) in enumerate(zip( p_minus_plus, p_minus_minus, p_plus_plus, p_plus_minus)): if i == 0: # the first ensemble is an initial condition continue denominator = pmp + pmm * p_min[-1] if denominator == 0 or np.isnan(pmp) or np.isnan(pmm * p_min[-1]): return [np.nan, np.nan, np.nan] p_plus.append((pmp * p_plus[-1]) / denominator) p_min.append((ppm * p_min[-1]) / denominator) p_cross.append(p_minus_plus[0] * p_plus[-1]) return p_min, p_plus, p_cross
[docs]def tau_before_middle(orders, ptype, interfaces): """Return the steps a path takes to first cross the middle interface. For a left-entering path (``LMR`` / ``LML``) this is the number of steps between the first crossing of the left interface ``interfaces[0]`` and the first crossing of the middle interface ``interfaces[1]``; for a right-entering path (``RML`` / ``RMR``) it is measured from the right interface ``interfaces[2]`` inward. Parameters ---------- orders : numpy.ndarray The per-phase-point order parameters of the path; column 0 is the main order parameter. ptype : string The path type (``LMR`` / ``LML`` / ``RML`` / ``RMR``). interfaces : sequence of float The ensemble's ``(left, middle, right)`` interfaces. Returns ------- int The number of steps before the middle interface is crossed. Raises ------ ValueError If the path type is not recognised. """ orders = np.asarray(orders) if ptype in ('LMR', 'LML'): first = np.where(orders[:, 0] >= interfaces[0])[0][0] middle = np.where(orders[:, 0] >= interfaces[1])[0][0] return middle - first if ptype in ('RML', 'RMR'): first = np.where(orders[:, 0] <= interfaces[2])[0][0] middle = np.where(orders[:, 0] <= interfaces[1])[0][0] return middle - first raise ValueError(f"Unknown path type {ptype}")
[docs]def tau_after_middle(orders, ptype, interfaces): """Return the steps a path takes after its last middle crossing. The mirror of :py:func:`tau_before_middle`, measured from the end of the path inward: for a path that exits left (``LML`` / ``RML``) from the left interface, for a path that exits right (``LMR`` / ``RMR``) from the right interface. Parameters ---------- orders : numpy.ndarray The per-phase-point order parameters of the path. ptype : string The path type (``LMR`` / ``LML`` / ``RML`` / ``RMR``). interfaces : sequence of float The ensemble's ``(left, middle, right)`` interfaces. Returns ------- int The number of steps after the last middle crossing. Raises ------ ValueError If the path type is not recognised. """ orders = np.asarray(orders) reverse = orders[::-1, 0] if ptype in ('LML', 'RML'): last = np.where(reverse >= interfaces[0])[0][0] middle = np.where(reverse >= interfaces[1])[0][0] return middle - last if ptype in ('LMR', 'RMR'): last = np.where(reverse <= interfaces[2])[0][0] middle = np.where(reverse <= interfaces[1])[0][0] return middle - last raise ValueError(f"Unknown path type {ptype}")
[docs]def tau_total(orders, ptype, interfaces): """Return the path length between its first and last boundary crossing. The total time the path spends inside the ensemble, excluding the leading segment before the entry crossing and the trailing segment after the exit crossing. Equals ``tau_before_middle`` + ``tau_after_middle`` + the middle (recrossing) time. Parameters ---------- orders : numpy.ndarray The per-phase-point order parameters of the path. ptype : string The path type (``LMR`` / ``LML`` / ``RML`` / ``RMR``). interfaces : sequence of float The ensemble's ``(left, middle, right)`` interfaces. Returns ------- int The number of interior steps of the path. Raises ------ ValueError If the path type is not recognised. """ orders = np.asarray(orders) if ptype in ('LMR', 'LML'): start = np.where(orders[:, 0] >= interfaces[0])[0][0] elif ptype in ('RML', 'RMR'): start = np.where(orders[:, 0] <= interfaces[2])[0][0] else: raise ValueError(f"Unknown path type {ptype}") reverse = orders[::-1, 0] if ptype in ('LML', 'RML'): end = np.where(reverse >= interfaces[0])[0][0] elif ptype in ('LMR', 'RMR'): end = np.where(reverse <= interfaces[2])[0][0] else: raise ValueError(f"Unknown path type {ptype}") return len(orders) - start - end
[docs]def path_type_taus(orders_list, lmrs, weights, interfaces): """Return the weighted-average transition times per path type. For every accepted path the total interior time (:py:func:`tau_total`), the time before the middle crossing (:py:func:`tau_before_middle`) and the time after it (:py:func:`tau_after_middle`) are computed and weight-averaged per path type. The middle (recrossing) time ``taum`` is the remainder ``tau - tau1 - tau2``. These per-type, per-ensemble averages feed the REPPTIS MSM mean-first-passage-time vector (:py:func:`pyretis.analysis.repptis_msm.construct_tau_vector`). Parameters ---------- orders_list : sequence of numpy.ndarray One order-parameter trajectory per accepted path. lmrs : sequence of string The path type of each accepted path (same length/order). weights : sequence of int or float The Monte-Carlo weight of each accepted path. interfaces : sequence of float The ensemble's ``(left, middle, right)`` interfaces. Returns ------- dict Maps each path type (``LML`` / ``LMR`` / ``RML`` / ``RMR``) to a dict with keys ``tau`` / ``tau1`` / ``tau2`` / ``taum`` (the weighted averages, or ``nan`` when the type is unsampled). """ lmrs = np.asarray(lmrs) weights = np.asarray(weights, dtype=float) taus = {pathtype: {'tau': np.nan, 'tau1': np.nan, 'tau2': np.nan, 'taum': np.nan} for pathtype in ('LML', 'LMR', 'RML', 'RMR')} for pathtype in ('LML', 'LMR', 'RML', 'RMR'): mask = lmrs == pathtype total_weight = float(np.sum(weights[mask])) if mask.any() else 0.0 if total_weight == 0.0: continue tau_vals, tau1_vals, tau2_vals, type_weights = [], [], [], [] for keep, orders, weight in zip(mask, orders_list, weights): if not keep: continue tau_vals.append(tau_total(orders, pathtype, interfaces)) tau1_vals.append(tau_before_middle(orders, pathtype, interfaces)) tau2_vals.append(tau_after_middle(orders, pathtype, interfaces)) type_weights.append(weight) type_weights = np.asarray(type_weights, dtype=float) tau = float(np.average(tau_vals, weights=type_weights)) tau1 = float(np.average(tau1_vals, weights=type_weights)) tau2 = float(np.average(tau2_vals, weights=type_weights)) taus[pathtype] = {'tau': tau, 'tau1': tau1, 'tau2': tau2, 'taum': tau - tau1 - tau2} return taus