# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Markov-state-model kinetics for partial path TIS (REPPTIS).
REPPTIS samples *short* partial paths, so the crossing probability,
mean first passage time (MFPT), flux and rate cannot be read off the
path ensembles directly the way they are for full RETIS. This module
reconstructs them by building a Markov state model (MSM) over the
partial-path ensembles from the local crossing probabilities and the
per-ensemble transition times, following the closed-form analysis of
Vervust, Wils, Safaei, Zhang and Ghysels [vervust2026]_ (building on
the permeability-from-(RE)TIS formalism of Ghysels, Roet, Davoudi and
van Erp [ghysels2021]_, and the REPPTIS replica-exchange scheme of
Vervust, Zhang, van Erp and Ghysels [vervust2023]_).
This is a faithful port of the ``repptis_msm`` module from the
``tistools`` analysis package (Elias W., May 2025); the numerical
construction is preserved verbatim, only adapted to the PyRETIS coding
conventions.
Note on the index convention
----------------------------
In the paper, ``N + 1`` is the number of interfaces, while in this
code ``N`` is the number of interfaces directly. Both yield the same
state-space size ``NS = 4 * N - 5`` (paper: ``4 * (N - 1) - 1``).
Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
construct_M (:py:func:`.construct_M`)
Build the PPTIS transition matrix from the local crossing
probabilities.
construct_M_milestoning (:py:func:`.construct_M_milestoning`)
Build the milestoning transition matrix.
global_pcross_msm (:py:func:`.global_pcross_msm`)
Global crossing probability from the transition matrix.
mfpt_to_absorbing_states (:py:func:`.mfpt_to_absorbing_states`)
Mean first passage time to a set of absorbing states.
mfpt_to_first_last_state (:py:func:`.mfpt_to_first_last_state`)
Mean first passage time to the first or last state.
construct_tau_vector (:py:func:`.construct_tau_vector`)
Flatten the per-path-type transition times into the MSM vector.
References
----------
.. [vervust2026] W. Vervust, E. Wils, S. Safaei, D. T. Zhang and
A. Ghysels, "Estimating full path lengths and kinetics from partial
path transition interface sampling simulations", J. Chem. Theory
Comput. (2026), https://doi.org/10.1021/acs.jctc.5c01498
(arXiv:2602.12835).
.. [ghysels2021] A. Ghysels, S. Roet, S. Davoudi and T. S. van Erp,
"Exact non-Markovian permeability from rare event simulations",
Phys. Rev. Research 3, 033068 (2021),
https://doi.org/10.1103/PhysRevResearch.3.033068.
.. [vervust2023] W. Vervust, D. T. Zhang, T. S. van Erp and
A. Ghysels, "Path sampling with memory reduction and replica
exchange to reach long permeation timescales", Biophys. J. 122,
2960-2972 (2023), https://doi.org/10.1016/j.bpj.2023.02.021.
"""
import logging
import numpy as np
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
__all__ = [
'construct_tau_vector',
'construct_M',
'construct_M_N3',
'construct_M_milestoning',
'global_pcross_msm',
'mfpt_to_absorbing_states',
'mfpt_to_first_last_state',
'check_valid_indices',
'get_pieces_matrix',
'get_pieces_vector',
'create_labels_states',
'create_labels_states_all',
'print_vector',
'print_all_tau',
]
# =====================================
# PPTIS
# =====================================
[docs]def construct_tau_vector(N, NS, taumm, taump, taupm, taupp):
"""Construct the flattened vector of PPTIS transition times.
The per-path-type transition times (LML, LMR, RML, RMR) are
organised into a single flattened vector following the PPTIS
ensemble structure.
Parameters
----------
N : int
The number of ensembles. Must be at least 3.
NS : int
The expected size of the output vector. Must satisfy
``NS = 4 * N - 5``.
taumm : list of float
Transition times for the LML (Left-to-Left) path type. Must
have length ``N``.
taump : list of float
Transition times for the LMR (Left-to-Right) path type. Must
have length ``N``.
taupm : list of float
Transition times for the RML (Right-to-Left) path type. Must
have length ``N``.
taupp : list of float
Transition times for the RMR (Right-to-Right) path type. Must
have length ``N``.
Returns
-------
tau : numpy.ndarray
A flattened vector of transition times with length ``NS``.
Raises
------
ValueError
If any of the input constraints are not met (e.g. invalid
lengths or values).
"""
# Validate input constraints
if N < 3:
raise ValueError(f"N must be at least 3, but got {N}.")
if NS != 4 * N - 5:
raise ValueError(
f"NS must be equal to 4*N - 5 ({4 * N - 5}), but got {NS}."
)
if (len(taumm) != N or len(taump) != N or len(taupm) != N
or len(taupp) != N):
raise ValueError(
"All input transition time lists (taumm, taump, taupm, "
f"taupp) must have length N={N}."
)
# Initialize the output vector with zeros
tau = np.zeros(NS)
# Assign values for the [0-] ensemble
tau[0] = taupp[0]
# Assign values for the [0+-] ensemble
tau[1] = taumm[1]
tau[2] = taump[1]
tau[3] = taupm[1]
# Assign values for intermediate ensembles [1+-], [2+-], ...
for i in range(1, N - 2):
tau[4 * i] = taumm[i + 1]
tau[4 * i + 1] = taump[i + 1]
tau[4 * i + 2] = taupm[i + 1]
tau[4 * i + 3] = taupp[i + 1]
# Assign values for the [(N-2)^(-1)] ensemble
tau[-3] = taumm[-1]
tau[-2] = taump[-1]
# Assign a placeholder value for the final ensemble (B)
tau[-1] = 0.0 # This value is arbitrary and can be adjusted
return tau
[docs]def construct_M(p_mm, p_mp, p_pm, p_pp, N):
"""Construct the PPTIS transition matrix M.
The matrix describes the probabilities of transitioning between
states in the PPTIS framework, built from the local crossing
probabilities for the different path types (LML, LMR, RML, RMR).
Parameters
----------
p_mm : list of float
Local crossing probabilities for the LML (Left-to-Left) path
type. Must have length ``N - 1``.
p_mp : list of float
Local crossing probabilities for the LMR (Left-to-Right) path
type. Must have length ``N - 1``.
p_pm : list of float
Local crossing probabilities for the RML (Right-to-Left) path
type. Must have length ``N - 1``.
p_pp : list of float
Local crossing probabilities for the RMR (Right-to-Right) path
type. Must have length ``N - 1``.
N : int
The number of interfaces. Must be at least 3.
Returns
-------
M : numpy.ndarray
The transition matrix of shape ``(NS, NS)``, where
``NS = 4 * N - 5``.
Raises
------
ValueError
If any of the input constraints are not met (e.g. invalid
lengths or values).
"""
# Validate input constraints
if N < 3:
raise ValueError(f"N must be at least 3, but got {N}.")
if (len(p_mm) != N - 1 or len(p_mp) != N - 1 or len(p_pm) != N - 1
or len(p_pp) != N - 1):
raise ValueError(
"All input probability lists (p_mm, p_mp, p_pm, p_pp) "
f"must have length N-1={N - 1}."
)
NS = 4 * N - 5 # Dimension of the transition matrix
# Handle the special case for N=3
if N == 3:
return construct_M_N3(p_mm, p_mp, NS)
# Initialize the transition matrix with zeros
M = np.zeros((NS, NS))
# Fill in transitions for states [0-] and [0+-]
M[0, 1:4] = [p_mm[0], p_mp[0], 0] # Transitions from [0-]
M[1, 0] = 1 # Transition from [0+-] back to [0-]
M[2, 4:8] = [p_mm[1], p_mp[1], 0, 0] # Transitions from [0+-]
M[3, 0] = 1 # Transition from [0+-] back to [0-]
# Fill in transitions for states [1+-] (special case)
M[4, 3] = 1 # Transition from [1+-] back to [0+-]
M[6, 3] = 1 # Transition from [1+-] back to [0+-]
# Fill in transitions for states [(N-2)+-] (special case)
M[(N - 2) * 4, -5:-3] = [p_pm[N - 3], p_pp[N - 3]]
M[(N - 2) * 4 + 1, -1] = 1 # Transition from [(N-2)+-] to state B
# Fill in transition for state B (special case)
M[-1, 0] = 1 # Transition from state B back to [0-]
# Fill in transitions for intermediate states [1+-], ..., [(N-3)+-]
for i in range(1, N - 2):
M[1 + 4 * i, 4 * (i + 1):4 * (i + 1) + 2] = [p_mm[i + 1],
p_mp[i + 1]]
M[3 + 4 * i, 4 * (i + 1):4 * (i + 1) + 2] = [p_mm[i + 1],
p_mp[i + 1]]
# Fill in transitions for intermediate states [2+-], ..., [(N-3)+-]
for i in range(2, N - 2):
M[4 * i, 4 * i - 2:4 * i] = [p_pm[i - 1], p_pp[i - 1]]
M[4 * i + 2, 4 * i - 2:4 * i] = [p_pm[i - 1], p_pp[i - 1]]
return M
[docs]def construct_M_N3(p_mm, p_mp, NS):
"""Construct the PPTIS transition matrix M for the case N=3.
Build the transition matrix when there are exactly 3 interfaces.
Parameters
----------
p_mm : list of float
Local crossing probabilities for the LML (Left-to-Left) path
type. Must have length 2.
p_mp : list of float
Local crossing probabilities for the LMR (Left-to-Right) path
type. Must have length 2.
NS : int
The dimension of the transition matrix. Must satisfy
``NS = 4 * N - 5`` with ``N = 3``.
Returns
-------
M : numpy.ndarray
The transition matrix of shape ``(NS, NS)``.
Raises
------
ValueError
If the input probability lists do not have the correct length
or if ``NS`` is invalid.
"""
# Validate input constraints
if len(p_mm) != 2 or len(p_mp) != 2:
raise ValueError(
"For N=3, input probability lists (p_mm, p_mp) must have "
"length 2."
)
if NS != 7: # NS = 4*N - 5 = 7 when N=3
raise ValueError(f"For N=3, NS must be 7, but got {NS}.")
# Initialize the transition matrix with zeros
M = np.zeros((NS, NS))
# Fill in transitions for states [0-] and [0+-]
M[0, 1:4] = [p_mm[0], p_mp[0], 0] # Transitions from [0-]
M[1, 0] = 1 # Transition from [0+-] back to [0-]
M[2, 4:6] = [p_mm[1], p_mp[1]] # Transitions from [0+-] (N=3)
M[3, 0] = 1 # Transition from [0+-] back to [0-]
# Fill in transitions for states [1+-] (special case)
M[4, 3] = 1 # Transition from [1+-] back to [0+-]
# Fill in transitions for states [(N-2)+-] (special case)
M[5, -1] = 1 # Transition from [(N-2)+-] to state B
# Fill in transition for state B (special case)
M[-1, 0] = 1 # Transition from state B back to [0-]
return M
# =====================================
# Milestoning
# =====================================
[docs]def construct_M_milestoning(p_min, p_plus, N):
"""Construct the transition matrix M for milestoning PPTIS.
Build a transition matrix for a milestoning framework. The states
are lambda0-, lambda0+, lambda1, ..., lambda(N-1) = B.
Parameters
----------
p_min : list of float
Probabilities of transitioning to the previous milestone. Must
have length ``N - 1``.
p_plus : list of float
Probabilities of transitioning to the next milestone. Must
have length ``N - 1``.
N : int
The number of interfaces (milestones). Must be at least 3.
Returns
-------
M : numpy.ndarray
The transition matrix of shape ``(NS, NS)``, where
``NS = N + 1``.
Raises
------
ValueError
If ``N`` is less than 3 or if the lengths of ``p_min`` and
``p_plus`` are not ``N - 1``.
"""
# Validate input constraints
if N < 3:
raise ValueError(f"N must be at least 3, but got {N}.")
if len(p_min) != N - 1 or len(p_plus) != N - 1:
raise ValueError(
f"Both p_min and p_plus must have length N-1={N - 1}."
)
# NS: states (lambda0-, lambda0+, lambda1, ..., lambda(N-1)=B)
NS = N + 1
# Initialize the transition matrix with zeros
M = np.zeros((NS, NS))
# Transitions for lambda0- and lambda0+
M[0, 1] = 1 # Transition from lambda0- to lambda0+
M[1, 0] = p_min[0] # Transition from lambda0+ to lambda0-
M[1, 2] = p_plus[0] # Transition from lambda0+ to lambda1
# Transitions for lambda1
M[2, 0] = p_min[1] # Transition from lambda1 to lambda0-
M[2, 3] = p_plus[1] # Transition from lambda1 to lambda2
# Transitions for lambda2 to lambda(N-2)
# Shift index by 1 because lambda0 has two rows/cols.
for i in range(2, N - 1):
M[i + 1, i - 1 + 1] = p_min[i] # To the previous milestone
M[i + 1, i + 1 + 1] = p_plus[i] # To the next milestone
# Transitions for state B (lambda(N-1))
M[N, 0] = 1 # Transition from B back to lambda0-
return M
# =====================================
# crossing probabilities
# =====================================
[docs]def global_pcross_msm(M, doprint=False):
"""Compute global crossing probabilities in a Markov process.
Calculate the probability of reaching state ``-1`` before state
``0`` under different conditions, using the transition matrix. The
function solves a linear system rather than inverting matrices,
for better numerical stability.
Parameters
----------
M : numpy.ndarray
A square transition matrix representing the Markov process.
Must have at least 3 states.
doprint : bool, optional
If True, log intermediate computation details. Default False.
Returns
-------
z1 : numpy.ndarray
A 2-element array with crossing probabilities for states 0
and -1.
z2 : numpy.ndarray
An ``(NS-2)``-element array with crossing probabilities for
the other states.
y1 : numpy.ndarray
A 2-element array with adjusted crossing probabilities for
states 0 and -1. ``y1[0]`` is the global crossing probability
from state 0 to -1, given that the process starts at 0 and
leaves it.
y2 : numpy.ndarray
An ``(NS-2)``-element array with adjusted crossing
probabilities for the other states.
Raises
------
ValueError
If the transition matrix has fewer than three states.
"""
NS = len(M)
if NS < 3:
raise ValueError(
f"Transition matrix must have at least 3 states, but got "
f"{NS}."
)
# Extract the core transition matrix (excluding boundary states)
Mp = M[1:-1, 1:-1]
a = np.identity(NS - 2) - Mp # (I - Mp) for solving equations
# Transition probabilities related to states 0 and -1
D = M[1:-1, np.array([0, -1])] # Intermediate -> 0 or -1
E = M[np.array([0, -1]), 1:-1] # 0 or -1 -> intermediate
M11 = M[np.ix_(np.array([0, -1]), np.array([0, -1]))] # 0/-1 block
# Solve for Z vector (probability of reaching -1 before 0)
z1 = np.array([[0], [1]]) # Boundary: 0 for state 0, 1 for state -1
z2 = np.linalg.solve(a, np.dot(D, z1)) # (I - Mp) z2 = D z1
# Y vector (adjusted probability given leaving the current state)
y1 = np.dot(M11, z1) + np.dot(E, z2) # prob reaching -1 leaving 0
y2 = np.dot(D, z1) + np.dot(Mp, z2)
if doprint:
logger.info("Eigenvalues of Mp:\n%s", np.linalg.eigvals(Mp))
logger.info("Eigenvalues of (I - Mp):\n%s", np.linalg.eigvals(a))
logger.info("Transition matrix components:")
logger.info("D:\n%s", D)
logger.info("E:\n%s", E)
logger.info("M11:\n%s", M11)
logger.info("Vectors:")
logger.info("z1:\n%s", z1)
logger.info("z2:\n%s", z2)
logger.info("y1:\n%s", y1)
logger.info("y2:\n%s", y2)
logger.info("Verification (should be ~0): %s",
np.sum((y2 - z2) ** 2))
return z1, z2, y1, y2
# =====================================
# Mean first passage times
# =====================================
[docs]def mfpt_to_absorbing_states(M, tau1, taum, tau2, absor, kept,
doprint=False, remove_initial_m=True):
"""Compute the mean first passage time (MFPT) to absorbing states.
Calculate the MFPT to reach any of the specified absorbing states,
both unconditionally (``g``) and conditionally on leaving the
current state (``h``). The system is solved without explicit
matrix inversion, for numerical stability.
Parameters
----------
M : numpy.ndarray
The transition matrix of the Markov process.
tau1 : numpy.ndarray
Time before the first visit to an absorbing state.
taum : numpy.ndarray
Time spent between the first and last visit to an absorbing
state.
tau2 : numpy.ndarray
Time after the last visit to an absorbing state.
absor : list or numpy.ndarray
Indices of absorbing states.
kept : list or numpy.ndarray
Indices of nonboundary (non-absorbing) states.
doprint : bool, optional
If True, log intermediate computation details. Default False.
remove_initial_m : bool or str, optional
If True or ``"m"``, the middle part (``taum``) is removed from
the initial state's MFPT. Default True.
Returns
-------
g1 : numpy.ndarray
Array of size ``(n_absorb, 1)`` with unconditional MFPTs for
the absorbing states.
g2 : numpy.ndarray
Array of size ``(n_kept, 1)`` with unconditional MFPTs for the
nonboundary states.
h1 : numpy.ndarray
Array of size ``(n_absorb, 1)`` with conditional MFPTs for the
absorbing states.
h2 : numpy.ndarray
Array of size ``(n_kept, 1)`` with conditional MFPTs for the
nonboundary states.
Raises
------
ValueError
If the transition matrix has fewer than three states, or if
the absorbing and nonboundary states do not partition the
state space.
"""
taum2 = taum + tau2 # Total time excluding the initial phase
NS = len(M)
if NS < 3:
raise ValueError(
f"Transition matrix must have at least 3 states, but got "
f"{NS}."
)
check_valid_indices(M, absor, kept)
if len(M) != len(absor) + len(kept):
raise ValueError(
"The number of states must be the sum of absorbing and "
"nonboundary states."
)
# Submatrix for nonboundary states
Mp = np.take(np.take(M, kept, axis=0), kept, axis=1)
# Transition probabilities between nonboundary and absorbing states
D = np.take(np.take(M, kept, axis=0), absor, axis=1)
E = np.take(np.take(M, absor, axis=0), kept, axis=1)
M11 = np.take(np.take(M, absor, axis=0), absor, axis=1)
a = np.identity(len(Mp)) - Mp # (I - Mp) for solving equations
# Construct time vectors. (m2) is the sum of the middle part (m)
# and the end part (2).
t1 = taum2[absor].reshape(len(absor), 1) # (m2) from absorbing
tp = taum2[kept].reshape(len(kept), 1) # (m2) from nonboundary
st1 = taum[absor].reshape(len(absor), 1) # (m) of absorbing state
stp = taum[kept].reshape(len(kept), 1) # (m) for nonboundary
# Compute G vector (unconditional MFPT)
g1 = np.zeros((len(absor), 1)) # Boundary condition
g2 = np.linalg.solve(a, np.dot(D, g1) + tp) # (I - Mp) g2 = D g1+tp
# Compute H vector (conditional MFPT)
h1 = np.dot(M11, g1) + np.dot(E, g2) + t1
h2 = np.dot(D, g1) + np.dot(Mp, g2) + tp
# Remove the middle part (m) of the initial state if specified
if remove_initial_m:
h1 -= st1
h2 -= stp
if doprint:
logger.info("Eigenvalues of Mp:\n%s", np.linalg.eigvals(Mp))
logger.info("Eigenvalues of (I - Mp):\n%s", np.linalg.eigvals(a))
logger.info("Transition matrix components:")
logger.info("D:\n%s", D)
logger.info("E:\n%s", E)
logger.info("M11:\n%s", M11)
logger.info("Time vectors (tau m2):")
logger.info("t1:\n%s", t1)
logger.info("tp:\n%s", tp)
logger.info("Vectors:")
logger.info("g1:\n%s", g1)
logger.info("g2:\n%s", g2)
logger.info("h1:\n%s", h1)
logger.info("h2:\n%s", h2)
logger.info("Verification (should be ~0): %s",
np.sum((g2 - h2) ** 2))
return g1, g2, h1, h2
[docs]def mfpt_to_first_last_state(M, tau1, taum, tau2, doprint=False):
"""Compute the MFPT to reach either state 0 or state -1.
Calculate the MFPT to reach the first (0) or last (``NS-1``) state
by treating these two states as absorbing. The key result is
``h1[0]``, the MFPT from state 0 to either 0 or -1, given that the
process leaves state 0. Calls
:py:func:`mfpt_to_absorbing_states` with ``remove_initial_m="m"``
so the intermediate passage time is excluded.
Parameters
----------
M : numpy.ndarray
The transition matrix of the Markov process.
tau1 : numpy.ndarray
Time before the first visit to an absorbing state.
taum : numpy.ndarray
Time spent between the first and last visit to an absorbing
state.
tau2 : numpy.ndarray
Time after the last visit to an absorbing state.
doprint : bool, optional
If True, log intermediate computation details. Default False.
Returns
-------
g1 : numpy.ndarray
Unconditional MFPT for the absorbing states (0 and ``NS-1``).
g2 : numpy.ndarray
Unconditional MFPT for the nonboundary states.
h1 : numpy.ndarray
Conditional MFPT for the absorbing states.
h2 : numpy.ndarray
Conditional MFPT for the nonboundary states.
Raises
------
ValueError
If the transition matrix has fewer than three states.
"""
NS = len(M)
if NS < 3:
raise ValueError(
f"Transition matrix must have at least 3 states, but got "
f"{NS}."
)
absor = np.array([0, NS - 1])
kept = np.array([i for i in range(NS) if i not in absor])
return mfpt_to_absorbing_states(M, tau1, taum, tau2, absor, kept,
doprint=doprint,
remove_initial_m="m")
# =====================================
# help functions
# =====================================
[docs]def check_valid_indices(M, absor, kept):
"""Validate the absorbing and non-absorbing state indices.
Ensure that the indices of absorbing (``absor``) and non-absorbing
(``kept``) states are correctly defined, in range, and partition
the state space (each state in exactly one of the two sets).
Parameters
----------
M : numpy.ndarray
The transition matrix of the Markov process.
absor : array-like
Indices of absorbing states.
kept : array-like
Indices of non-absorbing (nonboundary) states.
Raises
------
ValueError
If there are duplicate indices in ``absor`` or ``kept``, if
any index is out of bounds, or if any state is neither in
``absor`` nor in ``kept`` (or in both).
"""
NS = len(M)
if len(set(absor)) != len(absor):
raise ValueError("Duplicate indices found in `absor`.")
if len(set(kept)) != len(kept):
raise ValueError("Duplicate indices found in `kept`.")
if min(absor) < 0 or max(absor) >= NS:
raise ValueError(
"Indices in `absor` must be within the valid range "
"[0, NS-1]."
)
if min(kept) < 0 or max(kept) >= NS:
raise ValueError(
"Indices in `kept` must be within the valid range "
"[0, NS-1]."
)
for i in range(NS):
if (i in absor) == (i in kept):
raise ValueError(
f"State {i} must be in either `absor` or `kept`, but "
"not both."
)
[docs]def get_pieces_matrix(M, absor, kept):
"""Partition a transition matrix into absorbing/nonboundary blocks.
Split ``M`` into four submatrices: ``Mp`` (nonboundary to
nonboundary), ``D`` (nonboundary to absorbing), ``E`` (absorbing to
nonboundary), and ``M11`` (absorbing to absorbing).
Parameters
----------
M : numpy.ndarray
The transition matrix of the Markov process.
absor : array-like
Indices of absorbing states.
kept : array-like
Indices of non-absorbing (nonboundary) states.
Returns
-------
Mp : numpy.ndarray
Submatrix for nonboundary-to-nonboundary transitions.
D : numpy.ndarray
Submatrix for nonboundary-to-absorbing transitions.
E : numpy.ndarray
Submatrix for absorbing-to-nonboundary transitions.
M11 : numpy.ndarray
Submatrix for absorbing-to-absorbing transitions.
Raises
------
ValueError
If ``absor`` and ``kept`` contain invalid or overlapping
indices.
"""
check_valid_indices(M, absor, kept)
# Extract matrix blocks
Mp = np.take(np.take(M, kept, axis=0), kept, axis=1)
D = np.take(np.take(M, kept, axis=0), absor, axis=1)
E = np.take(np.take(M, absor, axis=0), kept, axis=1)
M11 = np.take(np.take(M, absor, axis=0), absor, axis=1)
# Handle the edge case where there is only one absorbing state
if len(absor) == 1:
D = D.reshape(-1, 1) # Ensure D is a column vector
E = E.reshape(1, -1) # Ensure E is a row vector
M11 = M11.reshape(1, 1) # Ensure M11 is a 1x1 matrix
return Mp, D, E, M11
[docs]def get_pieces_vector(vec, absor, kept):
"""Partition a vector into absorbing/nonboundary sub-vectors.
Split ``vec`` into ``v1`` (elements at absorbing-state indices) and
``v2`` (elements at nonboundary-state indices).
Parameters
----------
vec : numpy.ndarray
A 1D array of state values.
absor : array-like
Indices of absorbing states.
kept : array-like
Indices of non-absorbing (nonboundary) states.
Returns
-------
v1 : numpy.ndarray
A column vector (``len(absor) x 1``) of elements at ``absor``.
v2 : numpy.ndarray
A column vector (``len(kept) x 1``) of elements at ``kept``.
Raises
------
ValueError
If ``absor`` and ``kept`` contain invalid or overlapping
indices, if ``vec`` is not 1D, or if ``vec`` does not have the
expected length ``len(absor) + len(kept)``.
"""
# Ensure `vec` is a 1D array
vec = np.asarray(vec)
if vec.ndim != 1:
raise ValueError(
f"Expected `vec` to be 1D, but got shape {vec.shape}"
)
# Ensure the length of vec matches the expected total size
NS = len(vec)
if NS != len(absor) + len(kept):
raise ValueError(
f"Expected `vec` to have length {len(absor) + len(kept)}, "
f"but got {NS}"
)
check_valid_indices(vec, absor, kept) # dummy matrix for validation
# Extract vector components
v1 = vec[np.array(absor)].reshape(len(absor), 1)
v2 = vec[np.array(kept)].reshape(len(kept), 1)
return v1, v2
[docs]def create_labels_states(N):
"""Generate labels for absorbing and non-absorbing states.
Create two lists of state labels: ``labels1`` for the two
absorbing states (``0-`` and ``B``), and ``labels2`` for the
``N-2`` non-absorbing states.
Parameters
----------
N : int
The total number of states. Must be at least 3.
Returns
-------
labels1 : list of str
Labels for the two absorbing states.
labels2 : list of str
Labels for the ``N-2`` non-absorbing states.
Raises
------
ValueError
If ``N`` is less than 3.
"""
if N < 3:
raise ValueError(f"Expected N >= 3, but got {N}")
labels1 = ["0- ", "B "]
labels2 = ["0+- LML", "0+- LMR", "0+- RML", "1+- LML", "1+- LMR"]
if N > 3:
for i in range(1, N - 2):
labels2.extend([
f"{i}+- RML",
f"{i}+- RMR",
f"{i + 1}+- LML",
f"{i + 1}+- LMR",
])
return labels1, labels2
[docs]def create_labels_states_all(N):
"""Generate labels for all states in sequential order.
Create a single list of state labels: the absorbing state ``0-``,
all non-absorbing states, then the absorbing state ``B``.
Parameters
----------
N : int
The total number of states. Must be at least 3.
Returns
-------
labels : list of str
Labels for all ``N`` states in sequential order.
Raises
------
ValueError
If ``N`` is less than 3.
"""
if N < 3:
raise ValueError(f"Expected N >= 3, but got {N}")
labels = ["0- ", "0+- LML", "0+- LMR", "0+- RML",
"1+- LML", "1+- LMR"]
if N > 3:
for i in range(1, N - 2):
labels.extend([
f"{i}+- RML",
f"{i}+- RMR",
f"{i + 1}+- LML",
f"{i + 1}+- LMR",
])
labels.append("B ")
return labels
[docs]def print_vector(g, states=None, sel=None):
"""Print a vector ``g`` with the corresponding state labels.
Parameters
----------
g : array-like
The vector to print.
states : list of str, optional
Labels corresponding to each state. If None, indices are used.
sel : list of int, optional
Indices of the selected states to print. If None, all states
are printed.
Raises
------
ValueError
If ``sel`` is provided but does not match the length of ``g``.
"""
if sel is not None and len(g) != len(sel):
raise ValueError(
"Length of `g` must match length of `sel` if `sel` is "
"provided."
)
for i in range(len(g)):
if states and sel:
state_label = f"state {states[sel[i]]}"
elif states:
state_label = f"state {states[i]}"
else:
state_label = f"state {i}"
if isinstance(g[i], (list, tuple, np.ndarray)):
value = g[i][0]
else:
value = g[i]
print(f"{state_label}: {value}")
[docs]def print_all_tau(pathensembles, taumm, taump, taupm, taupp):
"""Print all tau values for each path ensemble.
Parameters
----------
pathensembles : list
List of path ensemble objects, each with a ``.name`` attribute.
taumm : array-like
First tau metric for each path.
taump : array-like
Second tau metric for each path.
taupm : array-like
Third tau metric for each path.
taupp : array-like
Fourth tau metric for each path.
Raises
------
ValueError
If the input arrays do not have the same length as
``pathensembles``.
"""
num_paths = len(pathensembles)
if not all(len(arr) == num_paths
for arr in [taumm, taump, taupm, taupp]):
raise ValueError(
"All tau arrays must have the same length as "
"`pathensembles`."
)
print(f"{'Index':<5} {'Name':<5} {'mm':>12} {'mp':>12} "
f"{'pm':>12} {'pp':>12}")
print("-" * 53)
for i in range(num_paths):
if hasattr(pathensembles[i], 'name'):
name_suffix = pathensembles[i].name[-3:]
else:
name_suffix = "N/A"
print(f"{i:<5} {name_suffix:<5} {taumm[i]:12.1f} "
f"{taump[i]:12.1f} {taupm[i]:12.1f} {taupp[i]:12.1f}")