# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Collection of statistical methods for PyVisA.
Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
correlation_matrix (:py:func:`.correlation_matrix`)
Method for displaying the correlations from the simulation variables.
gaussian_mixture (:py:func:`.gaussian_mixture`)
Generic method for performing gaussian mixture clustering
on simulation data.
hierarchical (:py:func:`.hierarchical`)
Generic method for performing hierarchical clustering on simulation data.
k_means (:py:func:`.k_means`)
Generic method for performing K-means clustering on simulation data.
pyvis_pca (:py:func:`.pyvisa_pca`)
Generic method for performing pca on simulation data.
spectral (:py:func:`.spectral`)
Generic method for performing spectral clustering on simulation data.
support_vector_machine (:py:func:`.support_vector_machine`)
Generic method for performing support vector machine classification
on simulation data.
"""
import graphviz
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.cluster import AgglomerativeClustering, SpectralClustering, \
MiniBatchKMeans
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.mixture import GaussianMixture
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier, export_graphviz
[docs]def _lambda_max(lambda_values):
"""Return the largest progress-coordinate value in one trajectory."""
lambda_values = np.asarray(lambda_values, dtype=float)
if lambda_values.ndim != 1 or lambda_values.size == 0:
raise ValueError('lambda values must be a non-empty 1D array')
if not np.all(np.isfinite(lambda_values)):
raise ValueError('lambda values must be finite')
return float(np.max(lambda_values))
[docs]def _first_crossing_index(lambda_values, lambda_c):
"""Return the first frame index with lambda >= lambda_c.
Parameters
----------
lambda_values : array_like, shape (n_frames,)
Progress-coordinate values along one trajectory.
lambda_c : float
Crossing interface.
Returns
-------
index : int or None
First index crossing ``lambda_c``. ``None`` is returned when the
trajectory never reaches the interface.
"""
lambda_values = np.asarray(lambda_values, dtype=float)
hits = np.flatnonzero(lambda_values >= lambda_c)
if hits.size == 0:
return None
return int(hits[0])
[docs]def _as_2d_cv(cv_values):
"""Convert 1D or multidimensional CV data to a 2D frame array."""
cv_values = np.asarray(cv_values, dtype=float)
if cv_values.ndim == 1:
cv_values = cv_values[:, np.newaxis]
if cv_values.ndim != 2 or cv_values.shape[0] == 0:
raise ValueError('CV values must have shape (n_frames,) or '
'(n_frames, n_cv)')
if not np.all(np.isfinite(cv_values)):
raise ValueError('CV values must be finite')
return cv_values
[docs]def _coerce_trajectory(trajectory):
"""Return a normalized trajectory dictionary for WHAM analysis."""
if isinstance(trajectory, dict):
lambda_values = None
cv_values = None
for key in ('lambda', 'lambda_values', 'order_parameter',
'orderparam', 'progress_coordinate'):
if key in trajectory:
lambda_values = trajectory[key]
break
for key in ('cv', 'cvs', 'collective_variables'):
if key in trajectory:
cv_values = trajectory[key]
break
if lambda_values is None or cv_values is None:
raise ValueError('trajectory dictionaries must contain lambda '
'and cv data')
else:
try:
lambda_values, cv_values = trajectory
except (TypeError, ValueError) as exc:
raise ValueError('trajectories must be dictionaries or '
'(lambda, cv) pairs') from exc
lambda_values = np.asarray(lambda_values, dtype=float)
cv_values = _as_2d_cv(cv_values)
if lambda_values.ndim != 1 or lambda_values.size == 0:
raise ValueError('lambda values must be a non-empty 1D array')
if cv_values.shape[0] != lambda_values.size:
raise ValueError('lambda and CV arrays must have the same number '
'of frames')
if not np.all(np.isfinite(lambda_values)):
raise ValueError('lambda values must be finite')
return {'lambda': lambda_values,
'cv': cv_values,
'lambda_max': _lambda_max(lambda_values)}
[docs]def _prepare_path_ensembles(path_ensembles):
"""Validate and normalize a nested sequence of path ensembles."""
if not path_ensembles:
raise ValueError('at least one path ensemble is required')
prepared = []
n_cv = None
for ensemble in path_ensembles:
if not ensemble:
raise ValueError('path ensembles must not be empty')
prepared_ensemble = [_coerce_trajectory(path) for path in ensemble]
for path in prepared_ensemble:
if n_cv is None:
n_cv = path['cv'].shape[1]
elif path['cv'].shape[1] != n_cv:
raise ValueError('all trajectories must have the same '
'number of CV columns')
prepared.append(prepared_ensemble)
return prepared, n_cv
[docs]def _validate_grid(grid, name):
"""Validate a strictly increasing 1D numerical grid."""
grid = np.asarray(grid, dtype=float)
if grid.ndim != 1 or grid.size < 2:
raise ValueError(f'{name} must be a 1D array with at least 2 values')
if not np.all(np.isfinite(grid)):
raise ValueError(f'{name} values must be finite')
if np.any(np.diff(grid) <= 0):
raise ValueError(f'{name} must be strictly increasing')
return grid
[docs]def _make_lambda_grid(interfaces, lambda_grid=None, lambda_bin_width=None):
"""Return the fine lambda grid used for WHAM output."""
if lambda_grid is not None:
return _validate_grid(lambda_grid, 'lambda_grid')
if lambda_bin_width is None:
return np.linspace(interfaces[0], interfaces[-1], 200)
if lambda_bin_width <= 0:
raise ValueError('lambda_bin_width must be positive')
stop = interfaces[-1] + 0.5 * lambda_bin_width
return np.arange(interfaces[0], stop, lambda_bin_width, dtype=float)
[docs]def _make_cv_edges(path_ensembles, n_cv, cv_bin_widths, cv_ranges=None):
"""Create configurable histogram edges for CV-space bins."""
widths = np.asarray(cv_bin_widths, dtype=float)
if widths.ndim == 0:
widths = np.repeat(widths, n_cv)
if widths.shape != (n_cv,) or np.any(widths <= 0):
raise ValueError('cv_bin_widths must be a positive scalar or one '
'positive width per CV')
if cv_ranges is None:
all_cv = np.vstack([path['cv'] for ensemble in path_ensembles
for path in ensemble])
low = np.min(all_cv, axis=0)
high = np.max(all_cv, axis=0)
else:
ranges = np.asarray(cv_ranges, dtype=float)
if ranges.shape != (n_cv, 2):
raise ValueError('cv_ranges must have shape (n_cv, 2)')
low = ranges[:, 0]
high = ranges[:, 1]
if np.any(~np.isfinite(ranges)) or np.any(high <= low):
raise ValueError('each CV range must be finite and increasing')
edges = []
for dim, width in enumerate(widths):
lo = low[dim]
hi = high[dim]
if cv_ranges is None:
if lo == hi:
lo -= 0.5 * width
hi += 0.5 * width
lo = np.floor(lo / width) * width
hi = np.ceil(hi / width) * width
edge = np.arange(lo, hi + 0.5 * width, width, dtype=float)
if edge.size < 2 or edge[-1] < hi:
edge = np.append(edge, edge[-1] + width)
edges.append(edge)
return edges
[docs]def _cv_bin_index(cv_point, cv_edges):
"""Return the flattened CV-bin index for one CV point."""
indices = []
shape = []
for value, edges in zip(cv_point, cv_edges):
shape.append(edges.size - 1)
idx = np.searchsorted(edges, value, side='right') - 1
if idx == edges.size - 1 and np.isclose(value, edges[-1]):
idx -= 1
if idx < 0 or idx >= edges.size - 1:
return None
indices.append(idx)
return np.ravel_multi_index(tuple(indices), tuple(shape))
[docs]def _k_index(values, interfaces, n_ensembles):
"""Return K(lambda) from the WHAM expression."""
values = np.asarray(values, dtype=float)
k_values = np.searchsorted(interfaces, values, side='left') - 1
return np.clip(k_values, 0, n_ensembles - 1).astype(int)
[docs]def _interface_probabilities(lambda_maxima, interfaces):
"""Compute WHAM crossing probabilities at the TIS interfaces."""
n_ensembles = len(lambda_maxima)
probabilities = np.zeros(len(interfaces), dtype=float)
probabilities[0] = 1.0
n_paths = np.array([values.size for values in lambda_maxima],
dtype=float)
for idx in range(1, len(interfaces)):
last = min(idx, n_ensembles)
numerator = sum(np.count_nonzero(lambda_maxima[j] >= interfaces[idx])
for j in range(last))
previous = probabilities[:last]
denominator_terms = np.divide(
n_paths[:last],
previous,
out=np.full(last, np.inf, dtype=float),
where=previous > 0)
denominator = np.sum(denominator_terms)
if numerator == 0 or not np.isfinite(denominator) or denominator <= 0:
probabilities[idx] = 0.0
else:
probabilities[idx] = numerator / denominator
return probabilities
[docs]def _wham_core(path_ensembles, interfaces, lambda_grid, cv_edges):
"""Implement path-sampling WHAM after input normalization."""
n_ensembles = len(path_ensembles)
lambda_maxima = [np.array([path['lambda_max'] for path in ensemble],
dtype=float)
for ensemble in path_ensembles]
n_paths = np.array([values.size for values in lambda_maxima],
dtype=float)
interface_prob = _interface_probabilities(lambda_maxima, interfaces)
ensemble_prob = interface_prob[:n_ensembles]
q_factors = np.zeros(n_ensembles, dtype=float)
for idx in range(n_ensembles):
terms = np.divide(n_paths[:idx + 1],
ensemble_prob[:idx + 1],
out=np.full(idx + 1, np.inf, dtype=float),
where=ensemble_prob[:idx + 1] > 0)
denominator = np.sum(terms)
if np.isfinite(denominator) and denominator > 0:
q_factors[idx] = 1.0 / denominator
k_grid = _k_index(lambda_grid, interfaces, n_ensembles)
absolute_crossing = np.zeros(lambda_grid.size, dtype=float)
for alpha, lambda_value in enumerate(lambda_grid):
k_value = k_grid[alpha]
count = sum(np.count_nonzero(lambda_maxima[j] >= lambda_value)
for j in range(k_value + 1))
absolute_crossing[alpha] = q_factors[k_value] * count
absolute_crossing = np.clip(absolute_crossing, 0.0, 1.0)
with np.errstate(divide='ignore', invalid='ignore'):
conditional = absolute_crossing[np.newaxis, :] / \
absolute_crossing[:, np.newaxis]
valid_pair = lambda_grid[np.newaxis, :] >= lambda_grid[:, np.newaxis]
conditional[~valid_pair] = np.nan
conditional[~np.isfinite(conditional)] = np.nan
cv_shape = tuple(edge.size - 1 for edge in cv_edges)
n_bins = int(np.prod(cv_shape))
n_lambda = lambda_grid.size
reactive = np.zeros((n_lambda, n_lambda, n_bins), dtype=float)
unreactive = np.zeros_like(reactive)
all_paths = [path for ensemble in path_ensembles for path in ensemble]
for path in all_paths:
path_max = path['lambda_max']
path_k = _k_index([path_max], interfaces, n_ensembles)[0]
weight = q_factors[path_k]
if weight == 0.0:
continue
alpha_indices = np.flatnonzero(lambda_grid <= path_max)
for alpha in alpha_indices:
crossing = _first_crossing_index(path['lambda'],
lambda_grid[alpha])
if crossing is None:
continue
bin_index = _cv_bin_index(path['cv'][crossing], cv_edges)
if bin_index is None:
continue
beta_mask = lambda_grid >= lambda_grid[alpha]
reactive_beta = beta_mask & (lambda_grid <= path_max)
unreactive_beta = beta_mask & (lambda_grid > path_max)
reactive[alpha, reactive_beta, bin_index] += weight
unreactive[alpha, unreactive_beta, bin_index] += weight
normalizers = absolute_crossing.copy()
with np.errstate(divide='ignore', invalid='ignore'):
reactive = np.divide(
reactive,
normalizers[:, np.newaxis, np.newaxis],
out=np.zeros_like(reactive),
where=normalizers[:, np.newaxis, np.newaxis] > 0)
unreactive = np.divide(
unreactive,
normalizers[:, np.newaxis, np.newaxis],
out=np.zeros_like(unreactive),
where=normalizers[:, np.newaxis, np.newaxis] > 0)
reactive[~valid_pair, :] = 0.0
unreactive[~valid_pair, :] = 0.0
total = reactive + unreactive
with np.errstate(divide='ignore', invalid='ignore'):
overlap_bins = np.divide(reactive * unreactive, total,
out=np.zeros_like(total),
where=total > 0)
reactive_probability = np.sum(reactive, axis=2)
overlap = np.divide(np.sum(overlap_bins, axis=2),
reactive_probability,
out=np.full_like(reactive_probability, np.nan),
where=reactive_probability > 0)
predictive_capacity = 1.0 - overlap
predictive_capacity[~valid_pair] = np.nan
positive_reactive = valid_pair & (reactive_probability > 0)
predictive_capacity[positive_reactive] = np.maximum(
predictive_capacity[positive_reactive],
reactive_probability[positive_reactive])
predictive_capacity = np.clip(predictive_capacity, 0.0, 1.0)
enhancement = np.divide(predictive_capacity, reactive_probability,
out=np.full_like(predictive_capacity, np.nan),
where=reactive_probability > 0)
enhancement[positive_reactive] = np.maximum(enhancement[positive_reactive],
1.0)
result_shape = (n_lambda, n_lambda) + cv_shape
return {
'interfaces': interfaces,
'lambda_grid': lambda_grid,
'cv_edges': cv_edges,
'cv_centers': [0.5 * (edge[:-1] + edge[1:]) for edge in cv_edges],
'interface_probabilities': interface_prob,
'absolute_crossing': absolute_crossing,
'crossing_probabilities': conditional,
'reactive': reactive.reshape(result_shape),
'unreactive': unreactive.reshape(result_shape),
'total': total.reshape(result_shape),
'overlap': overlap,
'predictive_capacity': predictive_capacity,
'enhancement': enhancement,
}
[docs]def _bootstrap_wham(path_ensembles, interfaces, lambda_grid, cv_edges,
n_bootstrap, random_state):
"""Return bootstrap standard errors by resampling trajectories."""
rng = np.random.default_rng(random_state)
samples = {'absolute_crossing': [],
'crossing_probabilities': [],
'predictive_capacity': [],
'enhancement': []}
for _ in range(n_bootstrap):
resampled = []
for ensemble in path_ensembles:
indices = rng.integers(0, len(ensemble), size=len(ensemble))
resampled.append([ensemble[idx] for idx in indices])
result = _wham_core(resampled, interfaces, lambda_grid, cv_edges)
for key in samples:
samples[key].append(result[key])
bootstrap = {}
for key, values in samples.items():
values = np.asarray(values)
finite = np.isfinite(values)
count = np.sum(finite, axis=0)
total = np.sum(np.where(finite, values, 0.0), axis=0)
mean = np.divide(total, count, out=np.full_like(total, np.nan),
where=count > 0)
squared = np.where(finite, (values - mean) ** 2, 0.0)
variance = np.divide(np.sum(squared, axis=0), count - 1,
out=np.full_like(mean, np.nan),
where=count > 1)
bootstrap[f'{key}_stderr'] = np.sqrt(variance)
return bootstrap
[docs]def wham(path_ensembles, interfaces, cv_bin_widths, lambda_grid=None,
lambda_bin_width=None, cv_ranges=None, n_bootstrap=0,
random_state=None):
r"""Path-sampling weighted histogram analysis method (WHAM).
This implements the WHAM reweighting described by van Erp et al.
for path-sampling trajectories. It computes WHAM-reweighted crossing
probabilities, first-crossing distributions in arbitrary CV space,
predictive capacity, enhancement, and optional bootstrap uncertainty
estimates over trajectories.
Parameters
----------
path_ensembles : sequence
Nested sequence of path ensembles ``[0+], [1+], ...``. Each
trajectory may be a dictionary with lambda data under one of
``'lambda'``, ``'order_parameter'`` or ``'progress_coordinate'``
and CV data under ``'cv'``/``'cvs'``; or a ``(lambda, cv)`` pair.
Lambda arrays must have shape ``(n_frames,)``. CV arrays may have
shape ``(n_frames,)`` or ``(n_frames, n_cv)``.
interfaces : array_like, shape (n_ensembles + 1,)
TIS/RETIS/FFS interfaces, including the reactant and product
interfaces. The number of path ensembles must be one fewer than
the number of interfaces.
cv_bin_widths : float or array_like
Histogram bin width for each CV dimension.
lambda_grid : array_like, optional
Fine lambda grid for output. It does not need to coincide with
the TIS interfaces.
lambda_bin_width : float, optional
Used to create ``lambda_grid`` when ``lambda_grid`` is not given.
cv_ranges : array_like, optional
Explicit CV histogram ranges with shape ``(n_cv, 2)``.
n_bootstrap : int, optional
Number of bootstrap samples. Resampling is done within each path
ensemble.
random_state : int, optional
Seed for bootstrap resampling.
Returns
-------
result : dict
Dictionary containing ``crossing_probabilities`` for
:math:`P_A(\\lambda_r | \\lambda_c)`, ``reactive``,
``unreactive`` and ``total`` first-crossing distributions,
``predictive_capacity``, ``enhancement`` and optional
``bootstrap`` standard errors.
Notes
-----
Empty CV bins are handled by zero-contribution divisions. Values with
``lambda_r < lambda_c`` are outside the intended domain and are
returned as ``nan`` for scalar probability-like outputs.
"""
path_ensembles, n_cv = _prepare_path_ensembles(path_ensembles)
interfaces = _validate_grid(interfaces, 'interfaces')
if len(path_ensembles) != interfaces.size - 1:
raise ValueError('the number of path ensembles must be one fewer '
'than the number of interfaces')
lambda_grid = _make_lambda_grid(interfaces, lambda_grid,
lambda_bin_width)
if lambda_grid[0] < interfaces[0] or lambda_grid[-1] > interfaces[-1]:
raise ValueError('lambda_grid must stay within the first and last '
'interfaces')
cv_edges = _make_cv_edges(path_ensembles, n_cv, cv_bin_widths,
cv_ranges=cv_ranges)
result = _wham_core(path_ensembles, interfaces, lambda_grid, cv_edges)
if n_bootstrap:
if n_bootstrap < 2:
raise ValueError('n_bootstrap must be 0 or at least 2')
result['bootstrap'] = _bootstrap_wham(path_ensembles, interfaces,
lambda_grid, cv_edges,
n_bootstrap, random_state)
return result
[docs]def _marginal_distribution(distribution, cv_axis):
"""Return a 1D marginal distribution along one CV axis."""
axes = tuple(axis for axis in range(distribution.ndim)
if axis != cv_axis)
return np.sum(distribution, axis=axes)
[docs]def plot_wham(result, lambda_c_index=None, lambda_r_index=None, cv_axis=0,
cmap='viridis', show=True):
"""Create publication-style PyVisA plots for path-sampling WHAM.
The figure contains heat maps for crossing probability, predictive
capacity and enhancement as functions of ``lambda_c`` and
``lambda_r``. The fourth panel shows the reactive, unreactive and
total first-crossing distributions for one selected
``(lambda_c, lambda_r)`` pair, marginalized onto one CV when the CV
space is multidimensional.
Parameters
----------
result : dict
Result dictionary returned by :func:`wham`.
lambda_c_index, lambda_r_index : int, optional
Indices selecting the distribution panel. Defaults to the middle
valid lambda-c value and the final lambda-r value.
cv_axis : int, optional
CV axis used for the marginal distribution panel.
cmap : string, optional
Matplotlib colormap.
show : bool, optional
If True, call :func:`matplotlib.pyplot.show`.
Returns
-------
figure, axes : tuple
Matplotlib figure and axes.
"""
lambda_grid = result['lambda_grid']
if lambda_c_index is None:
lambda_c_index = max(0, len(lambda_grid) // 2 - 1)
if lambda_r_index is None:
lambda_r_index = len(lambda_grid) - 1
if lambda_r_index < lambda_c_index:
raise ValueError('lambda_r_index must be >= lambda_c_index')
if cv_axis < 0 or cv_axis >= len(result['cv_centers']):
raise ValueError('cv_axis is out of range')
figure, axes = plt.subplots(2, 2, constrained_layout=True)
heatmaps = [('Crossing probability',
result['crossing_probabilities']),
('Predictive capacity',
result['predictive_capacity']),
('Enhancement',
result['enhancement'])]
extent = [lambda_grid[0], lambda_grid[-1],
lambda_grid[0], lambda_grid[-1]]
for axis, (title, data) in zip(axes.flat[:3], heatmaps):
image = axis.imshow(data.T, origin='lower', extent=extent,
aspect='auto', cmap=cmap)
figure.colorbar(image, ax=axis)
axis.set_title(title)
axis.set_xlabel(r'$\lambda_c$')
axis.set_ylabel(r'$\lambda_r$')
dist_axis = axes.flat[3]
centers = result['cv_centers'][cv_axis]
selectors = (lambda_c_index, lambda_r_index)
reactive = _marginal_distribution(result['reactive'][selectors],
cv_axis)
unreactive = _marginal_distribution(result['unreactive'][selectors],
cv_axis)
total = _marginal_distribution(result['total'][selectors], cv_axis)
dist_axis.plot(centers, total, label='total', color='black')
dist_axis.plot(centers, reactive, label='reactive', color='tab:green')
dist_axis.plot(centers, unreactive, label='unreactive',
color='tab:orange')
dist_axis.set_title('First-crossing distributions')
dist_axis.set_xlabel(f'CV {cv_axis}')
dist_axis.set_ylabel('Probability')
dist_axis.legend(frameon=False)
if show:
plt.show()
return figure, axes
[docs]def _plot_pca_results(n_pca, principal_df, loadings, pca_info, cmap):
"""Plot the results of a PCA analysis.
Parameters
----------
n_pca : integer
Number of principal components.
principal_df : object like pandas.DataFrame
Principal components data.
loadings : object like pandas.DataFrame
PCA loadings.
pca_info : dict
Dict with keys 'model' (fitted PCA), 'cols' (column labels),
'features' (feature labels).
cmap : string
Matplotlib colormap.
"""
pca_model = pca_info['model']
cols = pca_info['cols']
features = pca_info['features']
# Plot PC1 vs PC2
if n_pca > 1:
fig_1 = plt.figure()
axis_1 = fig_1.add_subplot(111)
axis_1.scatter(principal_df['PC1'], principal_df['PC2'],
edgecolors=None, alpha=0.5,
cmap=cmap)
axis_1.set_xlabel('PC1')
axis_1.set_ylabel('PC2')
# Plot the loadings
fig_2 = plt.figure()
axis_2 = fig_2.add_subplot(111)
fig_2.colorbar(axis_2.matshow(loadings))
axis_2.set_xticks(np.arange(len(cols)))
axis_2.set_yticks(np.arange(len(features)))
axis_2.set_xticklabels(cols)
axis_2.set_yticklabels(features)
# Plot Cumulative explained variance
fig_3 = plt.figure()
axis_3 = fig_3.add_subplot(111)
axis_3.plot(np.cumsum(pca_model.explained_variance_ratio_))
axis_3.set_xlabel('Principal components')
axis_3.set_ylabel('Cumulative explained variance')
plt.show()
[docs]def pyvisa_pca(n_pca, settings, data, cmap):
"""Perform PCA on ensemble data.
This function performs PCA on data from the selected ensembles
and stores the data to a hdf-file for further analysis, plots the
first two principal components against each other,
the cumulative explained variance and the loadings.
The hdf5 file contains the keywords:
* `sim_data`: pandas Dataframe containing the simulation data.
* `PC`: Dataframe containing the data from the principal components.
* `loadings`: pandas Dataframe containing the loadings.
* `explained`: pandas Series containing the simulation data.
* `correlation_matrix`: Dataframe containing the correlation matrix.
Parameters
----------
n_pca : integer
Number of clusters.
settings : dict
Settings from GUI.
data : object like pandas.DataFrame
Simulation data from chosen ensembles.
cmap : string
Matplotlib colormap.
"""
basename = f'PCA_data_{settings["fol"]}_{n_pca}.hdf'
data.to_hdf(basename, key='sim_data', mode='w')
features = data.columns
data = StandardScaler().fit_transform(data)
cols = [f'PC{i}' for i in range(1, n_pca + 1)]
pca_model = PCA(n_components=n_pca)
principal_comps = pca_model.fit_transform(data)
principal_df = pd.DataFrame(principal_comps, columns=cols)
explained = pd.Series(pca_model.explained_variance_ratio_)
explained.index += 1
loadings = pd.DataFrame(pca_model.components_.T, columns=cols,
index=features)
load_corr = pca_model.components_.T * np.sqrt(
pca_model.explained_variance_)
load_corr_mat = pd.DataFrame(load_corr, columns=cols,
index=features)
principal_df.to_hdf(basename, key='PC', mode='a')
loadings.to_hdf(basename, key='loadings', mode='a')
load_corr_mat.to_hdf(basename, key='correlation_matrix', mode='a')
explained.to_hdf(basename, key='explained', mode='a')
pca_info = {'model': pca_model, 'cols': cols, 'features': features}
_plot_pca_results(n_pca, principal_df, loadings, pca_info, cmap)
[docs]def k_means(n_clusters, data, settings, cmap):
"""Perform K-means clustering.
This function performs K-means clustering on simulation data
with a chosen amount of clusters, and plots the results.
Parameters
----------
n_clusters : integer
Number of clusters.
data : object like numpy.ndarray
Simulation data from chosen ensembles.
settings : dict
Settings from GUI.
cmap : string
Matplotlib colormap.
"""
model = MiniBatchKMeans(n_clusters=n_clusters)
labels = model.fit_predict(data)
fig = plt.figure()
axis = fig.add_subplot(111)
axis.scatter(data[:, 0], data[:, 1],
c=labels,
s=5,
cmap=cmap)
axis.set_title('Kmeans clustering')
axis.set_xlabel(settings['op1'])
axis.set_ylabel(settings['op2'])
plt.show()
[docs]def hierarchical(n_clusters, data, settings, cmap):
"""Perform hierarchical clustering.
This function performs hierarchical clustering on simulation data
with a chosen amount of clusters, and plots the results.
Parameters
----------
n_clusters : integer
Number of clusters.
data : object like numpy.ndarray
Simulation data from chosen ensembles.
settings : dict
Settings from GUI.
cmap : string
Matplotlib colormap.
"""
model = AgglomerativeClustering(n_clusters=n_clusters,
linkage='average')
labels = model.fit_predict(data)
fig = plt.figure()
axis = fig.add_subplot(111)
axis.scatter(data[:, 0], data[:, 1],
c=labels,
s=5,
cmap=cmap)
axis.set_title('Hierarchical clustering')
axis.set_xlabel(settings['op1'])
axis.set_ylabel(settings['op2'])
plt.show()
[docs]def spectral(n_clusters, data, settings, cmap):
"""Perform spectral clustering.
This function performs spectral clustering on simulation data
with a chosen amount of clusters, and plots the results.
Parameters
----------
n_clusters : integer
Number of clusters.
data : object like numpy.ndarray
Simulation data from chosen ensembles.
settings : dict
Settings from GUI.
cmap : string
Matplotlib colormap.
"""
model = SpectralClustering(
n_clusters=n_clusters,
affinity='nearest_neighbors',
n_neighbors=30,
assign_labels='kmeans')
labels = model.fit_predict(data)
fig = plt.figure()
axis = fig.add_subplot(111)
axis.scatter(data[:, 0], data[:, 1],
c=labels,
s=5,
cmap=cmap)
axis.set_title('Spectral clustering')
axis.set_xlabel(settings['op1'])
axis.set_ylabel(settings['op2'])
plt.show()
[docs]def gaussian_mixture(n_clusters, data, settings, cmap):
"""Perform gaussian mixture clustering.
This function performs gaussian mixture clustering on simulation data
with a chosen amount of clusters, and plots the results.
Parameters
----------
n_clusters : integer
Number of clusters.
data : object like numpy.ndarray
Simulation data from chosen ensembles.
settings : dict
Settings from GUI.
cmap : string
Matplotlib colormap.
"""
model = GaussianMixture(n_components=n_clusters)
labels = model.fit_predict(data)
fig = plt.figure()
axis = fig.add_subplot(111)
axis.scatter(data[:, 0], data[:, 1],
c=labels,
s=5,
cmap=cmap)
axis.set_title('Gaussian mixture clustering')
axis.set_xlabel(settings['op1'])
axis.set_ylabel(settings['op2'])
plt.show()
[docs]def correlation_matrix(dataframe):
"""Show correlation matrix of simulation variables.
Function which computes the Pearson correlations between parameters
and shows it in a correlation plot.
Parameters
----------
dataframe : object like pandas.DataFrame
Dataframe containing simulation data.
"""
titles = list(dataframe.columns)
fig = plt.figure()
axis = fig.add_subplot(111)
corr = axis.matshow(dataframe.corr())
fig.colorbar(corr)
axis.set_xticks(np.arange(len(titles)))
axis.set_yticks(np.arange(len(titles)))
axis.set_xticklabels(titles)
axis.set_yticklabels(titles)
plt.show()
[docs]def random_forest(xdata, ydata, depth):
"""Create random forest classification.
Parameters
----------
xdata : object like pandas.DataFrame
Pandas dataframe containing the op and cv data from selected
frames in the simulation.
ydata : object like pandas.DataFrame
Pandas dataframe containing True/False for each frame in the
selected ensembles. True if the frame is reactive, else False.
depth : integer
Depth of random forest model.
"""
rf_model = RandomForestClassifier(n_estimators=500,
max_depth=depth,
random_state=16)
ydata = ydata.values.ravel()
rf_model.fit(xdata, ydata)
important_features = np.argsort(rf_model.feature_importances_)
feature_imp = pd.Series(rf_model.feature_importances_,
index=xdata.columns).sort_values(ascending=False)
figure = plt.figure()
axis = figure.add_subplot(111)
x_ticks = [i + 0.5 for i in range(len(important_features))]
axis.bar(x_ticks, feature_imp)
axis.set_xticks(x_ticks)
axis.set_xticklabels(feature_imp.keys(), rotation=90, ha='center')
axis.set_ylabel('Gini feature importance')
plt.show()
[docs]def support_vector_machine(xdata, ydata, kernel='rbf', c_value=1.0,
gamma='scale', cmap='viridis'):
"""Create support vector machine classification.
Parameters
----------
xdata : object like pandas.DataFrame
Pandas dataframe containing the op and cv data from selected
frames in the simulation.
ydata : object like pandas.DataFrame
Pandas dataframe containing True/False for each frame in the
selected ensembles. True if the frame is reactive, else False.
kernel : string, optional
Kernel type to be used in the SVM model.
c_value : float, optional
Regularization parameter for the SVM model.
gamma : string or float, optional
Kernel coefficient for 'rbf', 'poly' and 'sigmoid' kernels.
cmap : string, optional
Matplotlib colormap.
Returns
-------
svm_model : object like sklearn.pipeline.Pipeline
Fitted support vector machine model.
"""
ydata = ydata.values.ravel()
svm_model = make_pipeline(
StandardScaler(),
SVC(kernel=kernel, C=c_value, gamma=gamma, random_state=16))
labels = svm_model.fit(xdata, ydata).predict(xdata)
data = np.asarray(xdata)
feature_names = xdata.columns.astype(str).tolist()
y_plot = data[:, 1] if data.shape[1] > 1 else np.zeros(data.shape[0])
y_label = feature_names[1] if data.shape[1] > 1 else 'Classification'
figure = plt.figure()
axis = figure.add_subplot(111)
axis.scatter(data[:, 0], y_plot,
c=labels,
s=5,
cmap=cmap)
axis.set_title('Support vector machine classification')
axis.set_xlabel(feature_names[0])
axis.set_ylabel(y_label)
plt.show()
return svm_model
[docs]def decision_tree(xdata, ydata, depth):
"""Create a decision tree.
Parameters
----------
xdata : object like pandas.DataFrame
Pandas dataframe containing the op and cv data from selected
frames i the simulation.
ydata : object like pandas.DataFrame
Pandas dataframe containing True/False for each frame in the
selected ensembles. True if the frame is reactive, else False.
depth : integer
Depth of the decision tree.
"""
names = xdata.columns.astype(str).tolist()
x_train, _, y_train, _ = train_test_split(xdata, ydata,
test_size=0.2,
stratify=ydata,
random_state=16,
shuffle=True)
clf = DecisionTreeClassifier(criterion='entropy', max_depth=depth)
clf.fit(x_train, y_train)
dot_data = export_graphviz(clf, out_file=None,
feature_names=names,
class_names=['reactive', 'unreactive'],
filled=True, rounded=True,
special_characters=True)
graph = graphviz.Source(dot_data)
graph.render('decision_tree', view=True, format='png', cleanup=True)