from six import string_types
import numpy
import logging
from abc import abstractmethod
from nbodykit import _global_options
class FileType(object):
    """
    An abstract base class representing a file object.

    Users should subclass this class and implement the :func:`read`
    function, responsible for reading data from the specific file type.

    Parameters
    ----------
    dtype : anything accepted by :class:`numpy.dtype`
        the data type of the file; it is converted with ``numpy.dtype(dtype)``
    size : int
        the number of rows in the file
    """
    logger = logging.getLogger("FileType")

    def __init__(self, dtype, size):
        self.dtype = numpy.dtype(dtype)
        self.size = size

    @abstractmethod
    def read(self, columns, start, stop, step=1):
        """
        Read the specified column(s) over the given range,
        returning a structured numpy array

        The base implementation does nothing and returns ``None``;
        subclasses are expected to override it.

        Parameters
        ----------
        columns : str, list of str
            the name of the column(s) to return
        start : int
            the row integer to start reading at
        stop : int
            the row integer to stop reading at
        step : int, optional
            the step size to use when reading; default is 1

        Returns
        -------
        data : array_like
            a numpy structured array holding the requested data
        """
        pass

    @property
    def columns(self):
        """
        A list of the names of the columns in the file.

        This defaults to the named fields in the file's :attr:`dtype`
        attribute, but may differ from this if a view of the file has been
        returned with :func:`asarray`. If no column names were set explicitly
        and the dtype has no named fields, an empty list is returned.
        """
        try:
            return self._columns
        except AttributeError:
            # ``dtype.names`` is None for a dtype without fields
            names = self.dtype.names
            return list(names) if names is not None else []

    @columns.setter
    def columns(self, val):
        self._columns = val

    @property
    def ncol(self):
        """
        The number of data columns in the file.
        """
        return len(self.columns)

    @property
    def shape(self):
        """
        The shape of the file, which defaults to ``(size, )``

        Multiple dimensions can be introduced into the shape if
        a view of the file has been returned with :func:`asarray`
        """
        try:
            return self._shape
        except AttributeError:
            return (self.size,)

    @shape.setter
    def shape(self, val):
        self._shape = val

    @property
    def ndim(self):
        """
        The number of dimensions, i.e., the length of :attr:`shape`
        """
        return len(self.shape)

    @property
    def size(self):
        """
        The size of the file, i.e., number of rows

        Raises
        ------
        AttributeError
            if the size has not been set yet
        """
        try:
            return self._size
        except AttributeError:
            name = self.__class__.__name__
            raise AttributeError("please set the ``size`` attribute when initializing the '%s' class" %name)

    @size.setter
    def size(self, val):
        self._size = val

    @property
    def dtype(self):
        """
        A :class:`numpy.dtype` object holding the data types of each column in the file.

        Raises
        ------
        AttributeError
            if the dtype has not been set yet
        """
        try:
            return self._dtype
        except AttributeError:
            name = self.__class__.__name__
            raise AttributeError("please set the ``dtype`` attribute when initializing the '%s' class" %name)

    @dtype.setter
    def dtype(self, val):
        self._dtype = val

    def _memory_owner(self):
        """
        Return the object whose :func:`read` actually provides the data:
        the ``base`` attribute if it is set, otherwise this object itself.
        """
        base = getattr(self, 'base', None)
        return self if base is None else base

    def __len__(self):
        return self.size

    def __iter__(self):
        return iter(self.keys())

    def __repr__(self):
        # views created in __getitem__ / asarray() only carry ``dtype``,
        # ``size`` and ``base``, so fall back to the base object (and then
        # to a default) instead of assuming ``path`` / ``dataset`` exist
        owners = (self, getattr(self, 'base', None))

        def lookup(name, default):
            for owner in owners:
                if owner is not None and hasattr(owner, name):
                    return getattr(owner, name)
            return default

        args = (self.__class__.__name__, lookup('path', None),
                lookup('dataset', "None"), self.ncol, self.shape)
        return "%s(path=%s, dataset=%s, ncolumns=%d, shape=%s)" % args

    def __contains__(self, col):
        return col in self.columns

    def keys(self):
        """
        Aliased function to return :attr:`columns`
        """
        return list(self.columns)

    def __getitem__(self, s):
        """
        This function provides numpy-like array indexing of the file object.

        It supports:

        #. integer, slice-indexing similar to arrays
        #. string indexing using column names in :func:`keys`
        #. array-like indexing using integer lists or boolean arrays

        .. note::

            If a single column is being returned, a numpy array
            holding the data is returned, rather than a structured
            array with only a single field.

        Raises
        ------
        IndexError
            if the index is empty, names unknown columns, has too many
            dimensions, or is of an unsupported type
        ValueError
            if the data cannot be viewed as a single numpy array
        """
        # dont call asarray unless we have a single string index
        asarray = False
        if isinstance(s, string_types):
            s = [s]
            asarray = True

        # if index is a list, it should contain a series of column names
        # this will return a "view" of the file, slicing the data type
        # to include only the requested columns
        if isinstance(s, list) and all(isinstance(k, string_types) for k in s):

            # empty slice
            if not len(s):
                raise IndexError("no columns selected in slice")

            # crash if the dtype has no fields
            if self.dtype.names is None:
                raise IndexError(("cannot access view of specific columns after `asarray()` "
                                  "has been called; use integer array indexing instead"))

            # all strings must be valid column names
            valid = self.keys()
            invalid = [col for col in s if col not in valid]
            if invalid:
                raise IndexError("invalid string keys: %s; run keys() for valid options" %str(invalid))

            # create a new object, with slice of dtype
            # FIXME: the new object shall be a different type.
            obj = object.__new__(self.__class__)
            obj.dtype = numpy.dtype([(col, self.dtype[col]) for col in s])
            obj.size = self.size

            # set the owner of the underlying memory
            obj.base = self._memory_owner()

            # return the single numpy array if only a
            # single column was asked for
            if len(s) == 1 and asarray:
                obj = obj.asarray()
            return obj

        # tuple for indices in multiple dimensions
        # this can either be of length 1 or 2
        second_axis_index = None
        if isinstance(s, tuple):

            # verify the tuple shape
            if len(s) > len(self.shape):
                args = len(self.shape), len(s)
                raise IndexError("file dimension is %d, but you supplied tuple of length %d" %args)

            if len(s) == 1:
                s = s[0]
            elif len(s) == 2:
                s, second_axis_index = s
            else:
                raise IndexError("tuple index '%s' not understood" %str(s))

        # call the read function over the desired row range
        # if we don't own memory, return from 'base' attribute
        memown = self._memory_owner()

        # a list here means we are dealing with array-like indexing
        if isinstance(s, list):
            s = numpy.array(s)

        # do array-like indexing
        if isinstance(s, numpy.ndarray):

            # make all integers indexing positive; build a new array so
            # that the index passed in by the caller is left untouched
            if numpy.issubdtype(s.dtype, numpy.integer):
                s = numpy.where(s < 0, s + len(self), s)

            # read the full desired slice in consecutive chunks
            chunks = [] if s.size == 0 else list(find_slice_chunks(s))
            if chunks:
                toret = numpy.concatenate([memown.read(self.keys(), *sl) for sl in chunks])
            else:
                # nothing selected: return an empty read (same call that
                # the slice ``[0:0]`` produces) instead of crashing
                toret = memown.read(self.keys(), 0, 0, 1)

        # slice contiguous chunk via (start, stop, step)
        else:

            # input is integer (python or numpy scalar)
            if isinstance(s, (int, numpy.integer)):
                if s < 0:
                    s += self.size
                start, stop, step = s, s+1, 1

            # input is a slice
            elif isinstance(s, slice):
                start, stop, step = s.indices(self.size)
            else:
                raise IndexError("index '%s' not understood - should be an integer or slice" %str(s))

            # call the read function over the desired row range
            toret = memown.read(self.keys(), start, stop, step)

        # if file has no named fields, then
        # try to view the output as a single numpy array
        if len(self.dtype) == 0:
            try:
                # FIXME: shall we ensure read always returns C-contiguous?
                toret = numpy.ascontiguousarray(toret).view(self.dtype)
                if len(self.shape) > 1:
                    toret = toret.reshape((-1, self.shape[1]))
            except Exception as e:
                raise ValueError("error trying to view slice as a single numpy array: %s" %str(e))

        # if we have an index for the second dimension
        # then slice the return value
        if second_axis_index is not None:
            toret = toret[:,second_axis_index]

        return toret

    def asarray(self):
        """
        Return a view of the file, where the fields of the
        structured array are stacked in columns of a single
        numpy array

        Examples
        --------
        Start with a file object with three named columns,
        ``ra``, ``dec``, and ``z``

        >>> ff.dtype
        dtype([('ra', '<f4'), ('dec', '<f4'), ('z', '<f4')])
        >>> ff.shape
        (1000,)
        >>> ff.columns
        ['ra', 'dec', 'z']
        >>> ff[:3]
        array([(235.63442993164062, 59.39099884033203, 0.6225500106811523),
               (140.36181640625, -1.162310004234314, 0.5026500225067139),
               (129.96627807617188, 45.970130920410156, 0.4990200102329254)],
              dtype=(numpy.record, [('ra', '<f4'), ('dec', '<f4'), ('z', '<f4')]))

        Select a subset of columns and switch the ordering
        and convert output to a single numpy array

        >>> x = ff[['dec', 'ra']].asarray()
        >>> x.dtype
        dtype('float32')
        >>> x.shape
        (1000, 2)
        >>> x.columns
        ['dec', 'ra']
        >>> x[:3]
        array([[  59.39099884,  235.63442993],
               [  -1.16231   ,  140.36181641],
               [  45.97013092,  129.96627808]], dtype=float32)

        Now, select only the first column (``dec``)

        >>> dec = x[:,0]
        >>> dec[:3]
        array([ 59.39099884,  -1.16231   ,  45.97013092], dtype=float32)

        Returns
        -------
        FileType :
            a file object that will return a numpy array with
            the columns representing the fields

        Raises
        ------
        ValueError
            if the dtype has no named fields, holds more than one field
            with any vector field among them, or mixes different base types
        """
        # no named fields --> crash
        if not len(self.dtype):
            raise ValueError("no named dtype fields to convert to numpy array")

        # multiple vector dtypes --> crash
        if len(self.dtype) > 1 and any(len(self.dtype[col].shape) for col in self.dtype.names):
            raise ValueError("cannot convert multiple vector data types to numpy array")

        # different dtypes --> crash
        if any(self.dtype[col].base != self.dtype[0].base for col in self.dtype.names):
            raise ValueError("cannot convert data types of different types to single numpy array")

        # create the new object
        # FIXME: the new object shall be a different type.
        obj = object.__new__(self.__class__)

        # the second axis of the shape
        if len(self.dtype) == 1:
            subshape = self.dtype[0].shape
        else:
            subshape = (len(self.dtype),)

        obj.dtype = self.dtype[0].base
        obj.columns = list(self.columns)
        obj.shape = (self.size, ) + subshape
        obj.size = self.size

        # set the owner of the underlying memory
        obj.base = self._memory_owner()
        return obj

    def get_dask(self, column, blocksize=None):
        """
        Return the specified column as a dask array, which
        delays the explicit reading of the data until
        :meth:`dask.compute` is called

        The dask array is chunked into blocks of size `blocksize`

        Parameters
        ----------
        column : str
            the name of the column to return
        blocksize : int, optional
            the size of the chunks in the dask array; if not given, the
            ``dask_chunk_size`` entry of the global options is used

        Returns
        -------
        :class:`dask.array.Array` :
            the dask array holding the column, which computes the
            necessary functions to read the data, but delays evaluating
            until the user specifies

        Raises
        ------
        ValueError
            if ``column`` is not one of the columns of the file
        """
        if blocksize is None:
            blocksize = _global_options['dask_chunk_size']

        if column not in self:
            raise ValueError("'%s' is not a valid column; run keys() for valid options" %column)

        # imported lazily, only when a dask array is requested
        import dask.array as da
        return da.from_array(self[column], chunks=blocksize)
