# -*- coding: utf-8 -*-
# Copyright (c) 2019, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""This file contains common functions for the input/output.
It contains some slave functions that are used in the in/output function
of PyRETIS.
Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
OutputBase (:py:class:`.OutputBase`)
A base class for handling the output.
Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
check_python_version (:py:func:`.check_python_version`)
A method that will give warnings when we use older and possibly
unsupported Python versions.
create_backup (:py:func:`.create_backup`)
A function to handle the creation of backups of old files.
make_dirs (:py:func:`.make_dirs`)
Create directories (for path simulation).
create_empty_ensembles (:py:func:`.create_ensembles`)
A method to prepare the ensembles inputs in settings
simplify_ensemble_name (:py:func:`.simplify_ensemble_name`)
Simplify the name of ensembles for creating directories.
generate_file_name (:py:func:`.generate_file_name`)
Generate file name for an output task, from settings.
"""
import logging
import errno
import os
import re
import sys
from abc import ABCMeta, abstractmethod
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
__all__ = [
'check_python_version',
'create_backup',
'create_empty_ensembles',
'make_dirs',
'OutputBase',
'simplify_ensemble_name',
]
# Hard-coded patters for energy analysis output files.
# These are just used to make it simpler to change these default
# names in the future.
ENERFILES = {'energies': 'energies',
'run_energies': 'runenergies',
'temperature': 'temperature',
'run_temp': 'runtemperature',
'block': '{}block',
'dist': '{}dist'}
# hard-coded information for the energy terms:
ENERTITLE = {'vpot': 'Potential energy',
'ekin': 'Kinetic energy',
'etot': 'Total energy',
'ham': 'Hamilt. energy',
'temp': 'Temperature',
'elec': 'Energy (externally computed)'}
# hard-coded patters for flux analysis output files:
FLUXFILES = {'runflux': 'runflux_{}',
'block': 'errflux_{}'}
# order files:
ORDERFILES = {'order': 'order',
'ordervel': 'orderv',
'run_order': 'runorder',
'dist': 'orderdist',
'block': 'ordererror',
'msd': 'ordermsd'}
# hard-coded patters for path analysis output files:
PATHFILES = {'pcross': '{}_pcross',
'prun': '{}_prun',
'perror': '{}_perror',
'lengtherror': '{}_lerror',
'pathlength': '{}_lpath',
'shoots': '{}_shoots',
'shoots_scaled': '{}_shoots_scaled'}
# hard-coded patterns for matched files:
PATH_MATCH = {'total': 'total-probability',
'progress': 'overall-rrun',
'error': 'overall-err',
'match': 'matched-probability'}
[docs]def create_backup(outputfile):
"""Check if a file exist and create backup if requested.
This function will check if the given file name exists and if it
does, it will move that file to a new file name such that the given
one can be used without overwriting.
Parameters
----------
outputfile : string
This is the name of the file we wish to create.
Returns
-------
out : string
This string is None if no backup is made, otherwise, it will
just say what file was moved (and to where).
Note
----
No warning is issued here. This is just in case the `msg` returned
here will be part of some more elaborate message.
"""
filename = '{}'.format(outputfile)
fileid = 0
msg = None
while os.path.isfile(filename) or os.path.isdir(filename):
filename = '{}_{:03d}'.format(outputfile, fileid)
fileid += 1
if fileid > 0:
msg = 'Backup existing file "{}" to "{}"'.format(outputfile, filename)
os.rename(outputfile, filename)
return msg
def _remove_extension(filename):
"""Remove the extension of a given file name.
Parameters
----------
filename : string
The file name to check.
Returns
-------
out : string
The filename with the extension removed.
"""
try:
return os.path.splitext(filename)[0]
except IndexError: # pragma: no cover
return filename
[docs]def make_dirs(dirname):
"""Create directories for path simulations.
This function will create a folder using a specified path.
If the path already exists and if it's a directory, we will do
nothing. If the path exists and is a file we will raise an
`OSError` exception here.
Parameters
----------
dirname : string
This is the directory to create.
Returns
-------
out : string
A string with some info on what this function did. Intended for
output.
"""
try:
os.makedirs(dirname)
msg = 'Created directory: "{}"'.format(dirname)
return msg
except OSError as err:
if err.errno != errno.EEXIST: # pragma: no cover
raise err
if os.path.isfile(dirname):
msg = '"{}" is a file. Will abort!'
raise OSError(errno.EEXIST, msg, dirname)
if os.path.isdir(dirname):
msg = 'Directory "{}" already exist.'.format(dirname)
return msg
[docs]def simplify_ensemble_name(ensemble, fmt='{:03d}'):
"""Simplify path names for file/directory names.
Here, we are basically translating ensemble names to more friendly
names for directories and files that is:
- ``[0^-]`` returns ``000``,
- ``[0^+]`` returns ``001``,
- ``[1^+]`` returns ``002``, etc.
Parameters
----------
ensemble : string
This is the string to simplify.
fmt : string. optional
This is a format to use for the directories.
"""
match_ensemble = re.search(r'(?<=\[)(\d+)(?=\^)', ensemble)
if match_ensemble:
ens = int(match_ensemble.group())
else:
match_ensemble = re.search(r'(?<=\[)(\d+)(?=\])', ensemble)
if match_ensemble:
ens = int(match_ensemble.group())
else:
return ensemble # Assume that the ensemble is OK as it is.
match_dir = re.search(r'(?<=\^)(.)(?=\])', ensemble)
if match_dir:
dire = match_dir.group()
if dire == '-':
ens = ens
else:
ens += 1
else:
msg = ['Could not get direction for ensemble {}.'.format(ensemble),
'We assume "+", note that this might overwrite files']
logger.warning('\n'.join(msg))
ens += 1
return fmt.format(ens)
def add_dirname(filename, dirname):
"""Add a directory as a prefix to a filename, i.e. `dirname/filename`.
Parameters
----------
filename : string
The filename.
dirname : string
The directory we want to prefix. It can be None, in which
case we ignore it.
Returns
-------
out : string
The path to the resulting file.
"""
if dirname is not None:
return os.path.join(dirname, filename)
return filename
def name_file(name, extension, path=None):
"""Return a file name by joining a name and an file extension.
This function is used to create file names. It will use
`os.extsep` to create the file names and `os.path.join` to add a
path name if the `path` is given. The returned file name will be of
form (example for posix): ``path/name.extension``.
Parameters
----------
name : string
This is the name, without extension, for the file.
extension : string
The extension to use for the file name.
path : string, optional
An optional path to add to the file name.
Returns
-------
out : string
The resulting file name.
"""
return add_dirname(os.extsep.join([name, extension]), path)
def generate_file_name(basename, directory, settings):
"""Generate file name for an output task, from settings.
Parameters
----------
basename : string
The base file name to use.
directory : string
A directory to output to. Can be None to output to the
current working directory.
settings : dict
The input settings
Returns
-------
filename : string
The file name to use.
"""
prefix = settings['output'].get('prefix', None)
if prefix is not None:
filename = '{}{}'.format(prefix, basename)
else:
filename = basename
filename = add_dirname(filename, directory)
return filename
[docs]def check_python_version(): # pragma: no cover
"""Give a warning about old python version(s)."""
pyversion = sys.version.split()[0]
if sys.version_info < (3, 0):
msgtxt = ('Please upgrade to Python 3.'
'\nPython {} is not supported!')
msgtxt = msgtxt.format(pyversion)
logger.error(msgtxt)
raise SystemExit(msgtxt)
[docs]class OutputBase(metaclass=ABCMeta):
"""A generic class for handling output.
Attributes
----------
formatter : object like py:class:`.OutputFormatter`
The object responsible for formatting output.
target : string
Determines where the target for the output, for
instance "screen" or "file".
first_write : boolean
Determines if we have written something yet, or
if this is the first write.
"""
target = None
[docs] def __init__(self, formatter):
"""Create the object and attach a formatter."""
self.formatter = formatter
self.first_write = True
[docs] def output(self, step, data):
"""Use the formatter to write data to the file.
Parameters
----------
step : int
The current step number.
data : list
The data we are going to output.
"""
if self.first_write and self.formatter.print_header:
self.first_write = False
self.write(self.formatter.header)
for line in self.formatter.format(step, data):
self.write(line)
[docs] @abstractmethod
def write(self, towrite, end='\n'):
"""Write a string to the output defined by this class.
Parameters
----------
towrite : string
The string to write.
end : string, optional
A "terminator" for the given string.
Returns
-------
status : boolean
True if we managed to write, False otherwise.
"""
return
[docs] def __str__(self):
"""Return basic info."""
return '{}\n\t* Formatter: {}'.format(self.__class__.__name__,
self.formatter)
[docs]def create_empty_ensembles(settings):
"""Create missing ensembles in the settings.
Checks the input and allocate it to the right ensemble. In theory
inouts shall include all these info, but it is not practical.
Parameters
----------
settings : dict
The current input settings.
Returns
-------
None, but this method might add data to the input settings.
"""
ints = settings['simulation']['interfaces']
# Determine how many ensembles are needed.
# (In PyRETIS 2 the flux and zero ensemble are always considered)
add0 = 1
# if some ensembles have inputs, they need to be kept.
if 'ensemble' in settings:
orig_set = settings['ensemble'].copy()
else:
orig_set = []
settings['ensemble'] = []
add = add0
# if in the main settings an ensemble_number is defined, then only
# that ensemble will be considered.
if 'tis' in settings:
idx = settings['tis'].get('ensemble_number')
if idx is not None:
settings['ensemble'].append({'interface': ints[1],
'tis': {'ensemble_number': idx}})
for sav in orig_set:
settings['ensemble'][0] = {**settings['ensemble'][0], **sav}
return
# if one wants to compute the flux, the 000 ensemble is for it
# todo remove this labelling mismatch, and give to the flux
# a flux name folder (instead of 000), and leave 000 for the O^+ ens.
settings['ensemble'].append({'interface': ints[0],
'tis': {'ensemble_number': 0}})
for i in range(add, len(ints)):
settings['ensemble'].append({'interface': ints[i - 1],
'tis': {'ensemble_number': i}})
# create the ensembles in setting, keeping eventual inputs.
# nb. in the settings, specific input for an ensemble can be now given.
for i_ens, ens in enumerate(settings['ensemble']):
for sav in orig_set:
if 'tis' in sav and 'ensemble_number' in sav['tis']:
if ens['tis']['ensemble_number'] ==\
sav['tis']['ensemble_number']:
settings['ensemble'][i_ens].update(sav)
elif ens['interface'] == sav['interface']:
settings['ensemble'][i_ens].update(sav)
return