Source code for nbodykit.io.csv

import numpy
import os
from pandas import read_csv
from six import string_types

from .base import FileType
from . import tools

class CSVPartition(object):
    """
    A simple class to convert byte strings of data from a CSV file
    to a pandas DataFrame on demand

    The DataFrame is cached as :attr:`value`, so only a single call to
    :func:`pandas.read_csv` is used
    """
    def __init__(self, filename, offset, blocksize, delimiter, **config):
        """
        Parameters
        ----------
        filename : str
            the file to read data from
        offset : int
            the offset in bytes to start reading at
        blocksize : int
            the size of the bytes block to read
        delimiter : byte str
            how to distinguish separate lines
        **config
            the configuration keywords passed to :func:`pandas.read_csv`
        """
        self.filename = filename
        self.offset = offset
        self.blocksize = blocksize
        self.delimiter = delimiter
        self.config = config
    @property
    def value(self):
        """
        Return the parsed byte string as a DataFrame
        """
        try:
            return self._value
        except AttributeError:
            from io import BytesIO
            try:
                from dask.bytes.core import read_block
            except ImportError:
                from dask.bytes.utils import read_block

            # read the relevant bytes
            with open(self.filename, 'rb') as f:
                block = read_block(f, self.offset, self.blocksize, self.delimiter)

            # parse the byte string
            b = BytesIO()
            b.write(block); b.seek(0)
            self._value = read_csv(b, **self.config)

        return self._value
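
# Usage sketch (illustrative, not part of the module): build a CSVPartition by
# hand for the first ~1 MB of a hypothetical whitespace-separated file
# 'catalog.txt' with three columns. The bytes are only read and parsed when
# ``value`` is first accessed, and the resulting DataFrame is then cached.
#
#     part = CSVPartition('catalog.txt', 0, 1024*1024, b'\n',
#                         names=['ra', 'dec', 'z'], delim_whitespace=True,
#                         header=None, index_col=False)
#     df = part.value           # first access parses the block via pandas.read_csv
#     assert df is part.value   # later accesses reuse the cached DataFrame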

def make_partitions(filename, blocksize, config, delimiter="\n"):
    """
    Partition a CSV file into blocks, using the preferred blocksize
    in bytes, returning the partitions and the number of rows in
    each partition

    This divides the input file into partitions with size roughly
    equal to blocksize, reads the bytes, and counts the number of
    delimiters to compute the size of each block

    Parameters
    ----------
    filename : str
        the name of the CSV file to load
    blocksize : int
        the desired number of bytes per block
    delimiter : str, optional
        the character separating lines; default is the newline character
    config : dict
        any keyword options to pass to :func:`pandas.read_csv`

    Returns
    -------
    partitions : list of CSVPartition
        list of objects storing the data content of each file partition,
        stored as a bytestring
    sizes : list of int
        the list of the number of rows in each partition
    """
    try:
        from dask.bytes.core import read_block
    except ImportError:
        from dask.bytes.utils import read_block

    config = config.copy()

    # search for lines separated by this character
    delimiter = delimiter.encode()

    # size in bytes and byte offsets of each partition
    size = os.path.getsize(filename)
    offsets = list(range(0, size, int(blocksize)))

    # skip blank lines
    skip_blank_lines = config.get('skip_blank_lines', True)

    # number of rows to read
    nrows = config.pop('nrows', None)

    sizes = []; partitions = []
    with open(filename, 'rb') as f:
        for i, offset in enumerate(offsets):

            # skiprows is only valid for the first block
            if i > 0 and 'skiprows' in config:
                config.pop('skiprows')

            # set nrows for this block
            config['nrows'] = nrows

            block = read_block(f, offset, blocksize, delimiter)
            partitions.append(CSVPartition(filename, offset, blocksize, delimiter, **config))

            # count the delimiters to get the size
            size = block.count(delimiter)

            # account for blank lines
            if skip_blank_lines:
                size -= block.count(delimiter+delimiter)
                if i == 0 and block.startswith(delimiter):
                    size -= 1

            # account for skiprows
            skiprows = config.get('skiprows', 0)
            size -= skiprows

            # account for nrows
            if nrows is not None and nrows > 0:
                if nrows < size:
                    sizes.append(nrows)
                    break
                else:
                    nrows -= size # update for next block

            # manually increase size if at end of the file and no newline
            if i == len(offsets)-1 and not block.endswith(delimiter):
                size += 1
            sizes.append(size)

    return partitions, sizes
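
# Usage sketch (illustrative, not part of the module): partition a hypothetical
# space-separated file 'catalog.txt' into roughly 4 MB blocks. ``config``
# mirrors the keywords that CSVFile forwards to pandas.read_csv, and ``sizes``
# gives the number of rows in each block.
#
#     config = dict(names=['ra', 'dec', 'z'], delim_whitespace=True,
#                   header=None, index_col=False)
#     partitions, sizes = make_partitions('catalog.txt', 4*1024*1024, config)
#     total_rows = sum(sizes)
#     first_block = partitions[0].value   # lazily-parsed pandas DataFrame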

def verify_data(path, names, nrows=10, **config):
    """
    Verify the data by reading the first few lines of the specified
    CSV file to determine the data type

    Parameters
    ----------
    path : str
        the name of the CSV file to load
    names : list of str
        the list of the names of the columns in the CSV file
    nrows : int, optional
        the number of rows to read from the file in order to infer
        the data type; default is 10
    **config : key, value pairs
        additional keywords to pass to :func:`pandas.read_csv`

    Returns
    -------
    dtype : dict
        dictionary holding the dtype for each name in `names`
    """
    # read the first few lines to get the dtype
    try:
        # first check no columns are missing to avoid silent data loss:
        # https://github.com/pandas-dev/pandas/issues/26218
        config_without_usecols = {}
        config_without_usecols.update(config)
        config_without_usecols.pop('usecols', None)
        df = read_csv(path, nrows=nrows, **config_without_usecols)
        if len(df.columns) != len(names):
            raise ValueError("Number of columns does not match, expecting len(names) == %d" % len(df.columns))

        df = read_csv(path, nrows=nrows, names=names, **config)
        if df.isnull().sum().any():
            raise ValueError("'NaN' entries found when reading first %d rows; likely configuration error" % nrows)
        if any(dt == 'O' for dt in df.dtypes):
            raise ValueError("'object' data types found when reading first %d rows; likely configuration error" % nrows)
    except:
        import traceback
        config['names'] = names
        msg = ("error trying to read data with pandas.read_csv; ensure that 'names' matches "
               "the number of columns in the file and the file contains no comments\n")
        msg += "pandas configuration: %s\n" % str(config)
        msg += "\n%s" % (traceback.format_exc())
        raise ValueError(msg)

    toret = {}
    for name in df.columns:
        toret[name] = df[name].dtype
    return toret
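
# Usage sketch (illustrative, not part of the module): infer the column dtypes
# of a hypothetical file 'catalog.txt' from its first 10 rows, using the same
# pandas keywords that CSVFile would pass through. A ValueError is raised if
# NaN or 'object' entries suggest a misconfigured read.
#
#     dtypes = verify_data('catalog.txt', ['ra', 'dec', 'z'],
#                          delim_whitespace=True, header=None, index_col=False)
#     # e.g. {'ra': dtype('float64'), 'dec': dtype('float64'), 'z': dtype('float64')}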

class CSVFile(FileType):
    """
    A file object to handle the reading of columns of data from
    a CSV file.

    Internally, this class partitions the CSV file into chunks, and
    data is only read from the relevant chunks of the file, using
    :func:`pandas.read_csv`.

    This setup provides a significant speed-up when reading from the
    end of the file, since the entirety of the data does not need
    to be read first.

    The class supports any of the configuration keywords that can be
    passed to :func:`pandas.read_csv`

    .. warning::
        This assumes the delimiter for separate lines is the newline
        character and that all columns in the file represent data
        columns (no "index" column when using ``pandas``)

    Parameters
    ----------
    path : str
        the name of the file to load
    names : list of str
        the names of the columns of the csv file; this should give
        names of all the columns in the file -- pass ``usecols``
        to select a subset of columns
    blocksize : int, optional
        the file will be partitioned into blocks of bytes roughly
        of this size
    dtype : dict, str, optional
        if specified as a string, assume all columns have this dtype;
        otherwise, each column can have a dtype entry in the dict;
        if not specified, the data types will be inferred from the file
    usecols : list, optional
        a ``pandas.read_csv`` keyword; a subset of ``names`` to store,
        ignoring all other columns
    delim_whitespace : bool, optional
        a ``pandas.read_csv`` keyword; if the CSV file is space-separated,
        set this to ``True``
    **config :
        additional keyword arguments that will be passed to
        :func:`pandas.read_csv`; see the documentation of that function
        for a full list of possible options
    """
    def __init__(self, path, names, blocksize=32*1024*1024, dtype={},
                 usecols=None, delim_whitespace=True, **config):

        self.path = path
        self.dataset = "*"
        self.names = names if usecols is None else usecols
        self.blocksize = blocksize

        # ensure that no index column is passed
        if 'index_col' in config and config['index_col']:
            raise ValueError("'index_col' not equal to False is not supported in CSVFile")
        config['index_col'] = False # no index columns in file

        # manually remove comments
        if 'comment' in config and config['comment'] is not None:
            raise ValueError("please manually remove all comments from file")
        config['comment'] = None # no comments

        # ensure that no header is passed
        if 'header' in config and config['header'] is not None:
            raise ValueError("'header' not equal to None is not supported in CSVFile")
        config['header'] = None # no header

        if isinstance(config.get('skiprows', None), list):
            raise ValueError("only integer values supported for 'skiprows' in CSVFile")

        if 'skipfooter' in config:
            raise ValueError("'skipfooter' not supported in CSVFile")

        # set the read_csv defaults
        if 'sep' in config or 'delimiter' in config:
            delim_whitespace = False
        config['delim_whitespace'] = delim_whitespace
        config['usecols'] = usecols
        config.setdefault('engine', 'c')
        config.setdefault('skip_blank_lines', True)
        self.pandas_config = config.copy()

        # verify the data
        inferred_dtype = verify_data(self.path, names, **self.pandas_config)

        # dtype can also be a string --> apply to all columns
        if isinstance(dtype, string_types):
            dtype = {col:dtype for col in self.names}

        # store the dtype as a list
        dtype_ = []
        for col in self.names:
            if col in dtype:
                dt = dtype[col]
                if not isinstance(dt, numpy.dtype):
                    dt = numpy.dtype(dt)
            elif col in inferred_dtype:
                dt = inferred_dtype[col]
            else:
                raise ValueError("data type for column '%s' cannot be inferred from file" % col)
            dtype_.append((col, dt))
        dtype = numpy.dtype(dtype_)

        # add the dtype and names to the pandas config
        if config['engine'] == 'c':
            self.pandas_config['dtype'] = {col:dtype[col] for col in self.names}
        self.pandas_config['names'] = names

        # make the partitions
        self.partitions, self._sizes = make_partitions(path, blocksize, self.pandas_config)
        size = numpy.sum(self._sizes, dtype='intp')
        FileType.__init__(self, dtype=dtype, size=int(size))

    def read(self, columns, start, stop, step=1):
        """
        Read the specified column(s) over the given range

        'start' and 'stop' should be between 0 and :attr:`size`,
        which is the total size of the file (in particles)

        Parameters
        ----------
        columns : str, list of str
            the name of the column(s) to return
        start : int
            the row integer to start reading at
        stop : int
            the row integer to stop reading at
        step : int, optional
            the step size to use when reading; default is 1

        Returns
        -------
        numpy.array
            structured array holding the requested columns over
            the specified range of rows
        """
        toret = []
        for fnum in tools.get_file_slice(self._sizes, start, stop):

            # the local slice
            sl = tools.global_to_local_slice(self._sizes, start, stop, fnum)

            # access the dataframe of this partition
            data = self.partitions[fnum].value

            # slice and convert to a structured array
            data = data[sl[0]:sl[1]]
            data = data[columns]
            toret.append(data.to_records(index=False))

        return numpy.concatenate(toret, axis=0)[::step]
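
# Usage sketch (illustrative, not part of the module): open a hypothetical
# space-separated file 'catalog.txt' whose columns are 'ra', 'dec' and 'z',
# then read a slice of rows. Only the partitions overlapping rows 1000-2000
# are parsed, which is what makes reads near the end of a large file cheap.
#
#     f = CSVFile('catalog.txt', names=['ra', 'dec', 'z'], blocksize=10*1024*1024)
#     data = f.read(['ra', 'dec'], 1000, 2000)   # structured numpy array
#     print(f.size, data.dtype.names)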