def find_slice_chunks(index):
    """
    A generator to yield (start, stop, step) tuples
    which will correspond to the input selection index

    ``index`` can be either a boolean index, or a sequence of integers
    specifying the rows to include. Each yielded tuple covers one run of
    consecutive rows, so ``step`` is always 1. Integer indices are
    processed in the order given; they do not need to be sorted.

    Parameters
    ----------
    index : array_like
        either a boolean array, indicating which rows to select,
        or integers specifying which rows to include

    Yields
    ------
    (start, stop, step) : tuple of int
        the slice integers to read, corresponding to a valid part of the
        selection index
    """
    # accept lists, tuples and arrays alike
    index = numpy.asarray(index)

    # an empty selection has no chunks to read
    if index.size == 0:
        return

    # handle boolean index
    if index.dtype.kind == 'b':
        # pad with False on both sides so that every run of True values
        # has both a rising and a falling edge
        padded = numpy.concatenate(([False], index, [False]))
        edges = numpy.flatnonzero(padded[1:] != padded[:-1])

        # edges alternate between the start (inclusive) and the
        # stop (exclusive) of each run of selected rows
        for start, stop in zip(edges[::2], edges[1::2]):
            yield (int(start), int(stop), 1)

    # handle integer index
    else:
        # a new chunk begins wherever a value is not exactly
        # one larger than the value preceding it
        breaks = numpy.flatnonzero(numpy.diff(index) != 1) + 1
        starts = numpy.concatenate(([0], breaks))
        stops = numpy.concatenate((breaks, [index.size]))

        for lo, hi in zip(starts, stops):
            yield (int(index[lo]), int(index[hi - 1]) + 1, 1)