"""Replica-exchange (REPEX) coordinator.
Runs the infinite-swap parallel REPEX bookkeeping for the scheduler in
:py:mod:`pyretis.simulation.scheduler`. A future phase will collapse
this with pyretis's :py:mod:`pyretis.core.retis` swap moves.
"""
import logging
import os
import shutil
import time
from datetime import datetime
import numpy as np
import tomli_w
from numpy.random import default_rng
from pyretis.inout.common import make_dirs
from pyretis.core._tis_inf import calc_cv_vector
from pyretis.core.provenance import collect_provenance
from pyretis.engines.factory import assign_engines
from pyretis.inout._formatter_inf import PathStorage
from pyretis.inout.native_output import write_native_pathensemble_data
logger = logging.getLogger("main") # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
DATE_FORMAT = "%Y.%m.%d %H:%M:%S"
[docs]def spawn_rng(rgen):
"""
Reimplementation of np.random.Generator.spawn() for numpy <= 1.24.4.
Spawns a new random number generator (RNG) from an existing RNG.
This function creates a new instance of the same type of RNG as the input
RNG, using a seed generated from the input RNG's bit generator.
Parameters:
rgen (np.random.Generator): The input random number generator.
Returns:
np.random.Generator: A new random number generator instance.
"""
return type(rgen)(
type(rgen.bit_generator)(seed=rgen.bit_generator._seed_seq.spawn(1)[0])
)
[docs]class InfSwapState:
"""Define the infinite-swapping replica-exchange (REPEX) state object."""
# immutable "no worker yet" sentinel; safe at class scope.
cworker = None
[docs] def __init__(self, config, minus=False):
"""Initiate the swap state given the config dict from a TOML file."""
# Per-instance state. Previously the dicts and PathStorage lived
# at class scope, so two InfSwapState instances in the same
# process shared trajectory bookkeeping and the on-disk path
# archive -- a silent correctness risk for tests, restart
# tooling, or any future multi-simulation workflow.
self.config = config
self.traj_data: dict = {}
self.ensembles: dict = {}
self.engine_occ: dict = {}
# Lazily-built streaming engine used to recompute per-frame
# vpot/ekin for the native energy.txt output (see
# ``_native_energy_engine``); ``None`` once built means the
# configured engine cannot recompute energy.
self._energy_engine = None
self._energy_engine_built = False
self.pstore = PathStorage()
# storage of additional trajectory files
self.pstore.keep_traj_fnames = config.get("output", {}).get(
"keep_traj_fnames", []
)
# set rng
if "restarted_from" in config["current"]:
self.set_rgen()
else:
self.rgen = default_rng(seed=config["simulation"]["seed"])
n = config["current"]["size"]
if minus:
self._offset = int(minus)
n += int(minus)
else:
self._offset = 0
self.n = n
self.state = np.zeros(shape=(n, n))
self._locks = np.ones(shape=n)
self._last_prob = None
self._random_count = 0
self._trajs = [""] * n
self.zeroswap = config["simulation"]["zeroswap"]
# ∞REPPTIS: when set, adjacent POSITIVE ensembles may swap (the
# PPTIS swap), not just the [0^-]<->[0^+] zero swap. Off by default
# so standard infinite-swapping RETIS is unchanged.
self.repptis = bool(config["simulation"].get("repptis", False))
self.pick_scheme = config["simulation"]["pick_scheme"]
tis_set = config.get("simulation", {}).get("tis_set", {})
self._perm_threshold = tis_set.get("perm_threshold", 12)
self._perm_n_samples = tis_set.get("perm_n_samples", 10_000)
self._exact_perm_only = tis_set.get("exact_perm_only", False)
# Native permeability "frequency" moves (time-reversal / mirror /
# target-swap) drawn per cycle on top of the configured shooting
# move, mirroring pyretis.core.tis make_tis_step_ensemble. Off
# unless freq/mirror_freq/target_freq are set, so standard runs do
# NO extra RNG draw and stay byte-identical. mirror/targetswap
# PERSIST a global order-function mutation on accept and so require
# n_workers=1 (enforced at config time in the native route).
self._freq = float(tis_set.get("freq", 0.0))
self._mirror_freq = float(tis_set.get("mirror_freq", 0.0))
self._target_freq = float(tis_set.get("target_freq", 0.0))
self.has_freq_moves = (
self._freq + self._mirror_freq + self._target_freq
) > 0.0
# detect any locked ens-path pairs exist pre start
self.locked0 = list(self.config["current"].get("locked", []))
self.locked = []
# determines the number of initiation loops to do.
# either initiate all workers, or less if less steps left.
stepsleft = self.tsteps - self.cstep
self.toinitiate = min([self.workers, stepsleft])
# keep track of olds in case of delete_old = True
self.pn_olds = {}
# Native-output (compatibility-layer) bookkeeping. Populated only
# when the run was entered via the native route
# ([output] native_compat = true); empty/untouched otherwise so
# the normal inf route is unchanged. ``native_rows`` holds the
# per-path native pathensemble.txt column data keyed by path
# number; ``last_frac_increment`` holds, per ensemble index, the
# path number and per-cycle frac increment for the current cycle;
# the counters track the running No.-acc / No.-shoot columns.
self.native_rows: dict = {}
self.last_frac_increment: dict = {}
self.native_nacc: dict = {}
self.native_nshoot: dict = {}
@property
def prob(self):
"""Calculate the P matrix."""
if self._last_prob is None:
prob = self.compute_swap_matrix(abs(self.state), self._locks)
self._last_prob = prob.copy()
return self._last_prob
@property
def cstep(self):
"""Retrieve cstep from config dict."""
return self.config["current"]["cstep"]
@cstep.setter
def cstep(self, val):
"""Iterate += cstep from val."""
self.config["current"]["cstep"] = val
@property
def tsteps(self):
"""Retrieve total steps from config dict."""
return self.config["simulation"]["steps"]
@property
def screen(self):
"""Retrieve screen print frequency from config dict."""
return self.config["output"]["screen"]
@property
def mc_moves(self):
"""Retrieve mc moves list from config dict."""
return self.config["simulation"]["shooting_moves"]
@property
def cap(self):
"""Retrieve mc moves list from config dict."""
return self.config["simulation"]["tis_set"].get("interface_cap", None)
@property
def data_dir(self):
"""Retrieve data_dir from config dict."""
return self.config["output"]["data_dir"]
@property
def data_file(self):
"""Retrieve data_file from config dict."""
return self.config["output"]["data_file"]
@property
def interfaces(self):
"""Retrieve interfaces from config dict."""
return self.config["simulation"]["interfaces"]
@property
def workers(self):
"""Retrieve workers from config dict."""
return self.config["runner"]["workers"]
@property
def native_output(self):
"""Return True when native per-ensemble output is requested.
Set by the native-config input shim via ``[output]
native_compat = true``. The normal inf route never sets it, so
the coordinator keeps writing only ``infswap_data.txt``.
"""
return bool(self.config.get("output", {}).get("native_compat", False))
@property
def maxop(self):
"""Get the maximum orderparameter seen during the simulation."""
return self.config["current"].get("maxop", -float("inf"))
@maxop.setter
def maxop(self, val):
"""Update the maximum orderpameter seen during the sumulation."""
if self.config["output"]["keep_maxop_trajs"]:
self.config["current"]["maxop"] = val
[docs] def pick(self):
"""Pick path and ens."""
prob = self.prob.astype("float64")
if self.pick_scheme > 0:
# Pick ensemble based on weight, primarily only necessary
# for inf-init simulations.
valid_idx = np.where(1.0 - self._locks)[0]
ens_weights = np.zeros(self.n)
ens_weights[valid_idx] = np.arange(1, len(valid_idx) + 1)
prob *= ens_weights**self.pick_scheme
prob = prob.flatten()
p = self.rgen.choice(self.n**2, p=np.nan_to_num(prob / np.sum(prob)))
traj, ens = np.divmod(p, self.n)
self.swap(traj, ens)
self.lock(ens)
traj = self._trajs[ens]
# If available do 0+- swap with 50% probability
ens_nums = (ens - self._offset,)
inp_trajs = (traj,)
if self.repptis:
# ∞REPPTIS (adopted from infretis dz24/infpp): with probability
# ``zeroswap`` attempt a swap with an ADJACENT ensemble. The
# lowest ensemble can only swap up, the highest only down, an
# interior one picks a side at random. A pair that reaches the
# minus ensemble becomes the [0^-]<->[0^+] zero swap (dispatched
# to ``retis_swap_zero``); two positive ensembles dispatch to
# ``ppretis_swap``.
if self.rgen.random() < self.zeroswap:
if ens == 0:
neighb = ens + 1
elif ens == self.n - 1:
neighb = ens - 1
else:
neighb = ens + self.rgen.choice([-1, 1])
if not self._locks[neighb]:
other_traj = self.pick_traj_ens(neighb)
pair = np.array([neighb, ens])
order = np.argsort(pair)
ens_nums = tuple(
int(i) for i in pair[order] - self._offset
)
inp_trajs = tuple(
np.array([other_traj, traj], dtype=object)[order]
)
elif (
(ens == self._offset and not self._locks[self._offset - 1])
or (ens == self._offset - 1 and not self._locks[self._offset])
) and self.rgen.random() < self.zeroswap:
if ens == self._offset:
# ens = 0
other = self._offset - 1
other_traj = self.pick_traj_ens(other)
ens_nums = (-1, 0)
inp_trajs = (other_traj, traj)
else:
# ens = -1
other = self._offset
other_traj = self.pick_traj_ens(other)
ens_nums = (-1, 0)
inp_trajs = (traj, other_traj)
# lock and print the picked traj and ens
pat_nums = [str(i.path_number) for i in inp_trajs]
self.locked.append((list(ens_nums), pat_nums))
if self.printing():
self.print_pick(ens_nums, pat_nums, self.cworker)
picked = {}
child_rng = spawn_rng(self.rgen)
for ens_num, inp_traj in zip(ens_nums, inp_trajs):
ens_pick = self.ensembles[ens_num + 1]
ens_pick["rgen"] = spawn_rng(child_rng)
picked[ens_num] = {
"ens": ens_pick,
"traj": inp_traj,
"pn_old": inp_traj.path_number,
}
return picked
[docs] def pick_traj_ens(self, ens):
"""Pick traj ens."""
prob = self.prob.astype("float64")[:, ens].flatten()
traj = self.rgen.choice(self.n, p=np.nan_to_num(prob / np.sum(prob)))
self.swap(traj, ens)
self.lock(ens)
return self._trajs[ens]
[docs] def pick_lock(self):
"""Pick path and ens.
In case a crash, we pick lock locked from previous simulation.
"""
if not self.locked0:
if "restarted_from" in self.config["current"]:
# get the same pick() as pre-restart. Need to set it again
# because current self.rgen was used for calculating self.prob.
self.set_rgen()
return self.pick()
enss = []
trajs = []
enss0, trajs0 = self.locked0.pop(0)
logger.info("pick locked!")
for ens, traj in zip(enss0, trajs0):
enss.append(ens - self._offset)
traj_idx = self.live_paths().index(int(traj))
self.swap(traj_idx, ens)
self.lock(ens)
trajs.append(self._trajs[ens])
if self.printing():
self.print_pick(tuple(enss), tuple(trajs0), self.cworker)
picked = {}
child_rng = spawn_rng(self.rgen)
for ens_num, inp_traj in zip(enss, trajs):
ens_pick = self.ensembles[ens_num + 1]
ens_pick["rgen"] = spawn_rng(child_rng)
picked[ens_num] = {
"ens": ens_pick,
"traj": inp_traj,
"pn_old": inp_traj.path_number,
}
return picked
[docs] def prep_md_items(self, md_items):
"""Fill md_items with picked path and ens."""
# Remove previous picked
md_items.pop("picked", None)
# pick/lock ens & path
if self.toinitiate >= 0:
# assign pin
md_items.update({"pin": self.cworker})
# pick lock
md_items["picked"] = self.pick_lock()
# set the worker folder
w_folder = os.path.join(os.getcwd(), f"worker{md_items['pin']}")
make_dirs(w_folder)
md_items["w_folder"] = w_folder
else:
md_items["picked"] = self.pick()
# Record ens_nums
md_items["ens_nums"] = list(md_items["picked"].keys())
# Native permeability frequency-move selection (tr / targetswap /
# the configured shooting move), drawn per cycle. No-op (and no RNG
# draw) unless freq/mirror_freq/target_freq are configured.
self._apply_freq_moves(md_items["picked"])
# allocate worker pin:
ens_engs = self.config["simulation"]["ensemble_engines"]
eng_names = []
for ens_num in md_items["ens_nums"]:
md_items["picked"][ens_num]["exe_dir"] = md_items["w_folder"]
if self.config["runner"].get("wmdrun", False):
md_items["picked"][ens_num]["wmdrun"] = self.config["runner"][
"wmdrun"
][md_items["pin"]]
# spawn rgen for all engines
ens_rgen = md_items["picked"][ens_num]["ens"]["rgen"]
md_items["picked"][ens_num]["rgen-eng"] = spawn_rng(ens_rgen)
md_items["picked"][ens_num]["pin"] = md_items["pin"]
eng_names += ens_engs[ens_num + 1]
# engine assignment
unique_eng_names = list(set(eng_names))
eng_idx = assign_engines(
self.engine_occ, unique_eng_names, md_items["pin"]
)
for ens_num in md_items["ens_nums"]:
md_items["picked"][ens_num]["eng_idx"] = {
eng: eng_idx[eng] for eng in ens_engs[ens_num + 1]
}
# check time:
md_items["md_start"] = time.time()
# record pnum_old
md_items["pnum_old"] = []
for key in md_items["picked"].keys():
pnum_old = md_items["picked"][key]["traj"].path_number
md_items["pnum_old"].append(pnum_old)
# empty / update md_items:
for key in ["moves", "trial_len", "trial_op", "generated"]:
md_items[key] = []
return md_items
[docs] def _apply_freq_moves(self, picked):
"""Draw the per-cycle native frequency move for a single pick.
Ports the per-cycle move selection of
:func:`pyretis.core.tis.make_tis_step_ensemble` onto the
scheduler. For a single-ensemble (non-swap) pick the configured
shooting move is replaced, with the native ``[tis]`` weights, by a
time reversal (``tr``, weight ``freq``), a mirror (``mr``, weight
``mirror_freq``) or a target swap (``ts``, weight ``target_freq``);
otherwise the configured move is kept (weight ``max(1 - freq -
mirror_freq - target_freq, 0)``). Mirror and target swap are
allowed only in the ``[0^-]`` ensemble -- native ``ensemble_number
== 0``, which is scheduler pick key ``-1`` -- matching native's
``ensemble_number != 0`` gate.
The draw is taken from the ensemble's per-cycle ``rgen`` (the same
generator the move itself consumes, so the choice-then-move
ordering follows native), and the result overrides ``mc_move`` on
the (locked, hence worker-private for this cycle) ensemble dict.
The move order in the draw matches native (``tr``, mirror, target
swap, configured move).
Parameters
----------
picked : dict
The per-ensemble pick dict from :meth:`pick` / :meth:`pick_lock`.
"""
if not self.has_freq_moves:
return
# tr / mirror / targetswap are ensemble moves, never swaps; a
# two-ensemble pick is a zero / PPTIS swap and is left untouched.
if len(picked) != 1:
return
ens_num = next(iter(picked))
pens = picked[ens_num]
base_move = self.mc_moves[ens_num + 1]
# native ensemble_number 0 == [0^-] == scheduler pick key -1
is_zero_minus = ens_num == -1
moves = ["tr", "mr", "ts", base_move]
weight_tr = self._freq
weight_mr = self._mirror_freq if is_zero_minus else 0.0
weight_ts = self._target_freq if is_zero_minus else 0.0
weight_base = max(1.0 - weight_tr - weight_mr - weight_ts, 0.0)
weights = [weight_tr, weight_mr, weight_ts, weight_base]
if sum(weights) > 1.0 + 1.0e-12:
raise ValueError(
"freq move probabilities sum to more than 1: "
f"{list(zip(moves, weights))}"
)
rgen = pens["ens"]["rgen"]
choice = rgen.choice(moves, 1, p=weights)[0]
pens["ens"]["mc_move"] = choice
[docs] def add_traj(self, ens, traj, valid, count=True, n=0):
"""Add traj to state and calculate P matrix."""
if self.repptis:
# ∞REPPTIS one-hot occupancy (infretis dz24/infpp ``add_traj``):
# a PPTIS partial path occupies ONLY its own ensemble, so the
# passed multi-ensemble cv is replaced by a one-hot row and the
# strict TIS validity check does not apply (partial-path
# validity is the start/end side classification done in the
# move acceptance, not a TIS crossing of this ensemble).
valid = np.zeros(self.n)
valid[ens + 1] = 1.0
ens += self._offset
else:
if ens >= 0 and self._offset != 0:
valid = tuple([0 for _ in range(self._offset)] + list(valid))
elif ens < 0:
valid = tuple(
list(valid) + [0 for _ in range(self.n - self._offset)]
)
ens += self._offset
if valid[ens] == 0:
# The path is not valid in ensemble. This situation should
# only occur in the initial path loading.
raise_msg = (
f"Path {traj.path_number} lying in {traj.adress} "
f"is not valid in ensemble {ens:03.0f}!\n"
)
cap = self.cap if self.cap is not None \
else self.interfaces[-1]
if ens > 0:
raise_msg += (
f"Path {traj.path_number} has max_op "
f"{traj.ordermax[0]} and does not have any phase "
f"points between {self.interfaces[ens-1]} and "
f"{cap}.\n"
)
raise ValueError(raise_msg)
# invalidate last prob
self._last_prob = None
self._trajs[ens] = traj
self.state[ens, :] = valid
self.unlock(ens)
# Calculate P matrix
self.prob
[docs] def sort_trajstate(self):
"""Sort trajs and calculate P matrix."""
needstomove = [
self.state[idx][:-1][idx] == 0 for idx in range(self.n - 1)
]
while True in needstomove and self.toinitiate == -1:
ens_idx = list(needstomove).index(True)
locks = self.locked_paths()
zero_idx = list(self.state[ens_idx][1:-1]).index(0) + 1
avail = [1 if i != 0 else 0 for i in self.state[:, zero_idx]]
avail = [
j if self._trajs[i].path_number not in locks else 0
for i, j in enumerate(avail[:-1])
]
trj_idx = avail.index(1)
self.swap(ens_idx, trj_idx)
needstomove = [
self.state[idx][:-1][idx] == 0 for idx in range(self.n - 1)
]
self._last_prob = None
self.prob
[docs] def lock(self, ens):
"""Lock ensemble."""
# invalidate last prob
self._last_prob = None
if self._locks[ens] != 0:
raise RuntimeError(
f"Cannot lock ensemble {ens}: already locked "
f"(lock state = {self._locks[ens]})"
)
self._locks[ens] = 1
[docs] def unlock(self, ens):
"""Unlock ensemble."""
# invalidate last prob
self._last_prob = None
if self._locks[ens] != 1:
raise RuntimeError(
f"Cannot unlock ensemble {ens}: not locked "
f"(lock state = {self._locks[ens]})"
)
self._locks[ens] = 0
[docs] def swap(self, traj, ens):
"""Swap to keep the locks symmetric."""
# mainly to keep the locks symmetric
self.state[[ens, traj]] = self.state[[traj, ens]].copy()
temp1 = self._trajs[ens]
self._trajs[ens] = self._trajs[traj]
self._trajs[traj] = temp1
[docs] def live_paths(self):
"""Return list of live paths."""
return [traj.path_number for traj in self._trajs[:-1]]
[docs] def locked_paths(self):
"""Return list of locked paths."""
locks = [
t0.path_number
for t0, l0 in zip(self._trajs[:-1], self._locks[:-1])
if l0
]
return locks
[docs] def set_rgen(self):
"""Set numpy random generator state from restart."""
seed_sequence = np.random.SeedSequence(
entropy=0, n_children_spawned=self.cstep
)
self.rgen = default_rng(seed_sequence)
self.rgen.bit_generator.state = self.config["current"]["rng_state"]
[docs] def loop(self):
"""Check and iterate loop."""
if self.printing():
if self.cstep not in (
0,
self.config["current"].get("restarted_from", 0),
):
logger.info("date: " + datetime.now().strftime(DATE_FORMAT))
logger.info(
f"------- infinity {self.cstep:5.0f} END -------" + "\n"
)
if self.cstep >= self.tsteps:
# should probably add a check for stopping when all workers
# are free to close the while loop, but for now when
# cstep >= tsteps we return false.
self.print_end()
self.write_toml()
logger.info("date: " + datetime.now().strftime(DATE_FORMAT))
return False
self.cstep += 1
if self.printing() and self.cstep <= self.tsteps:
logger.info(f"------- infinity {self.cstep:5.0f} START -------")
logger.info("date: " + datetime.now().strftime(DATE_FORMAT))
return self.cstep <= self.tsteps
[docs] def initiate(self):
"""Initiate loop."""
if not self.cstep < self.tsteps:
return False
self.cworker = self.workers - self.toinitiate
if self.toinitiate == self.workers:
if self.screen > 0:
self.print_start()
if self.toinitiate < self.workers:
if self.screen > 0:
logger.info(
f"------- submit worker {self.cworker-1} END -------"
+ datetime.now().strftime(DATE_FORMAT)
+ "\n"
)
if self.toinitiate > 0:
if self.screen > 0:
logger.info(
f"------- submit worker {self.cworker} START -------"
+ datetime.now().strftime(DATE_FORMAT)
)
self.toinitiate -= 1
return self.toinitiate >= 0
[docs] def compute_swap_matrix(self, input_mat, locks):
"""Permanent calculator (the infinite-swap probability matrix)."""
if self.repptis:
# ∞REPPTIS uses a one-hot (per-ensemble) occupancy, so the
# infinite-swap permanent is the identity: each ensemble keeps
# its own path in the base pick, and paths move between
# ensembles ONLY through the explicit ppretis_swap move (the
# adjacent-swap branch of ``pick``). Matching infretis
# dz24/infpp's disabled permanent calculator; unlocked
# ensembles only. (Standard ∞RETIS keeps the full permanent.)
out = np.identity(self.n)
out *= (1 - np.asarray(locks))
return out
# Drop locked rows and columns
bool_locks = locks == 1
# get non_locked minus interfaces
offset = self._offset - sum(bool_locks[: self._offset])
# make insert list
i = 0
insert_list = []
for lock in bool_locks:
if lock:
insert_list.append(i)
else:
i += 1
# Drop locked rows and columns
non_locked = input_mat[~bool_locks, :][:, ~bool_locks]
# Sort based on the index of the last non-zero values in the rows
# argmax(a>0) gives back the first column index that is nonzero
# so looping over the columns backwards and multiplying by -1
# gives the right ordering
minus_idx = np.argsort(np.argmax(non_locked[:offset] > 0, axis=1))
pos_idx = (
np.argsort(-1 * np.argmax(non_locked[offset:, ::-1] > 0, axis=1))
+ offset
)
sort_idx = np.append(minus_idx, pos_idx)
sorted_non_locked = non_locked[sort_idx]
# check if all trajectories have equal weights
sorted_non_locked_T = sorted_non_locked.T
# Check the minus interfaces
equal_minus = np.all(
sorted_non_locked_T[
np.where(
sorted_non_locked_T[:, :offset]
!= sorted_non_locked_T[offset - 1, :offset]
)
]
== 0
)
# check the positive interfaces
if len(sorted_non_locked_T) <= offset:
equal_pos = True
else:
equal_pos = np.all(
sorted_non_locked_T[:, offset:][
np.where(
sorted_non_locked_T[:, offset:]
!= sorted_non_locked_T[offset, offset:]
)
]
== 0
)
equal = equal_minus and equal_pos
out = np.zeros(shape=sorted_non_locked.shape, dtype="longdouble")
if equal:
# All trajectories have equal weights, run fast algorithm
# run_fast
# minus move should be run backwards
out[:offset, ::-1] = self.quick_prob(
sorted_non_locked[:offset, ::-1]
)
if offset < len(out):
# Catch only minus ens available
out[offset:] = self.quick_prob(sorted_non_locked[offset:])
else:
# TODO DEBUG print
# print("DEBUG this should not happen outside of wirefencing")
blocks = self.find_blocks(sorted_non_locked, offset=offset)
for start, stop, direction in blocks:
if direction == -1:
cstart, cstop = stop - 1, start - 1
if cstop < 0:
cstop = None
else:
cstart, cstop = start, stop
subarr = sorted_non_locked[start:stop, cstart:cstop:direction]
subarr_T = subarr.T
if len(subarr) == 1:
out[start:stop, start:stop] = 1
elif np.all(subarr_T[np.where(subarr_T != subarr_T[0])] == 0):
# Either the same weight as the last one or zero
temp = self.quick_prob(subarr)
out[start:stop, cstart:cstop:direction] = temp
elif len(subarr) <= self._perm_threshold:
temp = self.permanent_prob(subarr)
out[start:stop, cstart:cstop:direction] = temp
else:
if self._exact_perm_only:
raise RuntimeError(
f"Probability block of size {len(subarr)} exceeds "
f"the exact permanent threshold "
f"({self._perm_threshold}). Set "
f"simulation.tis_set.exact_perm_only = false "
f"to allow stochastic approximation, or increase "
f"simulation.tis_set.perm_threshold."
)
self._random_count += 1
logger.info(
"Stochastic permanent #%d, block size %d, "
"n_samples=%d",
self._random_count, len(subarr),
self._perm_n_samples,
)
temp = self.random_prob(subarr, n=self._perm_n_samples)
out[start:stop, cstart:cstop:direction] = temp
out[sort_idx] = out.copy() # COPY REQUIRED TO NOT BRAKE STATE!!!
row_sums = np.sum(out, axis=1)
col_sums = np.sum(out, axis=0)
if not np.allclose(row_sums, 1):
raise ValueError(
f"Probability matrix rows do not sum to 1: {row_sums}"
)
if not np.allclose(col_sums, 1):
raise ValueError(
f"Probability matrix columns do not sum to 1: {col_sums}"
)
# edge case that negative probs exist: set to zero
if np.sum(out < 0) > 0:
out[out < 0] = 0
logger.info(f"Found {int(np.sum(out < 0))} precision \
errors in the P-matrix, setting negative \
elements to 0. min: {np.min(out):.3e}")
# reinsert zeroes for the locked ensembles
final_out_rows = np.insert(out, insert_list, 0, axis=0)
# reinsert zeroes for the locked trajectories
final_out = np.insert(final_out_rows, insert_list, 0, axis=1)
return final_out
[docs] def find_blocks(self, arr, offset):
"""Find blocks in a W matrix."""
if len(arr) == 1:
return (0, 1, 1)
# Assume no zeroes on the diagonal or lower triangle
temp_arr = arr.copy()
# for counting minus blocks
temp_arr[:offset, :offset] = arr[:offset, :offset].T
temp_arr[offset:, :offset] = 1 # add ones to the lower triangle
non_zero = np.count_nonzero(temp_arr, axis=1)
blocks = []
start = 0
for i, e in enumerate(non_zero):
if e == i + 1:
direction = -1 if start < offset else 1
blocks.append((start, e, direction))
start = e
return blocks
[docs] def quick_prob(self, arr):
"""Quick P matrix calculation for specific W matrix."""
total_traj_prob = np.ones(shape=arr.shape[0], dtype="longdouble")
out_mat = np.zeros(shape=arr.shape, dtype="longdouble")
working_mat = np.where(arr != 0, 1, 0) # convert non-zero numbers to 1
for i, column in enumerate(working_mat.T[::-1]):
ens = column * total_traj_prob
s = ens.sum()
if s != 0:
ens /= s
out_mat[:, -(i + 1)] = ens
total_traj_prob -= ens
# force negative values to 0
total_traj_prob[np.where(total_traj_prob < 0)] = 0
return out_mat
[docs] def permanent_prob(self, arr):
"""P matrix calculation for specific W matrix."""
out = np.zeros(shape=arr.shape, dtype="longdouble")
# Don't overwrite input arr
scaled_arr = arr.copy()
n = len(scaled_arr)
# Rescaling the W-matrix avoids numerical instabilites when the
# matrix is large and contains large weights from
# high-acceptance moves
for i in range(n):
scaled_arr[i, :] /= np.max(scaled_arr[i, :])
for i in range(n):
rows = [r for r in range(n) if r != i]
sub_arr = scaled_arr[rows, :]
for j in range(n):
if scaled_arr[i][j] == 0:
continue
columns = [r for r in range(n) if r != j]
M = sub_arr[:, columns]
f = self.fast_glynn_perm(M)
out[i][j] = f * scaled_arr[i][j]
return out / max(np.sum(out, axis=1))
[docs] def random_prob(self, arr, n=10_000):
"""P matrix calculation for specific W matrix."""
out = np.eye(len(arr), dtype="longdouble")
current_state = np.eye(len(arr))
choices = len(arr) // 2
even = choices * 2 == len(arr)
# The probability to go right
prob_right = np.nan_to_num(np.roll(arr, -1, axis=1) / arr)
# The probability to go left
prob_left = np.nan_to_num(np.roll(arr, 1, axis=1) / arr)
start = 0
zero_one = np.array([0, 1])
p_m = np.array([1, -1])
temp = np.where(current_state == 1)
for i in range(n):
direction = self.rgen.choice(p_m)
if not even:
start = self.rgen.choice(zero_one)
temp_left = prob_left[temp]
temp_right = prob_right[temp]
if not even:
start = self.rgen.choice(zero_one)
if direction == -1:
probs = (
temp_left[start:-1:2]
* np.roll(temp_right, 1, axis=0)[start:-1:2]
)
else:
probs = temp_right[start:-1:2] * temp_left[start + 1::2]
r_nums = self.rgen.random(choices)
success = r_nums < probs
for j in np.where(success)[0]:
idx = j * 2 + start
temp_state = current_state[:, [idx + direction, idx]]
current_state[:, [idx, idx + direction]] = temp_state
temp_state_2 = temp[0][[idx + direction, idx]]
temp[0][[idx, idx + direction]] = temp_state_2
out += current_state
return out / (n + 1)
[docs] def fast_glynn_perm(self, M):
"""Glynn permanent."""
def cmp(a, b):
if a == b:
return 0
elif a > b:
return 1
else:
return -1
row_comb = np.sum(M, axis=0, dtype="longdouble")
n = len(M)
total = 0
old_grey = 0
sign = +1
binary_power_dict = {2**i: i for i in range(n)}
num_loops = 2 ** (n - 1)
for bin_index in range(1, num_loops + 1):
total += sign * np.multiply.reduce(row_comb)
new_grey = bin_index ^ (bin_index // 2)
grey_diff = old_grey ^ new_grey
grey_diff_index = binary_power_dict[grey_diff]
direction = 2 * cmp(old_grey, new_grey)
if direction:
new_vector = M[grey_diff_index]
row_comb += new_vector * direction
sign = -sign
old_grey = new_grey
return total / num_loops
[docs] def write_toml(self):
"""Write the restart state to ``./restart.toml`` atomically.
Sequence: serialise to ``restart.toml.tmp`` with an explicit
``fsync`` so the new payload is on disk; copy any existing
``restart.toml`` to ``restart.toml.prev`` so the previous
cycle's restart survives even if the final rename loses a race
with a power loss; ``os.replace`` the tmp file over
``restart.toml`` (POSIX-atomic on the same filesystem); finally
best-effort fsync the parent directory so the rename commit is
past the filesystem journal.
"""
self.config["current"]["active"] = self.live_paths()
self.config["current"]["provenance"] = collect_provenance()
locked_ep = []
for tup in self.locked:
locked_ep.append(
([int(tup0 + self._offset) for tup0 in tup[0]], tup[1])
)
self.config["current"]["locked"] = locked_ep
self.config["current"]["rng_state"] = self.rgen.bit_generator.state
# save accumulative fracs
self.config["current"]["frac"] = {}
for key in sorted(self.traj_data.keys()):
fracs = [str(i) for i in self.traj_data[key]["frac"]]
self.config["current"]["frac"][str(key)] = fracs
target = "./restart.toml"
tmp = target + ".tmp"
prev = target + ".prev"
with open(tmp, "wb") as f:
tomli_w.dump(self.config, f)
f.flush()
os.fsync(f.fileno())
if os.path.exists(target):
shutil.copy2(target, prev)
os.replace(tmp, target)
try:
dir_fd = os.open(os.path.dirname(target) or ".", os.O_RDONLY)
except OSError:
pass
else:
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
[docs] def printing(self):
"""Check if print."""
return self.screen > 0 and np.mod(self.cstep, self.screen) == 0
[docs] def print_pick(self, ens_nums, pat_nums, pin):
"""Print pick."""
if len(ens_nums) > 1 or ens_nums[0] == -1:
move = "sh"
else:
move = self.mc_moves[ens_nums[0] + 1]
ens_p = " ".join([f"00{ens_num+1}" for ens_num in ens_nums])
pat_p = " ".join(pat_nums)
logger.info(
f"shooting {move} in ensembles: {ens_p} with paths:"
f" {pat_p} and worker: {pin}"
)
[docs] def print_shooted(self, md_items, pn_news):
"""Print shooted."""
moves = md_items["moves"]
ens_nums = " ".join([f"00{i+1}" for i in md_items["ens_nums"]])
pnum_old = " ".join([str(i) for i in md_items["pnum_old"]])
pnum_new = " ".join([str(i) for i in pn_news])
trial_lens = " ".join([str(i) for i in md_items["trial_len"]])
trial_ops = " ".join(
[f"[{i[0]:4.4f} {i[1]:4.4f}]" for i in md_items["trial_op"]]
)
status = md_items["status"]
simtime = md_items["md_end"] - md_items["md_start"]
subcycles = md_items["subcycles"]
arrow = "=)" if status == "ACC" else "=("
logger.info(
f"shooted {' '.join(moves)} in ensembles: {ens_nums}"
f" with paths: {pnum_old} {arrow} {pnum_new}"
)
logger.info(
"with status:" f" {status} len: {trial_lens} op: {trial_ops} and"
)
logger.info(
f"worker: {self.cworker} total time:"
f"{simtime:.2f}s and subcycles: {subcycles}"
)
self.print_state()
[docs] def print_start(self):
"""Print start."""
if self.pick_scheme > 0:
logger.info(
f"ensemble selection scheme: {self.pick_scheme}"
+ " should only be used with Inf-init"
)
logger.info("stored ensemble paths:")
ens_num = self.live_paths()
logger.info(
" ".join([f"00{i}: {j}," for i, j in enumerate(ens_num)]) + "\n"
)
self.print_state()
[docs] def print_state(self):
"""Print state."""
last_prob = True
if isinstance(self._last_prob, type(None)):
self.prob
last_prob = False
logger.info("===")
logger.info(" xx |\tv Ensemble numbers v")
to_print = [f"{i:03.0f}" for i in range(self.n - 1)]
for i in range(len(to_print[0])):
to_print0 = " ".join([j[i] for j in to_print])
if i == len(to_print[0]) - 1:
to_print0 += "\t\tmax_op\tmin_op\tlen"
logger.info(" xx |\t" + to_print0)
logger.info(" -- |\t" + "".join("--" for _ in range(self.n + 14)))
locks = self.locked_paths()
oil = False
for idx, live in enumerate(self.live_paths()):
if live not in locks:
to_print = f"p{live:02.0f} |\t"
if (
self.state[idx][:-1][idx] == 0
or self._last_prob[idx][:-1][idx] < 0.001
):
oil = True
for prob in self._last_prob[idx][:-1]:
if prob == 1:
marker = "x "
elif prob == 0:
marker = "- "
else:
marker = f"{int(round(prob*10, 1))} "
# change if marker == 10
if len(marker) == 3:
marker = "9 "
to_print += marker
to_print += f"|\t{self.traj_data[live]['max_op'][0]:5.3f} \t"
to_print += f"{self.traj_data[live]['min_op'][0]:5.3f} \t"
to_print += f"{self.traj_data[live]['length']:5.0f}"
logger.info(to_print)
else:
to_print = f"p{live:02.0f} |\t"
logger.info(
to_print + "".join(["- " for j in range(self.n - 1)]) + "|"
)
if oil:
logger.info("olive oil")
oil = False
logger.info("===")
if not last_prob:
self._last_prob = None
[docs] def print_end(self):
"""Print end."""
live_trajs = self.live_paths()
stopping = self.cstep
logger.info("--------------------------------------------------")
logger.info(f"live trajs: {live_trajs} after {stopping} cycles")
logger.info("==================================================")
logger.info("xxx | 000 001 002 003 004 |")
logger.info("--------------------------------------------------")
for key, item in self.traj_data.items():
values = "\t".join(
[
f"{item0:02.2f}" if item0 != 0.0 else "----"
for item0 in item["frac"][:-1]
]
)
logger.info(f"{key:03.0f} * {values} *")
[docs] def treat_output(self, md_items):
"""Treat output."""
pn_news = []
md_items["md_end"] = time.time()
picked = md_items["picked"]
traj_num = self.config["current"]["traj_num"]
for ens_num in picked.keys():
pn_old = picked[ens_num]["pn_old"]
out_traj = picked[ens_num]["traj"]
self.ensembles[ens_num + 1] = picked[ens_num]["ens"]
for idx, lock in enumerate(self.locked):
if str(pn_old) in lock[1]:
self.locked.pop(idx)
# if path is new: number and save the path:
if out_traj.path_number is None or md_items["status"] == "ACC":
# keep track of the highest order value seen during the sim
if ens_num != -1 and out_traj.ordermax[0] > self.maxop:
self.maxop = out_traj.ordermax[0]
# move to accept:
ens_save_idx = self.traj_data[pn_old]["ens_save_idx"]
out_traj.path_number = traj_num
data = {
"path": out_traj,
"dir": os.path.join(
os.getcwd(), self.config["simulation"]["load_dir"]
),
}
out_traj = self.pstore.output(self.cstep, data)
self.traj_data[traj_num] = {
"frac": np.zeros(self.n, dtype="longdouble"),
"max_op": out_traj.ordermax,
"min_op": out_traj.ordermin,
"length": out_traj.length,
"weights": out_traj.weights,
"adress": out_traj.adress,
"ens_save_idx": ens_save_idx,
}
if self.native_output:
self.capture_native_row(
traj_num, out_traj, md_items["status"]
)
traj_num += 1
if self.config["output"]["delete_old"] and pn_old > self.n - 2:
if len(self.pn_olds) > self.n - 2:
pn_old_del, del_dic = next(iter(self.pn_olds.items()))
load_dir = self.config["simulation"]["load_dir"]
path_dir = os.path.join(load_dir, pn_old_del)
if self.config["output"]["delete_old_all"]:
# delete_old_all: remove the entire old path
# directory in one step. shutil.rmtree is robust
# to any files still present under accepted/ --
# on restart the reconstructed adress list may
# not enumerate the loaded trajectory file, which
# left os.rmdir('.../accepted') failing with
# "Directory not empty". (Config validation
# forbids combining this with keep_maxop_trajs.)
if os.path.isdir(path_dir):
shutil.rmtree(path_dir)
elif self.config["output"]["keep_maxop_trajs"]:
# delete trajectory files if low orderp (infinit)
# and directory is not a symlink
if del_dic["max_op"][
0
] < self.maxop and not os.path.islink(path_dir):
# update maxop and then delete|
for adress in del_dic["adress"]:
os.remove(adress)
else:
# delete trajectory files
for adress in del_dic["adress"]:
os.remove(adress)
# pop the deleted path.
self.pn_olds.pop(pn_old_del)
# keep delete list:
if len(self.pn_olds) <= self.n - 2:
self.pn_olds[str(pn_old)] = {
"adress": self.traj_data[pn_old]["adress"],
"max_op": self.traj_data[pn_old]["max_op"],
}
pn_news.append(out_traj.path_number)
self.add_traj(ens_num, out_traj, valid=out_traj.weights)
# record weights
locked_trajs = self.locked_paths()
if self._last_prob is None:
self.prob
# Per-cycle frac increments, captured per ensemble for the native
# output route. ``last_frac_increment[j]`` is the list of
# ``(path_number, frac_increment)`` pairs the live paths
# contribute to ensemble j this cycle; the native writer turns
# each into a pathensemble.txt row (Weight = 1 / frac_increment).
if self.native_output:
self.last_frac_increment = {j: [] for j in range(self.n - 1)}
for idx, live in enumerate(self.live_paths()):
if live not in locked_trajs:
increment = self._last_prob[:-1][idx, :]
self.traj_data[live]["frac"] += increment
if self.native_output:
for j in range(self.n - 1):
if increment[j] > 0.0:
self.last_frac_increment[j].append(
(live, float(increment[j]))
)
# write succ data to the path-ensemble data file. On the native
# route we do NOT write the inf-format infswap_data.txt -- the
# native per-ensemble pathensemble.txt below is the canonical
# output, and a stray infswap_data.txt is not native structure.
if md_items["status"] == "ACC" and not self.native_output:
write_path_ensemble_data(self, md_items["pnum_old"])
if self.native_output:
write_native_pathensemble_data(self)
self.prune_native_rows()
self.sort_trajstate()
cdict = self.config["current"]
cdict["traj_num"] = traj_num
cdict["wsubcycles"][md_items["pin"]] += md_items["subcycles"]
cdict["tsubcycles"] = int(sum(self.config["current"]["wsubcycles"]))
self.cworker = md_items["pin"]
if self.printing():
self.print_shooted(md_items, pn_news)
# save for possible restart
self.write_toml()
return md_items
[docs] def load_paths(self, paths):
"""Load paths."""
size = self.n - 1
# we add all the i+ paths.
for i in range(size - 1):
paths[i + 1].weights = calc_cv_vector(
paths[i + 1],
self.config["simulation"]["interfaces"],
self.mc_moves,
lambda_minus_one=self.config["simulation"]["tis_set"][
"lambda_minus_one"
],
cap=self.cap,
)
self.add_traj(
ens=i,
traj=paths[i + 1],
valid=paths[i + 1].weights,
count=False,
)
pnum = paths[i + 1].path_number
frac = self.config["current"]["frac"].get(
str(pnum), np.zeros(size + 1)
)
self.traj_data[pnum] = {
"ens_save_idx": i + 1,
"max_op": paths[i + 1].ordermax,
"min_op": paths[i + 1].ordermin,
"length": paths[i + 1].length,
"adress": paths[i + 1].adress,
"weights": paths[i + 1].weights,
"frac": np.array(frac, dtype="longdouble"),
}
if self.native_output:
self.capture_native_row(pnum, paths[i + 1], "ACC")
# keep track of max op of i+ path
if paths[i + 1].ordermax[0] > self.maxop:
self.maxop = paths[i + 1].ordermax[0]
# add minus path:
paths[0].weights = (1.0,)
pnum = paths[0].path_number
self.add_traj(
ens=-1, traj=paths[0], valid=paths[0].weights, count=False
)
frac = self.config["current"]["frac"].get(
str(pnum), np.zeros(size + 1)
)
self.traj_data[pnum] = {
"ens_save_idx": 0,
"max_op": paths[0].ordermax,
"min_op": paths[0].ordermin,
"length": paths[0].length,
"weights": paths[0].weights,
"adress": paths[0].adress,
"frac": np.array(frac, dtype="longdouble"),
}
if self.native_output:
self.capture_native_row(pnum, paths[0], "ACC")
[docs] def _native_energy_engine(self):
"""Return a cached streaming engine for native energy recompute.
Built once from ``self.config`` (force field + masses, via the
engine's ``setup_streaming``) and reused for every captured path.
Returns ``None`` when the configured engine is not a streaming
internal integrator that can recompute energy from stored frames
(e.g. an external engine that reports energy through its own
files), in which case the native energy.txt keeps the engine's
in-memory terms (``nan`` when absent).
"""
if self._energy_engine_built:
return self._energy_engine
self._energy_engine_built = True
# Local import: the engines factory imports simulation helpers, so
# a module-top import would risk a cycle.
from pyretis.engines.factory import create_inf_engine # noqa: E501 pylint: disable=C0415
engine = create_inf_engine(self.config)
if engine is None or not hasattr(engine, "streaming_energies"):
self._energy_engine = None
return None
engine.setup_streaming(self.config)
self._energy_engine = engine
return self._energy_engine
[docs] def capture_native_row(self, pn, path, status):
"""Store the native pathensemble.txt column data for a path.
Only used on the native-output route; the data is read back by
:py:mod:`pyretis.inout.native_output` to format the per-ensemble
``pathensemble.txt`` rows. Stored at path-creation time because
the live ``Path`` object (with its phase points, ``generated``
move tag and status) is not retained in ``traj_data``.
Parameters
----------
pn : int
The path number.
path : object like :py:class:`pyretis.core.path.Path`
The accepted path.
status : str
The native path status (e.g. ``"ACC"``).
"""
start_op = float(path.phasepoints[0].order[0])
end_op = float(path.phasepoints[-1].order[0])
generated = path.generated
if generated is None:
generated = ("sh", 0.0, 0, 0)
# The full per-phasepoint order and energy series feed the native
# per-ensemble order.txt / energy.txt blocks. They are captured
# here (the live Path is not retained), as plain Python scalars
# so the writer needs no knowledge of the System flavour. Energy
# terms an engine does not report stay None (written as nan,
# matching the native energy-file convention).
#
# The scheduler's phase points are file-backed (the path has been
# written to and reloaded from disk by ``pstore.output``), so they
# carry positions and velocities but no in-memory vpot/ekin. As
# vpot/ekin are exact functions of the stored frame, a streaming
# engine recomputes them per frame (one trajectory read each),
# restoring parity with the native energy.txt. etot/temp are left
# as nan, matching the native internal-engine convention.
recomputed = None
if path.phasepoints and all(
getattr(pp, "config", None) is not None
and getattr(pp, "particles", None) is None
for pp in path.phasepoints
):
engine = self._native_energy_engine()
if engine is not None:
recomputed = engine.streaming_energies(path.phasepoints)
order_series = []
energy_series = []
for idx, phasepoint in enumerate(path.phasepoints):
order_series.append(
tuple(float(orderp) for orderp in phasepoint.order)
)
energy = {}
if recomputed is not None:
vpot, ekin = recomputed[idx]
energy["vpot"] = vpot
energy["ekin"] = ekin
energy["etot"] = None
energy["temp"] = None
else:
for key in ("vpot", "ekin", "etot", "temp"):
value = getattr(phasepoint, key, None)
if value is None:
particles = getattr(phasepoint, "particles", None)
if particles is not None:
value = getattr(particles, key, None)
energy[key] = None if value is None else float(value)
energy_series.append(energy)
self.native_rows[pn] = {
"generated": generated,
"status": status,
"length": path.length,
"ordermax": path.ordermax,
"ordermin": path.ordermin,
"start_op": start_op,
"end_op": end_op,
"order_series": order_series,
"energy_series": energy_series,
# Per-ensemble HA crossing-count weight (wf/ss) -- the native
# writer divides the occupancy by it (the WHAM Cxy/HA
# unweighting). ``Path.ha_weights`` defaults to ``weights`` (the
# cv), so this is the cv: compute_weight for wf/ss, 1 otherwise.
"ha_weights": getattr(path, "ha_weights", None),
}
[docs] def prune_native_rows(self):
"""Drop captured native rows for paths that are no longer live.
Only live paths receive frac increments (see ``treat_output``),
so a path that has left every ensemble can never contribute
another native output row. Its captured order/energy series
would otherwise accumulate without bound over a long run.
"""
keep = set(self.live_paths())
keep.update(self.locked_paths())
for pn in list(self.native_rows):
if pn not in keep:
del self.native_rows[pn]
[docs] def initiate_ensembles(self):
"""Create all the ensemble dicts from the TOML config dict."""
intfs = self.config["simulation"]["interfaces"]
lambda_minus_one = self.config["simulation"]["tis_set"][
"lambda_minus_one"
]
ens_intfs = []
# set intfs for [0-] and [0+]
if lambda_minus_one is not False:
ens_intfs.append(
[lambda_minus_one, (lambda_minus_one + intfs[0]) / 2, intfs[0]]
)
else:
ens_intfs.append([float("-inf"), intfs[0], intfs[0]])
ens_intfs.append([intfs[0], intfs[0], intfs[-1]])
# set interfaces and set detect for [1+], [2+], ...
reactant, product = intfs[0], intfs[-1]
for i in range(len(intfs) - 2):
middle = intfs[i + 1]
ens_intfs.append([reactant, middle, product])
# create all path ensembles
pensembles = {}
for i, ens_intf in enumerate(ens_intfs):
pensembles[i] = {
"interfaces": tuple(ens_intf),
"tis_set": self.config["simulation"]["tis_set"],
"mc_move": self.config["simulation"]["shooting_moves"][i],
"ens_name": f"{i:03d}",
# ∞REPPTIS: the positive PPTIS ensembles accept paths that
# start on EITHER side (["L", "R"]) so both L- and R-starting
# partial paths (LM*/RM*) are sampled; the first ensemble
# starts on the right (the permeability flux side). This is
# the infretis dz24/infpp convention. Standard ∞RETIS keeps
# the L-only positive-ensemble start condition.
"start_cond": (
("R" if i == 0 else ["L", "R"])
if self.repptis
else (
["L", "R"]
if lambda_minus_one is not False and i == 0
else ("R" if i == 0 else "L")
)
),
}
self.ensembles = pensembles
[docs]def write_path_ensemble_data(state, pn_archive):
"""Write data to the path-ensemble data file (``infswap_data.txt``)."""
traj_data = state.traj_data
size = state.n
with open(state.data_file, "a") as fp:
for pn in pn_archive:
string = ""
string += f"\t{pn:3.0f}\t"
string += f"{traj_data[pn]['length']:5.0f}" + "\t"
string += f"{traj_data[pn]['max_op'][0]:8.5f}" + "\t"
frac = []
weight = []
if len(traj_data[pn]["weights"]) == 1:
f0 = traj_data[pn]["frac"][0]
w0 = traj_data[pn]["weights"][0]
frac.append("----" if f0 == 0.0 else str(f0))
weight.append("----" if f0 == 0.0 else str(w0))
frac += ["----"] * (size - 2)
weight += ["----"] * (size - 2)
else:
frac.append("----")
weight.append("----")
for w0, f0 in zip(
traj_data[pn]["weights"][:-1], traj_data[pn]["frac"][1:-1]
):
frac.append("----" if f0 == 0.0 else str(f0))
weight.append("----" if f0 == 0.0 else str(w0))
fp.write(
string + "\t".join(frac) + "\t" + "\t".join(weight) + "\t\n"
)
traj_data.pop(pn)