Source code for pyretis.pyvisa.statistical_methods

# Copyright (c) 2026, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Collection of statistical methods for PyVisA.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
correlation_matrix (:py:func:`.correlation_matrix`)
    Method for displaying the correlations from the simulation variables.

gaussian_mixture (:py:func:`.gaussian_mixture`)
    Generic method for performing gaussian mixture clustering
    on simulation data.

hierarchical (:py:func:`.hierarchical`)
    Generic method for performing hierarchical clustering on simulation data.

k_means (:py:func:`.k_means`)
    Generic method for performing K-means clustering on simulation data.

pyvis_pca (:py:func:`.pyvisa_pca`)
    Generic method for performing pca on simulation data.

spectral (:py:func:`.spectral`)
    Generic method for performing spectral clustering on simulation data.

support_vector_machine (:py:func:`.support_vector_machine`)
    Generic method for performing support vector machine classification
    on simulation data.

"""
import graphviz
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.cluster import AgglomerativeClustering, SpectralClustering, \
    MiniBatchKMeans
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.mixture import GaussianMixture
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier, export_graphviz


[docs]def _lambda_max(lambda_values): """Return the largest progress-coordinate value in one trajectory.""" lambda_values = np.asarray(lambda_values, dtype=float) if lambda_values.ndim != 1 or lambda_values.size == 0: raise ValueError('lambda values must be a non-empty 1D array') if not np.all(np.isfinite(lambda_values)): raise ValueError('lambda values must be finite') return float(np.max(lambda_values))
[docs]def _first_crossing_index(lambda_values, lambda_c): """Return the first frame index with lambda >= lambda_c. Parameters ---------- lambda_values : array_like, shape (n_frames,) Progress-coordinate values along one trajectory. lambda_c : float Crossing interface. Returns ------- index : int or None First index crossing ``lambda_c``. ``None`` is returned when the trajectory never reaches the interface. """ lambda_values = np.asarray(lambda_values, dtype=float) hits = np.flatnonzero(lambda_values >= lambda_c) if hits.size == 0: return None return int(hits[0])
[docs]def _as_2d_cv(cv_values): """Convert 1D or multidimensional CV data to a 2D frame array.""" cv_values = np.asarray(cv_values, dtype=float) if cv_values.ndim == 1: cv_values = cv_values[:, np.newaxis] if cv_values.ndim != 2 or cv_values.shape[0] == 0: raise ValueError('CV values must have shape (n_frames,) or ' '(n_frames, n_cv)') if not np.all(np.isfinite(cv_values)): raise ValueError('CV values must be finite') return cv_values
[docs]def _coerce_trajectory(trajectory): """Return a normalized trajectory dictionary for WHAM analysis.""" if isinstance(trajectory, dict): lambda_values = None cv_values = None for key in ('lambda', 'lambda_values', 'order_parameter', 'orderparam', 'progress_coordinate'): if key in trajectory: lambda_values = trajectory[key] break for key in ('cv', 'cvs', 'collective_variables'): if key in trajectory: cv_values = trajectory[key] break if lambda_values is None or cv_values is None: raise ValueError('trajectory dictionaries must contain lambda ' 'and cv data') else: try: lambda_values, cv_values = trajectory except (TypeError, ValueError) as exc: raise ValueError('trajectories must be dictionaries or ' '(lambda, cv) pairs') from exc lambda_values = np.asarray(lambda_values, dtype=float) cv_values = _as_2d_cv(cv_values) if lambda_values.ndim != 1 or lambda_values.size == 0: raise ValueError('lambda values must be a non-empty 1D array') if cv_values.shape[0] != lambda_values.size: raise ValueError('lambda and CV arrays must have the same number ' 'of frames') if not np.all(np.isfinite(lambda_values)): raise ValueError('lambda values must be finite') return {'lambda': lambda_values, 'cv': cv_values, 'lambda_max': _lambda_max(lambda_values)}
[docs]def _prepare_path_ensembles(path_ensembles): """Validate and normalize a nested sequence of path ensembles.""" if not path_ensembles: raise ValueError('at least one path ensemble is required') prepared = [] n_cv = None for ensemble in path_ensembles: if not ensemble: raise ValueError('path ensembles must not be empty') prepared_ensemble = [_coerce_trajectory(path) for path in ensemble] for path in prepared_ensemble: if n_cv is None: n_cv = path['cv'].shape[1] elif path['cv'].shape[1] != n_cv: raise ValueError('all trajectories must have the same ' 'number of CV columns') prepared.append(prepared_ensemble) return prepared, n_cv
[docs]def _validate_grid(grid, name): """Validate a strictly increasing 1D numerical grid.""" grid = np.asarray(grid, dtype=float) if grid.ndim != 1 or grid.size < 2: raise ValueError(f'{name} must be a 1D array with at least 2 values') if not np.all(np.isfinite(grid)): raise ValueError(f'{name} values must be finite') if np.any(np.diff(grid) <= 0): raise ValueError(f'{name} must be strictly increasing') return grid
[docs]def _make_lambda_grid(interfaces, lambda_grid=None, lambda_bin_width=None): """Return the fine lambda grid used for WHAM output.""" if lambda_grid is not None: return _validate_grid(lambda_grid, 'lambda_grid') if lambda_bin_width is None: return np.linspace(interfaces[0], interfaces[-1], 200) if lambda_bin_width <= 0: raise ValueError('lambda_bin_width must be positive') stop = interfaces[-1] + 0.5 * lambda_bin_width return np.arange(interfaces[0], stop, lambda_bin_width, dtype=float)
[docs]def _make_cv_edges(path_ensembles, n_cv, cv_bin_widths, cv_ranges=None): """Create configurable histogram edges for CV-space bins.""" widths = np.asarray(cv_bin_widths, dtype=float) if widths.ndim == 0: widths = np.repeat(widths, n_cv) if widths.shape != (n_cv,) or np.any(widths <= 0): raise ValueError('cv_bin_widths must be a positive scalar or one ' 'positive width per CV') if cv_ranges is None: all_cv = np.vstack([path['cv'] for ensemble in path_ensembles for path in ensemble]) low = np.min(all_cv, axis=0) high = np.max(all_cv, axis=0) else: ranges = np.asarray(cv_ranges, dtype=float) if ranges.shape != (n_cv, 2): raise ValueError('cv_ranges must have shape (n_cv, 2)') low = ranges[:, 0] high = ranges[:, 1] if np.any(~np.isfinite(ranges)) or np.any(high <= low): raise ValueError('each CV range must be finite and increasing') edges = [] for dim, width in enumerate(widths): lo = low[dim] hi = high[dim] if cv_ranges is None: if lo == hi: lo -= 0.5 * width hi += 0.5 * width lo = np.floor(lo / width) * width hi = np.ceil(hi / width) * width edge = np.arange(lo, hi + 0.5 * width, width, dtype=float) if edge.size < 2 or edge[-1] < hi: edge = np.append(edge, edge[-1] + width) edges.append(edge) return edges
[docs]def _cv_bin_index(cv_point, cv_edges): """Return the flattened CV-bin index for one CV point.""" indices = [] shape = [] for value, edges in zip(cv_point, cv_edges): shape.append(edges.size - 1) idx = np.searchsorted(edges, value, side='right') - 1 if idx == edges.size - 1 and np.isclose(value, edges[-1]): idx -= 1 if idx < 0 or idx >= edges.size - 1: return None indices.append(idx) return np.ravel_multi_index(tuple(indices), tuple(shape))
[docs]def _k_index(values, interfaces, n_ensembles): """Return K(lambda) from the WHAM expression.""" values = np.asarray(values, dtype=float) k_values = np.searchsorted(interfaces, values, side='left') - 1 return np.clip(k_values, 0, n_ensembles - 1).astype(int)
[docs]def _interface_probabilities(lambda_maxima, interfaces): """Compute WHAM crossing probabilities at the TIS interfaces.""" n_ensembles = len(lambda_maxima) probabilities = np.zeros(len(interfaces), dtype=float) probabilities[0] = 1.0 n_paths = np.array([values.size for values in lambda_maxima], dtype=float) for idx in range(1, len(interfaces)): last = min(idx, n_ensembles) numerator = sum(np.count_nonzero(lambda_maxima[j] >= interfaces[idx]) for j in range(last)) previous = probabilities[:last] denominator_terms = np.divide( n_paths[:last], previous, out=np.full(last, np.inf, dtype=float), where=previous > 0) denominator = np.sum(denominator_terms) if numerator == 0 or not np.isfinite(denominator) or denominator <= 0: probabilities[idx] = 0.0 else: probabilities[idx] = numerator / denominator return probabilities
[docs]def _wham_core(path_ensembles, interfaces, lambda_grid, cv_edges): """Implement path-sampling WHAM after input normalization.""" n_ensembles = len(path_ensembles) lambda_maxima = [np.array([path['lambda_max'] for path in ensemble], dtype=float) for ensemble in path_ensembles] n_paths = np.array([values.size for values in lambda_maxima], dtype=float) interface_prob = _interface_probabilities(lambda_maxima, interfaces) ensemble_prob = interface_prob[:n_ensembles] q_factors = np.zeros(n_ensembles, dtype=float) for idx in range(n_ensembles): terms = np.divide(n_paths[:idx + 1], ensemble_prob[:idx + 1], out=np.full(idx + 1, np.inf, dtype=float), where=ensemble_prob[:idx + 1] > 0) denominator = np.sum(terms) if np.isfinite(denominator) and denominator > 0: q_factors[idx] = 1.0 / denominator k_grid = _k_index(lambda_grid, interfaces, n_ensembles) absolute_crossing = np.zeros(lambda_grid.size, dtype=float) for alpha, lambda_value in enumerate(lambda_grid): k_value = k_grid[alpha] count = sum(np.count_nonzero(lambda_maxima[j] >= lambda_value) for j in range(k_value + 1)) absolute_crossing[alpha] = q_factors[k_value] * count absolute_crossing = np.clip(absolute_crossing, 0.0, 1.0) with np.errstate(divide='ignore', invalid='ignore'): conditional = absolute_crossing[np.newaxis, :] / \ absolute_crossing[:, np.newaxis] valid_pair = lambda_grid[np.newaxis, :] >= lambda_grid[:, np.newaxis] conditional[~valid_pair] = np.nan conditional[~np.isfinite(conditional)] = np.nan cv_shape = tuple(edge.size - 1 for edge in cv_edges) n_bins = int(np.prod(cv_shape)) n_lambda = lambda_grid.size reactive = np.zeros((n_lambda, n_lambda, n_bins), dtype=float) unreactive = np.zeros_like(reactive) all_paths = [path for ensemble in path_ensembles for path in ensemble] for path in all_paths: path_max = path['lambda_max'] path_k = _k_index([path_max], interfaces, n_ensembles)[0] weight = q_factors[path_k] if weight == 0.0: continue alpha_indices = np.flatnonzero(lambda_grid <= path_max) for alpha in alpha_indices: crossing = _first_crossing_index(path['lambda'], lambda_grid[alpha]) if crossing is None: continue bin_index = _cv_bin_index(path['cv'][crossing], cv_edges) if bin_index is None: continue beta_mask = lambda_grid >= lambda_grid[alpha] reactive_beta = beta_mask & (lambda_grid <= path_max) unreactive_beta = beta_mask & (lambda_grid > path_max) reactive[alpha, reactive_beta, bin_index] += weight unreactive[alpha, unreactive_beta, bin_index] += weight normalizers = absolute_crossing.copy() with np.errstate(divide='ignore', invalid='ignore'): reactive = np.divide( reactive, normalizers[:, np.newaxis, np.newaxis], out=np.zeros_like(reactive), where=normalizers[:, np.newaxis, np.newaxis] > 0) unreactive = np.divide( unreactive, normalizers[:, np.newaxis, np.newaxis], out=np.zeros_like(unreactive), where=normalizers[:, np.newaxis, np.newaxis] > 0) reactive[~valid_pair, :] = 0.0 unreactive[~valid_pair, :] = 0.0 total = reactive + unreactive with np.errstate(divide='ignore', invalid='ignore'): overlap_bins = np.divide(reactive * unreactive, total, out=np.zeros_like(total), where=total > 0) reactive_probability = np.sum(reactive, axis=2) overlap = np.divide(np.sum(overlap_bins, axis=2), reactive_probability, out=np.full_like(reactive_probability, np.nan), where=reactive_probability > 0) predictive_capacity = 1.0 - overlap predictive_capacity[~valid_pair] = np.nan positive_reactive = valid_pair & (reactive_probability > 0) predictive_capacity[positive_reactive] = np.maximum( predictive_capacity[positive_reactive], reactive_probability[positive_reactive]) predictive_capacity = np.clip(predictive_capacity, 0.0, 1.0) enhancement = np.divide(predictive_capacity, reactive_probability, out=np.full_like(predictive_capacity, np.nan), where=reactive_probability > 0) enhancement[positive_reactive] = np.maximum(enhancement[positive_reactive], 1.0) result_shape = (n_lambda, n_lambda) + cv_shape return { 'interfaces': interfaces, 'lambda_grid': lambda_grid, 'cv_edges': cv_edges, 'cv_centers': [0.5 * (edge[:-1] + edge[1:]) for edge in cv_edges], 'interface_probabilities': interface_prob, 'absolute_crossing': absolute_crossing, 'crossing_probabilities': conditional, 'reactive': reactive.reshape(result_shape), 'unreactive': unreactive.reshape(result_shape), 'total': total.reshape(result_shape), 'overlap': overlap, 'predictive_capacity': predictive_capacity, 'enhancement': enhancement, }
[docs]def _bootstrap_wham(path_ensembles, interfaces, lambda_grid, cv_edges, n_bootstrap, random_state): """Return bootstrap standard errors by resampling trajectories.""" rng = np.random.default_rng(random_state) samples = {'absolute_crossing': [], 'crossing_probabilities': [], 'predictive_capacity': [], 'enhancement': []} for _ in range(n_bootstrap): resampled = [] for ensemble in path_ensembles: indices = rng.integers(0, len(ensemble), size=len(ensemble)) resampled.append([ensemble[idx] for idx in indices]) result = _wham_core(resampled, interfaces, lambda_grid, cv_edges) for key in samples: samples[key].append(result[key]) bootstrap = {} for key, values in samples.items(): values = np.asarray(values) finite = np.isfinite(values) count = np.sum(finite, axis=0) total = np.sum(np.where(finite, values, 0.0), axis=0) mean = np.divide(total, count, out=np.full_like(total, np.nan), where=count > 0) squared = np.where(finite, (values - mean) ** 2, 0.0) variance = np.divide(np.sum(squared, axis=0), count - 1, out=np.full_like(mean, np.nan), where=count > 1) bootstrap[f'{key}_stderr'] = np.sqrt(variance) return bootstrap
[docs]def wham(path_ensembles, interfaces, cv_bin_widths, lambda_grid=None, lambda_bin_width=None, cv_ranges=None, n_bootstrap=0, random_state=None): r"""Path-sampling weighted histogram analysis method (WHAM). This implements the WHAM reweighting described by van Erp et al. for path-sampling trajectories. It computes WHAM-reweighted crossing probabilities, first-crossing distributions in arbitrary CV space, predictive capacity, enhancement, and optional bootstrap uncertainty estimates over trajectories. Parameters ---------- path_ensembles : sequence Nested sequence of path ensembles ``[0+], [1+], ...``. Each trajectory may be a dictionary with lambda data under one of ``'lambda'``, ``'order_parameter'`` or ``'progress_coordinate'`` and CV data under ``'cv'``/``'cvs'``; or a ``(lambda, cv)`` pair. Lambda arrays must have shape ``(n_frames,)``. CV arrays may have shape ``(n_frames,)`` or ``(n_frames, n_cv)``. interfaces : array_like, shape (n_ensembles + 1,) TIS/RETIS/FFS interfaces, including the reactant and product interfaces. The number of path ensembles must be one fewer than the number of interfaces. cv_bin_widths : float or array_like Histogram bin width for each CV dimension. lambda_grid : array_like, optional Fine lambda grid for output. It does not need to coincide with the TIS interfaces. lambda_bin_width : float, optional Used to create ``lambda_grid`` when ``lambda_grid`` is not given. cv_ranges : array_like, optional Explicit CV histogram ranges with shape ``(n_cv, 2)``. n_bootstrap : int, optional Number of bootstrap samples. Resampling is done within each path ensemble. random_state : int, optional Seed for bootstrap resampling. Returns ------- result : dict Dictionary containing ``crossing_probabilities`` for :math:`P_A(\\lambda_r | \\lambda_c)`, ``reactive``, ``unreactive`` and ``total`` first-crossing distributions, ``predictive_capacity``, ``enhancement`` and optional ``bootstrap`` standard errors. Notes ----- Empty CV bins are handled by zero-contribution divisions. Values with ``lambda_r < lambda_c`` are outside the intended domain and are returned as ``nan`` for scalar probability-like outputs. """ path_ensembles, n_cv = _prepare_path_ensembles(path_ensembles) interfaces = _validate_grid(interfaces, 'interfaces') if len(path_ensembles) != interfaces.size - 1: raise ValueError('the number of path ensembles must be one fewer ' 'than the number of interfaces') lambda_grid = _make_lambda_grid(interfaces, lambda_grid, lambda_bin_width) if lambda_grid[0] < interfaces[0] or lambda_grid[-1] > interfaces[-1]: raise ValueError('lambda_grid must stay within the first and last ' 'interfaces') cv_edges = _make_cv_edges(path_ensembles, n_cv, cv_bin_widths, cv_ranges=cv_ranges) result = _wham_core(path_ensembles, interfaces, lambda_grid, cv_edges) if n_bootstrap: if n_bootstrap < 2: raise ValueError('n_bootstrap must be 0 or at least 2') result['bootstrap'] = _bootstrap_wham(path_ensembles, interfaces, lambda_grid, cv_edges, n_bootstrap, random_state) return result
[docs]def _marginal_distribution(distribution, cv_axis): """Return a 1D marginal distribution along one CV axis.""" axes = tuple(axis for axis in range(distribution.ndim) if axis != cv_axis) return np.sum(distribution, axis=axes)
[docs]def plot_wham(result, lambda_c_index=None, lambda_r_index=None, cv_axis=0, cmap='viridis', show=True): """Create publication-style PyVisA plots for path-sampling WHAM. The figure contains heat maps for crossing probability, predictive capacity and enhancement as functions of ``lambda_c`` and ``lambda_r``. The fourth panel shows the reactive, unreactive and total first-crossing distributions for one selected ``(lambda_c, lambda_r)`` pair, marginalized onto one CV when the CV space is multidimensional. Parameters ---------- result : dict Result dictionary returned by :func:`wham`. lambda_c_index, lambda_r_index : int, optional Indices selecting the distribution panel. Defaults to the middle valid lambda-c value and the final lambda-r value. cv_axis : int, optional CV axis used for the marginal distribution panel. cmap : string, optional Matplotlib colormap. show : bool, optional If True, call :func:`matplotlib.pyplot.show`. Returns ------- figure, axes : tuple Matplotlib figure and axes. """ lambda_grid = result['lambda_grid'] if lambda_c_index is None: lambda_c_index = max(0, len(lambda_grid) // 2 - 1) if lambda_r_index is None: lambda_r_index = len(lambda_grid) - 1 if lambda_r_index < lambda_c_index: raise ValueError('lambda_r_index must be >= lambda_c_index') if cv_axis < 0 or cv_axis >= len(result['cv_centers']): raise ValueError('cv_axis is out of range') figure, axes = plt.subplots(2, 2, constrained_layout=True) heatmaps = [('Crossing probability', result['crossing_probabilities']), ('Predictive capacity', result['predictive_capacity']), ('Enhancement', result['enhancement'])] extent = [lambda_grid[0], lambda_grid[-1], lambda_grid[0], lambda_grid[-1]] for axis, (title, data) in zip(axes.flat[:3], heatmaps): image = axis.imshow(data.T, origin='lower', extent=extent, aspect='auto', cmap=cmap) figure.colorbar(image, ax=axis) axis.set_title(title) axis.set_xlabel(r'$\lambda_c$') axis.set_ylabel(r'$\lambda_r$') dist_axis = axes.flat[3] centers = result['cv_centers'][cv_axis] selectors = (lambda_c_index, lambda_r_index) reactive = _marginal_distribution(result['reactive'][selectors], cv_axis) unreactive = _marginal_distribution(result['unreactive'][selectors], cv_axis) total = _marginal_distribution(result['total'][selectors], cv_axis) dist_axis.plot(centers, total, label='total', color='black') dist_axis.plot(centers, reactive, label='reactive', color='tab:green') dist_axis.plot(centers, unreactive, label='unreactive', color='tab:orange') dist_axis.set_title('First-crossing distributions') dist_axis.set_xlabel(f'CV {cv_axis}') dist_axis.set_ylabel('Probability') dist_axis.legend(frameon=False) if show: plt.show() return figure, axes
[docs]def _plot_pca_results(n_pca, principal_df, loadings, pca_info, cmap): """Plot the results of a PCA analysis. Parameters ---------- n_pca : integer Number of principal components. principal_df : object like pandas.DataFrame Principal components data. loadings : object like pandas.DataFrame PCA loadings. pca_info : dict Dict with keys 'model' (fitted PCA), 'cols' (column labels), 'features' (feature labels). cmap : string Matplotlib colormap. """ pca_model = pca_info['model'] cols = pca_info['cols'] features = pca_info['features'] # Plot PC1 vs PC2 if n_pca > 1: fig_1 = plt.figure() axis_1 = fig_1.add_subplot(111) axis_1.scatter(principal_df['PC1'], principal_df['PC2'], edgecolors=None, alpha=0.5, cmap=cmap) axis_1.set_xlabel('PC1') axis_1.set_ylabel('PC2') # Plot the loadings fig_2 = plt.figure() axis_2 = fig_2.add_subplot(111) fig_2.colorbar(axis_2.matshow(loadings)) axis_2.set_xticks(np.arange(len(cols))) axis_2.set_yticks(np.arange(len(features))) axis_2.set_xticklabels(cols) axis_2.set_yticklabels(features) # Plot Cumulative explained variance fig_3 = plt.figure() axis_3 = fig_3.add_subplot(111) axis_3.plot(np.cumsum(pca_model.explained_variance_ratio_)) axis_3.set_xlabel('Principal components') axis_3.set_ylabel('Cumulative explained variance') plt.show()
[docs]def pyvisa_pca(n_pca, settings, data, cmap): """Perform PCA on ensemble data. This function performs PCA on data from the selected ensembles and stores the data to a hdf-file for further analysis, plots the first two principal components against each other, the cumulative explained variance and the loadings. The hdf5 file contains the keywords: * `sim_data`: pandas Dataframe containing the simulation data. * `PC`: Dataframe containing the data from the principal components. * `loadings`: pandas Dataframe containing the loadings. * `explained`: pandas Series containing the simulation data. * `correlation_matrix`: Dataframe containing the correlation matrix. Parameters ---------- n_pca : integer Number of clusters. settings : dict Settings from GUI. data : object like pandas.DataFrame Simulation data from chosen ensembles. cmap : string Matplotlib colormap. """ basename = f'PCA_data_{settings["fol"]}_{n_pca}.hdf' data.to_hdf(basename, key='sim_data', mode='w') features = data.columns data = StandardScaler().fit_transform(data) cols = [f'PC{i}' for i in range(1, n_pca + 1)] pca_model = PCA(n_components=n_pca) principal_comps = pca_model.fit_transform(data) principal_df = pd.DataFrame(principal_comps, columns=cols) explained = pd.Series(pca_model.explained_variance_ratio_) explained.index += 1 loadings = pd.DataFrame(pca_model.components_.T, columns=cols, index=features) load_corr = pca_model.components_.T * np.sqrt( pca_model.explained_variance_) load_corr_mat = pd.DataFrame(load_corr, columns=cols, index=features) principal_df.to_hdf(basename, key='PC', mode='a') loadings.to_hdf(basename, key='loadings', mode='a') load_corr_mat.to_hdf(basename, key='correlation_matrix', mode='a') explained.to_hdf(basename, key='explained', mode='a') pca_info = {'model': pca_model, 'cols': cols, 'features': features} _plot_pca_results(n_pca, principal_df, loadings, pca_info, cmap)
[docs]def k_means(n_clusters, data, settings, cmap): """Perform K-means clustering. This function performs K-means clustering on simulation data with a chosen amount of clusters, and plots the results. Parameters ---------- n_clusters : integer Number of clusters. data : object like numpy.ndarray Simulation data from chosen ensembles. settings : dict Settings from GUI. cmap : string Matplotlib colormap. """ model = MiniBatchKMeans(n_clusters=n_clusters) labels = model.fit_predict(data) fig = plt.figure() axis = fig.add_subplot(111) axis.scatter(data[:, 0], data[:, 1], c=labels, s=5, cmap=cmap) axis.set_title('Kmeans clustering') axis.set_xlabel(settings['op1']) axis.set_ylabel(settings['op2']) plt.show()
[docs]def hierarchical(n_clusters, data, settings, cmap): """Perform hierarchical clustering. This function performs hierarchical clustering on simulation data with a chosen amount of clusters, and plots the results. Parameters ---------- n_clusters : integer Number of clusters. data : object like numpy.ndarray Simulation data from chosen ensembles. settings : dict Settings from GUI. cmap : string Matplotlib colormap. """ model = AgglomerativeClustering(n_clusters=n_clusters, linkage='average') labels = model.fit_predict(data) fig = plt.figure() axis = fig.add_subplot(111) axis.scatter(data[:, 0], data[:, 1], c=labels, s=5, cmap=cmap) axis.set_title('Hierarchical clustering') axis.set_xlabel(settings['op1']) axis.set_ylabel(settings['op2']) plt.show()
[docs]def spectral(n_clusters, data, settings, cmap): """Perform spectral clustering. This function performs spectral clustering on simulation data with a chosen amount of clusters, and plots the results. Parameters ---------- n_clusters : integer Number of clusters. data : object like numpy.ndarray Simulation data from chosen ensembles. settings : dict Settings from GUI. cmap : string Matplotlib colormap. """ model = SpectralClustering( n_clusters=n_clusters, affinity='nearest_neighbors', n_neighbors=30, assign_labels='kmeans') labels = model.fit_predict(data) fig = plt.figure() axis = fig.add_subplot(111) axis.scatter(data[:, 0], data[:, 1], c=labels, s=5, cmap=cmap) axis.set_title('Spectral clustering') axis.set_xlabel(settings['op1']) axis.set_ylabel(settings['op2']) plt.show()
[docs]def gaussian_mixture(n_clusters, data, settings, cmap): """Perform gaussian mixture clustering. This function performs gaussian mixture clustering on simulation data with a chosen amount of clusters, and plots the results. Parameters ---------- n_clusters : integer Number of clusters. data : object like numpy.ndarray Simulation data from chosen ensembles. settings : dict Settings from GUI. cmap : string Matplotlib colormap. """ model = GaussianMixture(n_components=n_clusters) labels = model.fit_predict(data) fig = plt.figure() axis = fig.add_subplot(111) axis.scatter(data[:, 0], data[:, 1], c=labels, s=5, cmap=cmap) axis.set_title('Gaussian mixture clustering') axis.set_xlabel(settings['op1']) axis.set_ylabel(settings['op2']) plt.show()
[docs]def correlation_matrix(dataframe): """Show correlation matrix of simulation variables. Function which computes the Pearson correlations between parameters and shows it in a correlation plot. Parameters ---------- dataframe : object like pandas.DataFrame Dataframe containing simulation data. """ titles = list(dataframe.columns) fig = plt.figure() axis = fig.add_subplot(111) corr = axis.matshow(dataframe.corr()) fig.colorbar(corr) axis.set_xticks(np.arange(len(titles))) axis.set_yticks(np.arange(len(titles))) axis.set_xticklabels(titles) axis.set_yticklabels(titles) plt.show()
[docs]def random_forest(xdata, ydata, depth): """Create random forest classification. Parameters ---------- xdata : object like pandas.DataFrame Pandas dataframe containing the op and cv data from selected frames in the simulation. ydata : object like pandas.DataFrame Pandas dataframe containing True/False for each frame in the selected ensembles. True if the frame is reactive, else False. depth : integer Depth of random forest model. """ rf_model = RandomForestClassifier(n_estimators=500, max_depth=depth, random_state=16) ydata = ydata.values.ravel() rf_model.fit(xdata, ydata) important_features = np.argsort(rf_model.feature_importances_) feature_imp = pd.Series(rf_model.feature_importances_, index=xdata.columns).sort_values(ascending=False) figure = plt.figure() axis = figure.add_subplot(111) x_ticks = [i + 0.5 for i in range(len(important_features))] axis.bar(x_ticks, feature_imp) axis.set_xticks(x_ticks) axis.set_xticklabels(feature_imp.keys(), rotation=90, ha='center') axis.set_ylabel('Gini feature importance') plt.show()
[docs]def support_vector_machine(xdata, ydata, kernel='rbf', c_value=1.0, gamma='scale', cmap='viridis'): """Create support vector machine classification. Parameters ---------- xdata : object like pandas.DataFrame Pandas dataframe containing the op and cv data from selected frames in the simulation. ydata : object like pandas.DataFrame Pandas dataframe containing True/False for each frame in the selected ensembles. True if the frame is reactive, else False. kernel : string, optional Kernel type to be used in the SVM model. c_value : float, optional Regularization parameter for the SVM model. gamma : string or float, optional Kernel coefficient for 'rbf', 'poly' and 'sigmoid' kernels. cmap : string, optional Matplotlib colormap. Returns ------- svm_model : object like sklearn.pipeline.Pipeline Fitted support vector machine model. """ ydata = ydata.values.ravel() svm_model = make_pipeline( StandardScaler(), SVC(kernel=kernel, C=c_value, gamma=gamma, random_state=16)) labels = svm_model.fit(xdata, ydata).predict(xdata) data = np.asarray(xdata) feature_names = xdata.columns.astype(str).tolist() y_plot = data[:, 1] if data.shape[1] > 1 else np.zeros(data.shape[0]) y_label = feature_names[1] if data.shape[1] > 1 else 'Classification' figure = plt.figure() axis = figure.add_subplot(111) axis.scatter(data[:, 0], y_plot, c=labels, s=5, cmap=cmap) axis.set_title('Support vector machine classification') axis.set_xlabel(feature_names[0]) axis.set_ylabel(y_label) plt.show() return svm_model
[docs]def decision_tree(xdata, ydata, depth): """Create a decision tree. Parameters ---------- xdata : object like pandas.DataFrame Pandas dataframe containing the op and cv data from selected frames i the simulation. ydata : object like pandas.DataFrame Pandas dataframe containing True/False for each frame in the selected ensembles. True if the frame is reactive, else False. depth : integer Depth of the decision tree. """ names = xdata.columns.astype(str).tolist() x_train, _, y_train, _ = train_test_split(xdata, ydata, test_size=0.2, stratify=ydata, random_state=16, shuffle=True) clf = DecisionTreeClassifier(criterion='entropy', max_depth=depth) clf.fit(x_train, y_train) dot_data = export_graphviz(clf, out_file=None, feature_names=names, class_names=['reactive', 'unreactive'], filled=True, rounded=True, special_characters=True) graph = graphviz.Source(dot_data) graph.render('decision_tree', view=True, format='png', cleanup=True)