# Source code for pyretis.core.path

# -*- coding: utf-8 -*-
# Copyright (c) 2019, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Classes and functions for paths.

The classes and functions defined in this module are useful for
representing paths.


Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

PathBase (:py:class:`.PathBase`)
    A base class for paths.

Path (:py:class:`.Path`)
    Class for a generic path that stores all possible information.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

paste_paths
    Function for joining two paths, where one was generated in the
    backward time direction and the other in the forward time direction.
"""
from abc import abstractmethod
import logging
import numpy as np
from pyretis.core.system import System
# Module-level logger. A NullHandler is attached so that records from this
# module are silently dropped unless the application configures logging:
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


# Public names of this module. Note that `check_crossing` is defined in
# this module but it is not listed here:
__all__ = ['PathBase', 'Path', 'paste_paths']

# The following defines a human-readable form of the possible path status.
# The keys are the short status codes (as stored in `PathBase.status`) and
# the values are the descriptions used by `PathBase.__str__`:
_STATUS = {
    'ACC': 'The path has been accepted',
    'MCR': 'Momenta change rejection',
    'BWI': 'Backward trajectory end at wrong interface',
    'BTL': 'Backward trajectory too long (detailed balance condition)',
    'BTX': 'Backward trajectory too long (max-path exceeded)',
    'BTS': 'Backward trajectory too short',
    'KOB': 'Kicked outside of boundaries',
    'FTL': 'Forward trajectory too long (detailed balance condition)',
    'FTX': 'Forward trajectory too long (max-path exceeded)',
    'FTS': 'Forward trajectory too short',
    'NCR': 'No crossing with middle interface',
    'EWI': 'Initial path ends at wrong interface',
    'SWI': 'Initial path starts at wrong interface',
    '0-L': 'Path in the {0-} ensemble ends at the left interface',
    'NSG': 'Path has no positive segments for Web Throwing shooting move',
    'NSS': 'No one-step crossing in stone skipping',
    'XSS': 'SS sub path too long in stone skipping',
    'HAS': 'High Acceptance Swap rejection for SS detailed balance',
    'SSA': 'Stone Skipping super detailed balance rejection',
    'WTA': 'Web Throwing super detailed balance rejection',
}


# The following defines a human-readable form of the possible moves.
# The keys are the short move codes (stored as the first item of
# `PathBase.generated`) and the values are the descriptions used by
# `PathBase.__str__`:
_GENERATED = {
    'sh': 'Path was generated with a shooting move',
    'tr': 'Path was generated with a time-reversal move',
    'ki': 'Path was generated by integration after kicking',
    'ld': 'Path was loaded from a external file(s)',
    're': 'Path was restarted from an external file',
    's+': 'Path was generated by a swapping move from +',
    's-': 'Path was generated by a Swapping move from -',
    'ss': 'Path was generated by Stone Skipping',
    'wt': 'Path was generated by Web Throwing',
    '00': 'Path was generated by a null move'
}


# Short versions of the moves. The keys are the same move codes as in
# `_GENERATED`. This table is not referenced by the code visible in this
# module, it is presumably used by other modules (e.g. for reporting):
_GENERATED_SHORT = {
    'sh': 'Shoot',
    'tr': 'Time-reversal',
    'ki': '"Kick" initiation',
    'ld': 'Loaded from ext file(s)',
    're': 'Loaded/restart ext file',
    's+': 'Swap from +',
    's-': 'Swap from -',
    'ss': 'Stone skipping',
    'wt': 'Web Throwing',
    '00': 'Null'
}


def paste_paths(path_back, path_forw, overlap=True, maxlen=None):
    """Merge a backward with a forward path into a new path.

    The resulting path is equal to the two paths stacked, in correct
    time. Note that the ordering is important here so that:
    ``paste_paths(path1, path2) != paste_paths(path2, path1)``.

    There are two things we need to take care of here:

    - `path_back` must be iterated in reverse (it is assumed to be a
      backward trajectory).
    - we may have to remove one point in `path_forw` (if the paths
      overlap).

    Parameters
    ----------
    path_back : object like :py:class:`.PathBase`
        This is the backward trajectory.
    path_forw : object like :py:class:`.PathBase`
        This is the forward trajectory.
    overlap : boolean, optional
        If True, `path_back` and `path_forw` have a common
        starting-point, that is, the first point in `path_forw` is
        identical to the first point in `path_back`. In time-space, this
        means that the *first* point in `path_forw` is identical to the
        *last* point in `path_back` (the backward and forward path
        started at the same location in space). In this case, the first
        point of `path_forw` is skipped.
    maxlen : float, optional
        This is the maximum length for the new path. If it's not given,
        it will just be set to the largest of the `maxlen` of the two
        given paths. If only one of the given paths defines a `maxlen`,
        that value is used.

    Returns
    -------
    new_path : object like :py:class:`.PathBase`
        The pasted path, created with `path_back.empty_path`. It may be
        shorter than the two paths combined if `maxlen` was reached
        while pasting (a warning is logged in that case).

    Note
    ----
    Some information about the path will not be set here. This must be
    set elsewhere. This includes how the path was generated
    (`path.generated`) and the status of the path (`path.status`).

    """
    if maxlen is None:
        if path_back.maxlen == path_forw.maxlen:
            maxlen = path_back.maxlen
        else:
            # They are unequal and at most one of them is None: pick the
            # largest of the values that are actually set. None can not
            # be ordered against numbers, so it is filtered out first.
            # Note that now there is a chance of truncating the path
            # while pasting!
            maxlen = max(
                i for i in (path_back.maxlen, path_forw.maxlen)
                if i is not None
            )
            msg = 'Unequal length: Using {} for the new path!'.format(maxlen)
            logger.warning(msg)
    # The first point of the new path is the last point of the backward
    # path, shift the time origin accordingly:
    time_origin = path_back.time_origin - path_back.length + 1
    new_path = path_back.empty_path(maxlen=maxlen, time_origin=time_origin)
    for phasepoint in reversed(path_back.phasepoints):
        app = new_path.append(phasepoint)
        if not app:
            msg = 'Truncated while pasting backwards at: {}'
            msg = msg.format(new_path.length)
            logger.warning(msg)
            return new_path
    # Skip the shared shooting point if the two paths overlap:
    start = 1 if overlap else 0
    for phasepoint in path_forw.phasepoints[start:]:
        app = new_path.append(phasepoint)
        if not app:
            msg = 'Truncated path at: {}'.format(new_path.length)
            logger.warning(msg)
            return new_path
    return new_path
def check_crossing(cycle, orderp, interfaces, leftside_prev):
    """Detect interface crossings between the previous and current step.

    The previous position relative to each interface is compared with
    the current order parameter. This is for instance used in the MD
    simulations for the initial flux.

    Parameters
    ----------
    cycle : int
        This is the current simulation cycle number.
    orderp : float
        The current order parameter.
    interfaces : list of floats
        These are the interfaces to check.
    leftside_prev : list of booleans or None
        The previous positions with respect to the interfaces (True if
        we were on the left side). If None, the positions are
        initialised from `orderp` and no crossings are reported.

    Returns
    -------
    leftside_curr : list of booleans
        These are the updated positions with respect to the interfaces.
    cross : list of tuples
        If a certain interface is crossed, a tuple will be added to this
        list. The tuple is of form
        (cycle number, interface number, direction) where the direction
        is '-' for a crossing in the negative direction and '+' for a
        crossing in the positive direction.

    """
    crossings = []
    if leftside_prev is None:
        # First call: there is nothing to compare with yet.
        return [orderp < position for position in interfaces], crossings
    current = list(leftside_prev)
    pairs = zip(leftside_prev, interfaces)
    for idx, (was_left, position) in enumerate(pairs):
        if was_left:
            if orderp > position:
                # Left -> right.
                current[idx] = False
                crossings.append((cycle, idx, '+'))
        elif orderp < position:
            # Right -> left.
            current[idx] = True
            crossings.append((cycle, idx, '-'))
    return current, crossings
class PathBase:
    """Base class for representation of paths.

    This class represents a path. A path consists of a series of
    consecutive snapshots (the trajectory) with the corresponding
    order parameter.

    Attributes
    ----------
    generated : tuple
        This contains information on how the path was generated.
        `generated[0]` : string, as defined in the variable `_GENERATED`
        `generated[1:]` : additional information:
        For ``generated[0] == 'sh'`` the additional information is the
        index of the shooting point on the old path, the new path and
        the corresponding order parameter.
    maxlen : int
        This is the maximum path length. Some algorithms require this
        to be set. Others don't, which is indicated by setting `maxlen`
        equal to None.
    ordermax : tuple
        This is the (current) maximum order parameter for the path.
        `ordermax[0]` is the value, `ordermax[1]` is the index in
        `self.phasepoints`. It is only defined for non-empty paths.
    ordermin : tuple
        This is the (current) minimum order parameter for the path.
        `ordermin[0]` is the value, `ordermin[1]` is the index in
        `self.phasepoints`. It is only defined for non-empty paths.
    phasepoints : list of objects like :py:class:`.System`
        The phase points the path is made up of.
    rgen : object like :py:class:`.RandomGenerator`
        This is the random generator that will be used for the
        paths that required random numbers.
    time_origin : int
        This is the location of the phase point `path[0]` relative to
        its parent. This might be useful for plotting.
    status : str or None
        The status of the path. The possibilities are defined
        in the variable `_STATUS`.
    weight : real
        The statistical weight of the path.

    """

    def __init__(self, rgen=None, maxlen=None, time_origin=0):
        """Initialise the PathBase object.

        Parameters
        ----------
        rgen : object like :py:class:`.RandomGenerator`, optional
            This is the random generator that will be used. If it is
            not given, a new `numpy.random.RandomState` is created.
        maxlen : int, optional
            This is the max-length of the path. The default value,
            None, is just a path of arbitrary length.
        time_origin : int, optional
            This can be used to store the shooting point of a parent
            trajectory.

        """
        self.maxlen = maxlen
        self.weight = 1.
        self.time_origin = time_origin
        self.status = None
        self.generated = None
        if rgen is None:
            self.rgen = np.random.RandomState()
        else:
            self.rgen = rgen
        self.phasepoints = []

    @property
    def length(self):
        """Compute the length of the path."""
        return len(self.phasepoints)

    @property
    def ordermin(self):
        """Compute the minimum order parameter of the path.

        Returns a tuple ``(value, index)``. This raises a ValueError
        (from numpy) if the path is empty.
        """
        idx = np.argmin([i.order[0] for i in self.phasepoints])
        return (self.phasepoints[idx].order[0], idx)

    @property
    def ordermax(self):
        """Compute the maximum order parameter of the path.

        Returns a tuple ``(value, index)``. This raises a ValueError
        (from numpy) if the path is empty.
        """
        idx = np.argmax([i.order[0] for i in self.phasepoints])
        return (self.phasepoints[idx].order[0], idx)

    def check_interfaces(self, interfaces):
        """Check current status of the path.

        Get the current status of the path with respect to the
        interfaces. This is intended to determine if we have crossed
        certain interfaces or not.

        Parameters
        ----------
        interfaces : list of floats
            This list is assumed to contain the three interface values
            left, middle and right.

        Returns
        -------
        out[0] : str, 'L' or 'R' or None
            Start condition: did the trajectory start at the left ('L')
            or right ('R') interface.
        out[1] : str, 'L' or 'R' or None
            Ending condition: did the trajectory end at the left ('L')
            or right ('R') interface or None of them.
        out[2] : str, 'M' or '*'
            'M' if the middle interface is crossed, '*' otherwise.
        out[3] : list of boolean
            These values are given by
            `ordermin < interfaces[i] <= ordermax`.

        Note
        ----
        For an empty path, a warning is logged and all four outputs
        are None.

        """
        if self.length < 1:
            logger.warning('Path is empty!')
            return None, None, None, None
        ordermax, ordermin = self.ordermax[0], self.ordermin[0]
        cross = [ordermin < interpos <= ordermax for interpos in interfaces]
        left, right = min(interfaces), max(interfaces)
        # Check end & start point:
        end = self.get_end_point(left, right)
        start = self.get_start_point(left, right)
        middle = 'M' if cross[1] else '*'
        return start, end, middle, cross

    def get_end_point(self, left, right=None):
        """Return the end point of the path as a string.

        The end point is either to the left of the `left` interface or
        to the right of the `right` interface, or somewhere in between.

        Parameters
        ----------
        left : float
            The left interface.
        right : float, optional
            The right interface, equal to left if not specified.

        Returns
        -------
        out : string
            A string representing where the end point is ('L' - left,
            'R' - right or None).

        """
        if right is None:
            right = left
        order = self.phasepoints[-1].order[0]
        if order <= left:
            end = 'L'
        elif order >= right:
            end = 'R'
        else:
            end = None
            logger.debug('Undefined end point.')
        return end

    def get_start_point(self, left, right=None):
        """Return the start point of the path as a string.

        The start point is either to the left of the `left` interface or
        to the right of the `right` interface, or somewhere in between.

        Parameters
        ----------
        left : float
            The left interface.
        right : float, optional
            The right interface, equal to left if not specified.

        Returns
        -------
        out : string
            A string representing where the start point is ('L' - left,
            'R' - right or None).

        """
        if right is None:
            right = left
        order = self.phasepoints[0].order[0]
        if order <= left:
            start = 'L'
        elif order >= right:
            start = 'R'
        else:
            start = None
            logger.debug('Undefined starting point.')
        return start

    @abstractmethod
    def get_shooting_point(self):
        """Return a shooting point from the path.

        Returns
        -------
        phasepoint : object like :py:class:`.System`
            A phase point which will be the state to shoot from.
        idx : int
            The index of the shooting point.

        """
        return

    def append(self, phasepoint):
        """Append a new phase point to the path.

        Parameters
        ----------
        phasepoint : object like :py:class:`.System`
            The system information we add to the path.

        Returns
        -------
        out : boolean
            True if the phase point was added, False if the path has
            already reached its maximum length (`maxlen`).

        """
        if self.maxlen is None or self.length < self.maxlen:
            self.phasepoints.append(phasepoint)
            return True
        logger.debug('Max length exceeded. Could not append to path.')
        return False

    def get_path_data(self, status, interfaces):
        """Return information about the path.

        This information can be stored in a object like
        :py:class:`.PathEnsemble`.

        Parameters
        ----------
        status : string
            This represents the current status of the path.
        interfaces : list
            These are just the interfaces we are currently considering.

        Returns
        -------
        path_info : dict
            The path information: how it was generated, the given
            status, the length, the order parameter extremes, the
            weight and the (start, middle, end) interface labels.

        """
        path_info = {
            'generated': self.generated,
            'status': status,
            'length': self.length,
            'ordermax': self.ordermax,
            'ordermin': self.ordermin,
            'weight': self.weight,
        }
        start, end, middle, _ = self.check_interfaces(interfaces)
        path_info['interface'] = (start, middle, end)
        return path_info

    def set_move(self, move):
        """Update the path move.

        The path move is a short string that represents how the path
        was generated. It should preferably match one of the moves
        defined in `_GENERATED`.

        Parameters
        ----------
        move : string
            A short description of the move.

        """
        if self.generated is None:
            self.generated = (move, 0, 0, 0)
        else:
            # Only replace the move, keep the additional information:
            self.generated = (move, self.generated[1], self.generated[2],
                              self.generated[3])

    def get_move(self):
        """Return the move used to generate the path."""
        if self.generated is None:
            return None
        return self.generated[0]

    def success(self, detect):
        """Check if the path is successful.

        The check is based on the maximum order parameter and the value
        of `detect`. It is successful if the maximum order parameter is
        greater than `detect`.

        Parameters
        ----------
        detect : float
            The value for which the path is successful, i.e. the
            "detect" interface.

        """
        return self.ordermax[0] > detect

    def __iadd__(self, other):
        """Add path data to a path from another path, i.e. ``self += other``.

        This will simply append copies of the phase points from `other`.

        Parameters
        ----------
        other : object of type `Path`
            The object to add path data from.

        Returns
        -------
        self : object of type `Path`
            The updated path object.

        """
        for phasepoint in other.phasepoints:
            app = self.append(phasepoint.copy())
            if not app:
                logger.warning(
                    'Truncated path at %d while adding paths', self.length
                )
                return self
        return self

    def copy(self):
        """Return a copy of the path."""
        new_path = self.empty_path()
        for phasepoint in self.phasepoints:
            new_path.append(phasepoint.copy())
        new_path.status = self.status
        new_path.time_origin = self.time_origin
        new_path.generated = self.generated
        new_path.maxlen = self.maxlen
        new_path.weight = self.weight
        return new_path

    @staticmethod
    def reverse_velocities(system):
        """Reverse the velocities in the phase points."""
        system.particles.reverse_velocities()

    def reverse(self, order_function):
        """Reverse a path and return the reverse path as a new path.

        This will reverse a path and return the reversed path as
        a new object like :py:class:`.PathBase` object.

        Parameters
        ----------
        order_function : object like :py:class:`.OrderParameter`
            The method to use to re-calculate the order parameter,
            if it is velocity dependent.

        Returns
        -------
        new_path : object like :py:class:`.PathBase`
            The time reversed path.

        """
        new_path = self.empty_path()
        for phasepoint in reversed(self.phasepoints):
            new_point = phasepoint.copy()
            self.reverse_velocities(new_point)
            app = new_path.append(new_point)
            if not app:  # pragma: no cover
                msg = 'Could not reverse path'
                logger.error(msg)
                return None
        if order_function and order_function.velocity_dependent:
            for phasepoint in new_path.phasepoints:
                phasepoint.order = order_function.calculate(phasepoint)
        return new_path

    def __str__(self):
        """Return a simple string representation of the Path."""
        msg = ['Path with length {} (max: {})'.format(self.length,
                                                      self.maxlen)]
        if self.length > 0:
            # The order parameter extremes and the end points are only
            # defined when the path contains phase points:
            msg += ['Order parameter max: {}'.format(self.ordermax)]
            msg += ['Order parameter min: {}'.format(self.ordermin)]
            msg += ['Start {}'.format(self.phasepoints[0].order[0])]
            msg += ['End {}'.format(self.phasepoints[-1].order[0])]
        if self.status:
            txtstatus = _STATUS.get(self.status, 'unknown status')
            msg += ['Status: {}'.format(txtstatus)]
        if self.generated:
            move = self.generated[0]
            txtmove = _GENERATED.get(move, 'unknown move')
            msg += ['Generated: {}'.format(txtmove)]
        return '\n'.join(msg)

    @abstractmethod
    def restart_info(self):
        """Return a dictionary with restart information."""
        return

    @abstractmethod
    def empty_path(self, **kwargs):
        """Return an empty path of same class as the current one.

        This function is intended to spawn sibling paths that share some
        properties and also some characteristics of the current path.
        The idea here is that a path of a certain class should only be
        able to create paths of the same class.

        Returns
        -------
        out : object like :py:class:`.PathBase`
            A new empty path.

        """
        return

    def __eq__(self, other):
        """Check if two paths are equal."""
        if self.__class__ != other.__class__:
            return False

        if set(self.__dict__) != set(other.__dict__):
            return False

        # Compare phasepoints:
        if len(self.phasepoints) != len(other.phasepoints):
            return False
        for i, j in zip(self.phasepoints, other.phasepoints):
            if not i == j:
                return False

        # Compare other attributes:
        attributes = ['maxlen', 'time_origin', 'status', 'generated',
                      'rgen', 'length']
        if self.phasepoints:
            # The order parameter extremes can only be evaluated when
            # there are phase points in the path:
            attributes += ['ordermax', 'ordermin']
        for i in attributes:
            attr_self = hasattr(self, i)
            attr_other = hasattr(other, i)
            if attr_self ^ attr_other:  # pragma: no cover
                logger.warning('Failed comparing path due to missing "%s"', i)
                return False
            if not attr_self and not attr_other:
                logger.warning(
                    'Skipping comparison of missing path attribute "%s"',
                    i
                )
                continue
            if getattr(self, i) != getattr(other, i):
                return False
        return True

    def __ne__(self, other):
        """Check if two paths are not equal."""
        return not self == other

    def delete(self, idx):
        """Remove a phase point from the path.

        Parameters
        ----------
        idx : integer
            The index of the frame to remove.

        """
        del self.phasepoints[idx]

    def sorting(self, key, reverse=False):
        """Re-order the phase points according to the given key.

        The phase points are re-ordered in place, nothing is returned.

        Parameters
        ----------
        key : string
            The attribute we will sort according to. The keys 'ekin'
            and 'vpot' are read from the particles of the phase points,
            for 'order' the first order parameter is used and any other
            key is read directly from the phase points.
        reverse : boolean, optional
            If this is False, the sorting is from small to big. If it
            is True, the sorting is from big to small.

        """
        if key in ('ekin', 'vpot'):
            sort_after = [getattr(i.particles, key) for i in self.phasepoints]
        elif key == 'order':
            sort_after = [getattr(i, key)[0] for i in self.phasepoints]
        else:
            sort_after = [getattr(i, key) for i in self.phasepoints]
        idx = np.argsort(sort_after)
        if reverse:
            idx = idx[::-1]
        self.phasepoints = [self.phasepoints[i] for i in idx]

    def update_energies(self, ekin, vpot):
        """Update the energies for the phase points.

        This method is useful in cases where the energies are
        read from external engines and returned as a list of
        floats. If a list is shorter than the path, the missing
        energies are set to None.

        Parameters
        ----------
        ekin : list of floats
            The kinetic energy to set.
        vpot : list of floats
            The potential energies to set.

        """
        if len(ekin) != len(vpot):
            logger.debug(
                'Kinetic and potential energies have different length.'
            )
        if len(ekin) != len(self.phasepoints):
            logger.debug(
                'Length of kinetic energy and phase points differ %d != %d.',
                len(ekin), len(self.phasepoints)
            )
        if len(vpot) != len(self.phasepoints):
            logger.debug(
                'Length of potential energy and phase points differ %d != %d.',
                len(vpot), len(self.phasepoints)
            )
        for i, phasepoint in enumerate(self.phasepoints):
            try:
                vpoti = vpot[i]
            except IndexError:
                logger.warning(
                    'Ran out of potential energies, setting to None.'
                )
                vpoti = None
            try:
                ekini = ekin[i]
            except IndexError:
                logger.warning(
                    'Ran out of kinetic energies, setting to None.'
                )
                ekini = None
            phasepoint.particles.vpot = vpoti
            phasepoint.particles.ekin = ekini
class Path(PathBase):
    """A path with the complete trajectory kept in memory.

    A path is a series of consecutive snapshots (the trajectory)
    together with the corresponding order parameter. For this class,
    all information for all phase points on the path is stored.

    """

    def get_shooting_point(self):
        """Pick a random shooting point from the path.

        The point is drawn with the random generator of the path. All
        points can be selected with equal probability, except for the
        two end points which are never considered.

        Returns
        -------
        out[0] : object like :py:class:`.System`
            The phase point we selected.
        out[1] : int
            The shooting point index.

        """
        last_allowed = self.length - 2
        index = self.rgen.random_integers(1, last_allowed)
        return self.phasepoints[index], index

    def empty_path(self, **kwargs):
        """Create a new, empty path of the same class as this one.

        The new path shares the random generator of this path. The
        keyword arguments `maxlen` (default None) and `time_origin`
        (default 0) are passed on to the new path.

        Returns
        -------
        out : object like :py:class:`.PathBase`
            A new empty path.

        """
        settings = {
            'maxlen': kwargs.get('maxlen', None),
            'time_origin': kwargs.get('time_origin', 0),
        }
        return self.__class__(self.rgen, **settings)

    def restart_info(self):
        """Collect the restart information for the path in a dictionary."""
        snapshots = [point.restart_info() for point in self.phasepoints]
        return {
            'generated': self.generated,
            'maxlen': self.maxlen,
            'time_origin': self.time_origin,
            'status': self.status,
            'phasepoints': snapshots,
        }

    def load_restart_info(self, info):
        """Populate the path from a dictionary with restart information.

        Items are handled in the order they appear in `info`. Keys that
        do not correspond to an attribute of the path are ignored.
        """
        for name, value in info.items():
            if name != 'phasepoints':
                # Plain settings: only update attributes we already have.
                if hasattr(self, name):
                    setattr(self, name, value)
                continue
            # Phase points are rebuilt as new System objects:
            for point_info in value:
                point = System()
                point.load_restart_info(point_info)
                self.append(point)