# -*- coding: utf-8 -*-
# Copyright (c) 2019, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Module defining the base classes for the PyRETIS output.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

FileIO (:py:class:`.FileIO`)
    A generic class for handling input & output with files.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

read_some_lines (:py:func:`.read_some_lines`)
    Method to read lines from PyRETIS data files.

"""
from datetime import datetime
import os
import logging
from pyretis.inout.common import OutputBase, create_backup
# Module-level logger, named after this module.
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
# Attach a no-op handler to this module's logger.
logger.addHandler(logging.NullHandler())
# Names exported by a star-import of this module.
__all__ = ['FileIO', 'read_some_lines']
class FileIO(OutputBase):
    """A generic class for handling IO with files.

    This class defines how PyRETIS stores and reads data.
    Formatting is handled by an object like :py:class:`.OutputFormatter`

    Attributes
    ----------
    filename : string
        Name (e.g. path) to the file to read or write.
    file_mode : string
        Specifies the mode in which the file is opened.
    backup : boolean
        Determines the behavior if we want to write to a file
        that is already existing.
    fileh : object like :py:class:`io.IOBase`
        The file handle we are interacting with.
    last_flush : object like :py:class:`datetime.datetime`
        The previous time for flushing to the file.
    FILE_FLUSH : integer
        The interval for flushing to the file. That is, we will
        flush if the time since the last flush is larger than this
        value. Note that this is only checked in relation to writing.

    """

    target = 'file'
    FILE_FLUSH = 10  # Interval for flushing files in seconds.

    def __init__(self, filename, file_mode, formatter, backup=True):
        """Set up the file object.

        The file is not opened here; see :py:meth:`.open`.

        Parameters
        ----------
        filename : string
            The path to the file to open or read.
        file_mode : string
            Specifies the mode for opening the file.
        formatter : object like py:class:`.OutputFormatter`
            The object responsible for formatting output.
        backup : boolean, optional
            Defines how we handle cases where we write to a
            file which is already existing.

        """
        super().__init__(formatter)
        self.filename = filename
        self.file_mode = file_mode
        if backup not in (True, False):
            # Anything which is not a boolean falls back to the default.
            logger.info('Setting backup to default: True')
            self.backup = True
        else:
            self.backup = backup
        self.fileh = None
        if self.file_mode.startswith('a') and self.formatter is not None:
            # When appending, ask the formatter not to print a header.
            self.formatter.print_header = False
        self.last_flush = None

    def open_file_read(self):
        """Open a file for reading.

        Returns
        -------
        out : object like :py:class:`io.IOBase` or None
            The current file handle. If the file could not be opened,
            the error is logged and the previous value of `fileh` is
            returned unchanged.

        Raises
        ------
        ValueError
            If the file mode does not start with ``'r'``.

        """
        if not self.file_mode.startswith('r'):
            raise ValueError(
                ('Inconsistent file mode "{}" '
                 'for reading').format(self.file_mode)
            )
        try:
            self.fileh = open(self.filename, self.file_mode)
        except OSError as error:
            logger.critical(
                'Could not open file "%s" for reading', self.filename
            )
            # Use %s for both fields: errno may be None and strerror is
            # a string, so %d is not safe here.
            logger.critical(
                'I/O error (%s): %s', error.errno, error.strerror
            )
        return self.fileh

    def open_file_write(self):
        """Open a file for writing.

        In this method, we also handle the possible backup settings:
        when the file already exists and we are not appending, it is
        either backed up (``backup`` is True) or overwritten.

        Returns
        -------
        out : object like :py:class:`io.IOBase` or None
            The current file handle. If the file could not be opened,
            the error is logged and the previous value of `fileh` is
            returned unchanged.

        Raises
        ------
        ValueError
            If the file mode does not start with ``'a'`` or ``'w'``.

        """
        if not self.file_mode.startswith(('a', 'w')):
            raise ValueError(
                ('Inconsistent file mode "{}" '
                 'for writing').format(self.file_mode)
            )
        try:
            if os.path.isfile(self.filename):
                if self.file_mode.startswith('a'):
                    logger.info(
                        'Appending to existing file "%s"', self.filename
                    )
                elif self.backup:
                    msg = create_backup(self.filename)
                    logger.debug(msg)
                else:
                    logger.debug(
                        'Overwriting existing file "%s"', self.filename
                    )
            self.fileh = open(self.filename, self.file_mode)
        except OSError as error:  # pragma: no cover
            logger.critical(
                'Could not open file "%s" for writing', self.filename
            )
            # strerror is a string, so it must be formatted with %s.
            logger.critical(
                'I/O error (%s): %s', error.errno, error.strerror
            )
        return self.fileh

    def open(self):
        """Open a file for reading or writing.

        If a file handle has already been set, it is returned as-is
        and no new file is opened.

        Returns
        -------
        out : object like :py:class:`io.IOBase` or None
            The file handle.

        Raises
        ------
        ValueError
            If the file mode does not start with ``'r'``, ``'a'``
            or ``'w'``.

        """
        if self.fileh is not None:
            logger.debug(
                '%s asked to open file, but it has already opened a file.',
                self.__class__.__name__
            )
            return self.fileh
        if self.file_mode.startswith('r'):
            return self.open_file_read()
        if self.file_mode.startswith(('a', 'w')):
            return self.open_file_write()
        raise ValueError('Unknown file mode "{}"'.format(self.file_mode))

    def load(self):
        """Read blocks or lines from the file.

        The reading is delegated to the ``load`` method of the
        formatter, which is given the file name.
        """
        return self.formatter.load(self.filename)

    def write(self, towrite, end='\n'):
        """Write a string to the file.

        After a write attempt, the file is flushed if it has never
        been flushed, or if more than `FILE_FLUSH` seconds have passed
        since the last flush.

        Parameters
        ----------
        towrite : string
            The string to output to the file.
        end : string, optional
            Appended to `towrite` when writing, can be used to print a
            new line after the input `towrite`.

        Returns
        -------
        status : boolean
            True if we managed to write, False otherwise.

        """
        status = False
        if towrite is None:
            return status
        if self.fileh is None:
            logger.critical(
                'Attempting to write to empty file handle for file %s',
                self.filename
            )
            return status
        if self.fileh.closed:
            logger.warning('Ignored writing to closed file %s', self.filename)
            return status
        try:
            if end is not None:
                self.fileh.write('{}{}'.format(towrite, end))
            else:
                self.fileh.write(towrite)
            status = True
        except OSError as error:  # pragma: no cover
            logger.critical(
                'Write I/O error (%s): %s', error.errno, error.strerror
            )
        if self.last_flush is None:
            # First write: flush right away and start the timer.
            self.flush()
            self.last_flush = datetime.now()
        delta = datetime.now() - self.last_flush
        if delta.total_seconds() > self.FILE_FLUSH:  # pragma: no cover
            self.flush()
            self.last_flush = datetime.now()
        return status

    def close(self):
        """Close the file.

        The file is flushed first; it is closed even if the flush fails.
        """
        if self.fileh is not None and not self.fileh.closed:
            try:
                self.flush()
            finally:
                self.fileh.close()

    def flush(self):
        """Flush file buffers to file."""
        if self.fileh is not None and not self.fileh.closed:
            self.fileh.flush()
            # Also ask the operating system to write its buffers to disk.
            os.fsync(self.fileh.fileno())

    def output(self, step, data):
        """Open file before first write."""
        # NOTE(review): `first_write` is not set in this class;
        # presumably it is maintained by OutputBase -- confirm.
        if self.first_write:
            self.open()
        return super().output(step, data)

    def __del__(self):
        """Close the file in case the object is deleted."""
        self.close()

    def __enter__(self):
        """Context manager for opening the file."""
        self.open()
        return self

    def __exit__(self, *args):
        """Context manager for closing the file."""
        self.close()

    def __iter__(self):
        """Make it possible to iterate over lines in the file."""
        return self

    def __next__(self):
        """Let the file object handle the iteration.

        Iteration stops immediately if no file has been opened or if
        the file is closed.
        """
        if self.fileh is None or self.fileh.closed:
            raise StopIteration
        return next(self.fileh)

    def __str__(self):
        """Return basic info."""
        msg = ['FileIO (file: "{}")'.format(self.filename)]
        if self.fileh is not None and not self.fileh.closed:
            msg += ['\t* File is open']
            msg += ['\t* Mode: {}'.format(self.fileh.mode)]
        msg += ['\t* Formatter: {}'.format(self.formatter)]
        return '\n'.join(msg)
def _read_line_data(ncol, stripline, line_parser):
"""Read data for :py:func:`.read_some_lines.`.
Parameters
----------
ncol : integer
The expected number of columns to read. If this is less than 1
it is not yet set. Note that we skip data which appear
inconsistent. A warning will be issued about this.
stripline : string
The line to read. Note that we assume that leading and
trailing spaces have been removed.
line_parser : callable
A method we use to parse a single line.
"""
if line_parser is None:
# Just return data without any parsing:
return stripline, True, ncol
try:
linedata = line_parser(stripline)
except (ValueError, IndexError):
return None, False, -1
newcol = len(linedata)
if ncol == -1: # first item
ncol = newcol
if newcol == ncol:
return linedata, True, ncol
# We assume that this is line is malformed --- skip it!
return None, False, -1
def read_some_lines(filename, line_parser, block_label='#'):
    """Open a file and try to read as many lines as possible.

    This method will read a file using the given `line_parser`.
    Lines which give no data are skipped and a warning is logged
    for each of them; reading then continues with the next line.
    This covers lines the `line_parser` fails on, lines with an
    inconsistent number of columns, and lines giving an empty result
    (for instance blank lines). Further, this method
    will read data in blocks and yield a block when a new
    block is found. A special string (`block_label`) is assumed to
    identify the start of blocks. Consecutive lines starting with
    `block_label` are treated as one multi-line comment belonging
    to the same block.

    Parameters
    ----------
    filename : string
        This is the name/path of the file to open and read.
    line_parser : callable or None
        This is a function which knows how to translate a given line
        to a desired internal format. If None is given, the stripped
        lines are stored as they are, without any parsing.
    block_label : string, optional
        This string is used to identify blocks.

    Yields
    ------
    data : dict
        The data read from the file for one block. The key
        ``'comment'`` holds the list of (stripped) comment lines
        and the key ``'data'`` holds the list of data read from the
        other lines in the block.

    """
    ncol = -1  # The number of columns
    new_block = {'comment': [], 'data': []}
    yield_block = False
    read_comment = False
    with open(filename, 'r') as fileh:
        # Count lines from 1, so that the line number in the warning
        # below matches what a text editor will show.
        for lineno, line in enumerate(fileh, start=1):
            stripline = line.strip()
            if stripline.startswith(block_label):
                # this is a comment, then a new block will follow,
                # unless this is a multi-line comment.
                if read_comment:  # part of multi-line comment...
                    new_block['comment'].append(stripline)
                else:
                    if yield_block:
                        # Yield the current block
                        yield_block = False
                        yield new_block
                    new_block = {'comment': [stripline], 'data': []}
                    yield_block = True  # Data has been added
                    ncol = -1  # The column count is reset per block.
                    read_comment = True
            else:
                read_comment = False
                data, _yieldb, _ncol = _read_line_data(ncol, stripline,
                                                       line_parser)
                if data:
                    new_block['data'].append(data)
                    ncol = _ncol
                    yield_block = _yieldb
                else:
                    logger.warning('Skipped malformed data in "%s", line: %i',
                                   filename, lineno)
    # if the block has not been yielded, yield it
    if yield_block:
        yield_block = False
        yield new_block