Source code for pyretis.inout.fileio

# -*- coding: utf-8 -*-
# Copyright (c) 2019, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Module defining the base classes for the PyRETIS output.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

FileIO (:py:class:`.FileIO`)
    A generic class for handling input & output with files.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

read_some_lines (:py:func:`.read_some_lines`)
    Method to read lines from PyRETIS data files.

"""
from datetime import datetime
import os
import logging
from pyretis.inout.common import OutputBase, create_backup


logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


__all__ = ['FileIO', 'read_some_lines']


[docs]class FileIO(OutputBase): """A generic class for handling IO with files. This class defines how PyRETIS stores and reads data. Formatting is handled by an object like :py:class:`.OutputFormatter` Attributes ---------- filename : string Name (e.g. path) to the file to read or write. file_mode : string Specifies the mode in which the file is opened. backup : boolean Determines the behavior if we want to write to a file that is already existing. fileh : object like :py:class:`io.IOBase` The file handle we are interacting with. last_flush : object like :py:class:`datetime.datetime` The previous time for flushing to the file. FILE_FLUSH : integer The interval for flushing to the file. That is, we will flush if the time since the last flush is larger than this value. Note that this is only checked in relation to writing. """ target = 'file' FILE_FLUSH = 10 # Interval for flushing files in seconds.
[docs] def __init__(self, filename, file_mode, formatter, backup=True): """Set up the file object. Parameters ---------- filename : string The path to the file to open or read. file_mode : string Specifies the mode for opening the file. formatter : object like py:class:`.OutputFormatter` The object responsible for formatting output. backup : boolean, optional Defines how we handle cases where we write to a file which is already existing. """ super().__init__(formatter) self.filename = filename self.file_mode = file_mode if backup not in (True, False): logger.info('Setting backup to default: True') self.backup = True else: self.backup = backup self.fileh = None if self.file_mode.startswith('a') and self.formatter is not None: self.formatter.print_header = False self.last_flush = None
[docs] def open_file_read(self): """Open a file for reading.""" if not self.file_mode.startswith('r'): raise ValueError( ('Inconsistent file mode "{}" ' 'for reading').format(self.file_mode) ) try: self.fileh = open(self.filename, self.file_mode) except (OSError, IOError) as error: logger.critical( 'Could not open file "%s" for reading', self.filename ) logger.critical( 'I/O error ({%d}): {%s}', error.errno, error.strerror ) return self.fileh
[docs] def open_file_write(self): """Open a file for writing. In this method, we also handle the possible backup settings. """ if not self.file_mode[0] in ('a', 'w'): raise ValueError( ('Inconsistent file mode "{}" ' 'for writing').format(self.file_mode) ) msg = [] try: if os.path.isfile(self.filename): msg = '' if self.file_mode.startswith('a'): logger.info( 'Appending to existing file "%s"', self.filename ) else: if self.backup: msg = create_backup(self.filename) logger.debug(msg) else: logger.debug( 'Overwriting existing file "%s"', self.filename ) self.fileh = open(self.filename, self.file_mode) except (OSError, IOError) as error: # pragma: no cover logger.critical( 'Could not open file "%s" for writing', self.filename ) logger.critical( 'I/O error (%d): %d', error.errno, error.strerror ) return self.fileh
[docs] def open(self): """Open a file for reading or writing.""" if self.fileh is not None: logger.debug( '%s asked to open file, but it has already opened a file.', self.__class__.__name__ ) return self.fileh if self.file_mode[0] in ('r',): return self.open_file_read() if self.file_mode[0] in ('a', 'w'): return self.open_file_write() raise ValueError('Unknown file mode "{}"'.format(self.file_mode))
[docs] def load(self): """Read blocks or lines from the file.""" return self.formatter.load(self.filename)
[docs] def write(self, towrite, end='\n'): """Write a string to the file. Parameters ---------- towrite : string The string to output to the file. end : string, optional Appended to `towrite` when writing, can be used to print a new line after the input `towrite`. Returns ------- status : boolean True if we managed to write, False otherwise. """ status = False if towrite is None: return status if self.fileh is not None and not self.fileh.closed: try: if end is not None: self.fileh.write('{}{}'.format(towrite, end)) status = True else: self.fileh.write(towrite) status = True except (OSError, IOError) as error: # pragma: no cover msg = 'Write I/O error ({}): {}'.format(error.errno, error.strerror) logger.critical(msg) if self.last_flush is None: self.flush() self.last_flush = datetime.now() delta = datetime.now() - self.last_flush if delta.total_seconds() > self.FILE_FLUSH: # pragma: no cover self.flush() self.last_flush = datetime.now() return status if self.fileh is not None and self.fileh.closed: logger.warning('Ignored writing to closed file %s', self.filename) if self.fileh is None: logger.critical( 'Attempting to write to empty file handle for file %s', self.filename ) return status
[docs] def close(self): """Close the file.""" if self.fileh is not None and not self.fileh.closed: try: self.flush() finally: self.fileh.close()
[docs] def flush(self): """Flush file buffers to file.""" if self.fileh is not None and not self.fileh.closed: self.fileh.flush() os.fsync(self.fileh.fileno())
[docs] def output(self, step, data): """Open file before first write.""" if self.first_write: self.open() return super().output(step, data)
[docs] def __del__(self): """Close the file in case the object is deleted.""" self.close()
[docs] def __enter__(self): """Context manager for opening the file.""" self.open() return self
[docs] def __exit__(self, *args): """Context manager for closing the file.""" self.close()
[docs] def __iter__(self): """Make it possible to iterate over lines in the file.""" return self
[docs] def __next__(self): """Let the file object handle the iteration.""" if self.fileh is None: raise StopIteration if self.fileh.closed: raise StopIteration return next(self.fileh)
[docs] def __str__(self): """Return basic info.""" msg = ['FileIO (file: "{}")'.format(self.filename)] if self.fileh is not None and not self.fileh.closed: msg += ['\t* File is open'] msg += ['\t* Mode: {}'.format(self.fileh.mode)] msg += ['\t* Formatter: {}'.format(self.formatter)] return '\n'.join(msg)
def _read_line_data(ncol, stripline, line_parser): """Read data for :py:func:`.read_some_lines.`. Parameters ---------- ncol : integer The expected number of columns to read. If this is less than 1 it is not yet set. Note that we skip data which appear inconsistent. A warning will be issued about this. stripline : string The line to read. Note that we assume that leading and trailing spaces have been removed. line_parser : callable A method we use to parse a single line. """ if line_parser is None: # Just return data without any parsing: return stripline, True, ncol try: linedata = line_parser(stripline) except (ValueError, IndexError): return None, False, -1 newcol = len(linedata) if ncol == -1: # first item ncol = newcol if newcol == ncol: return linedata, True, ncol # We assume that this is line is malformed --- skip it! return None, False, -1
[docs]def read_some_lines(filename, line_parser, block_label='#'): """Open a file and try to read as many lines as possible. This method will read a file using the given `line_parser`. If the given `line_parser` fails at a line in the file, `read_some_lines` will stop here. Further, this method will read data in blocks and yield a block when a new block is found. A special string (`block_label`) is assumed to identify the start of blocks. Parameters ---------- filename : string This is the name/path of the file to open and read. line_parser : function, optional This is a function which knows how to translate a given line to a desired internal format. If not given, a simple float will be used. block_label : string, optional This string is used to identify blocks. Yields ------ data : list The data read from the file, arranged in dicts. """ ncol = -1 # The number of columns new_block = {'comment': [], 'data': []} yield_block = False read_comment = False with open(filename, 'r') as fileh: for i, line in enumerate(fileh): stripline = line.strip() if stripline.startswith(block_label): # this is a comment, then a new block will follow, # unless this is a multi-line comment. if read_comment: # part of multi-line comment... new_block['comment'].append(stripline) else: if yield_block: # Yield the current block yield_block = False yield new_block new_block = {'comment': [stripline], 'data': []} yield_block = True # Data has been added ncol = -1 read_comment = True else: read_comment = False data, _yieldb, _ncol = _read_line_data(ncol, stripline, line_parser) if data: new_block['data'].append(data) ncol = _ncol yield_block = _yieldb else: logger.warning('Skipped malformed data in "%s", line: %i', filename, i) # if the block has not been yielded, yield it if yield_block: yield_block = False yield new_block