Source code for nbodykit.base.catalog

from nbodykit.transform import ConstantArray
from nbodykit import _global_options, CurrentMPIComm

from six import string_types, add_metaclass
import numpy
import logging
import warnings
import abc
import inspect
import dask.array as da

class ColumnAccessor(da.Array):
    """
    Provides access to a Column from a Catalog.

    This is a thin subclass of :class:`dask.array.Array` to provide a
    reference to the catalog object, an additional ``attrs`` attribute
    (for recording reproducible meta-data), and some pretty print support.

    Due to a particularity of :mod:`dask`, any transformation that is not
    explicitly in-place will return a :class:`dask.array.Array`, losing the
    pointer to the original catalog and the meta-data attrs.

    Parameters
    ----------
    catalog : CatalogSource
        the catalog from which the column was accessed
    daskarray : dask.array.Array
        the column in dask array form
    is_default : bool, optional
        whether this column is a default column; default columns are not
        serialized to disk, as they are automatically available as columns
    """
    def __new__(cls, catalog, daskarray, is_default=False):
        self = da.Array.__new__(ColumnAccessor,
                                dask=daskarray.dask,
                                name=daskarray.name,
                                chunks=daskarray.chunks,
                                dtype=daskarray.dtype,
                                shape=daskarray.shape)
        self.catalog = catalog
        self.is_default = is_default
        self.attrs = {}
        return self

    def __getitem__(self, key):
        # compute dask index b/c they are not fully supported
        if isinstance(key, da.Array):
            key = self.catalog.compute(key)

        # base class behavior
        d = da.Array.__getitem__(self, key)

        # return a ColumnAccessor (okay b/c __setitem__ checks for circular references)
        toret = ColumnAccessor(self.catalog, d)
        toret.attrs.update(self.attrs)
        return toret

    def as_daskarray(self):
        return da.Array(
                self.dask,
                self.name,
                self.chunks,
                self.dtype,
                self.shape)

    @staticmethod
    def __dask_optimize__(dsk, keys, **kwargs):
        """
        Optimize the dask object.

        .. note::
            The dask default optimizer induces too many (unnecessary)
            IO calls. We turn this feature off by default, and only
            apply a culling.
        """
        from dask.optimization import cull
        dsk2, dependencies = cull(dsk, keys)
        return dsk2

    def compute(self):
        return self.catalog.compute(self)

    def __str__(self):
        r = da.Array.__str__(self)
        if len(self) > 0:
            r = r + " first: %s" % str(self[0].compute())
        if len(self) > 1:
            r = r + " last: %s" % str(self[-1].compute())
        return r

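# A minimal usage sketch (not part of the library code): accessing a column of
# any CatalogSource returns a ColumnAccessor, which behaves like a dask array
# but keeps a reference to its parent catalog and its ``attrs``. The catalog
# used below is only an illustrative choice; any CatalogSource would behave
# the same way.
#
#   >>> from nbodykit.lab import UniformCatalog
#   >>> cat = UniformCatalog(nbar=10, BoxSize=1.0, seed=42)
#   >>> pos = cat['Position']      # a ColumnAccessor (subclass of dask.array.Array)
#   >>> pos.catalog is cat         # the accessor remembers its catalog
#   True
#   >>> pos.compute()              # evaluate to a numpy array via the catalog
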

def column(name=None, is_default=False):
    """
    Decorator that defines the decorated function as a column in a
    CatalogSource.

    This can be used as a decorator with or without arguments. If no ``name``
    is specified, the function name is used.

    Parameters
    ----------
    name : str, optional
        the name of the column; if not provided, the name of the function
        being decorated is used
    is_default : bool, optional
        whether the column is a default column; default columns are not
        serialized to disk
    """
    def decorator(getter, name=name):
        getter.column_name = getter.__name__ if name is None else name
        getter.is_default = is_default
        return getter

    # handle the case when the decorator was called without arguments;
    # in that case "name" is actually the function we are decorating
    if hasattr(name, '__call__'):
        getter = name
        return decorator(getter, name=getter.__name__)
    else:
        return decorator

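# A hedged sketch of how the ``column`` decorator is typically used when
# defining a custom catalog; the class name, column name, and values here are
# hypothetical, not part of nbodykit.
#
#   >>> from nbodykit import CurrentMPIComm
#   >>> from nbodykit.transform import ConstantArray
#   >>> class MyCatalog(CatalogSource):
#   ...     def __init__(self, size, comm=None):
#   ...         comm = comm if comm is not None else CurrentMPIComm.get()
#   ...         self._size = size
#   ...         CatalogSource.__init__(self, comm)
#   ...
#   ...     @column
#   ...     def Mass(self):
#   ...         # a hard-coded column named "Mass"
#   ...         return ConstantArray(1e12, self.size)
#
# ``Mass`` is then registered by the ColumnFinder metaclass below and shows up
# in ``MyCatalog.columns`` like any other hard-coded column.
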

class ColumnFinder(abc.ABCMeta):
    """
    A meta-class that will register all columns of a class that have been
    marked with the :func:`column` decorator.

    This adds the following attributes to the class definition:

    1. ``_defaults`` : default columns, specified by passing
       ``is_default=True`` to the :func:`column` decorator.
    2. ``_hardcolumns`` : non-default, hard-coded columns

    .. note::
        This is a subclass of :class:`abc.ABCMeta` so subclasses can define
        abstract properties, if they need to.
    """
    def __init__(cls, clsname, bases, attrs):

        # attach the registry attributes
        cls._defaults = set()
        cls._hardcolumns = set()

        # loop over the class and its bases
        classes = inspect.getmro(cls)
        for c in reversed(classes):

            # loop over each attribute
            for name in c.__dict__:
                value = c.__dict__[name]

                # if it's a member function implementing a column,
                # record it and check whether it is a default
                if getattr(value, 'column_name', None):
                    if value.is_default:
                        cls._defaults.add(value.column_name)
                    else:
                        cls._hardcolumns.add(value.column_name)


@add_metaclass(ColumnFinder)
class CatalogSourceBase(object):
    """
    An abstract base class that implements most of the functionality in
    :class:`CatalogSource`.

    The main difference between this class and :class:`CatalogSource` is that
    this base class does not assume the object has a :attr:`size` attribute.

    .. note::
        See the docstring for :class:`CatalogSource`. Most often, users should
        implement custom sources as subclasses of :class:`CatalogSource`.

    The names of hard-coded columns, i.e., those defined through member
    functions of the class, are stored in the :attr:`_defaults` and
    :attr:`_hardcolumns` attributes. These attributes are computed by the
    :class:`ColumnFinder` meta-class.

    Parameters
    ----------
    comm :
        the MPI communicator to use for this object
    """
    logger = logging.getLogger('CatalogSourceBase')

    @staticmethod
    def make_column(array):
        """
        Utility function to convert an array-like object to a
        :class:`dask.array.Array`.

        .. note::
            The dask array chunk size is controlled via the
            ``dask_chunk_size`` global option. See :class:`~nbodykit.set_options`.

        Parameters
        ----------
        array : array_like
            an array-like object; can be a dask array, numpy array,
            ColumnAccessor, or other non-scalar array-like object

        Returns
        -------
        :class:`dask.array.Array` :
            a dask array initialized from ``array``
        """
        if isinstance(array, da.Array):
            return array
        elif isinstance(array, ColumnAccessor):
            # important to get the accessor as a dask array to avoid circular
            # references
            return array.as_daskarray()
        else:
            return da.from_array(array, chunks=_global_options['dask_chunk_size'])

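    # A hedged sketch of ``make_column`` in isolation: it passes dask arrays
    # through, unwraps ColumnAccessors, and wraps anything else with
    # ``da.from_array`` using the global ``dask_chunk_size``.
    #
    #   >>> import numpy
    #   >>> arr = numpy.arange(10)
    #   >>> col = CatalogSourceBase.make_column(arr)
    #   >>> isinstance(col, da.Array)
    #   True
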
    @staticmethod
    def create_instance(cls, comm):
        obj = object.__new__(cls)
        CatalogSourceBase.__init__(obj, comm)
        return obj

    def __init__(self, comm):

        # user-provided overrides and defaults for columns
        self._overrides = {}

        # stores memory owner
        self.base = None

        self.comm = comm

    def __finalize__(self, other):
        """
        Finalize the creation of a CatalogSource object by copying over any
        additional attributes from a second CatalogSource.

        The idea here is to only copy over attributes that are similar to
        meta-data, so we do not copy some of the core attributes of the
        :class:`CatalogSource` object.

        Parameters
        ----------
        other :
            the second object to copy over attributes from; it needs to be
            a subclass of CatalogSourceBase for attributes to be copied

        Returns
        -------
        CatalogSource :
            return ``self``, with the added attributes
        """
        if isinstance(other, CatalogSourceBase):
            d = other.__dict__.copy()
            nocopy = ['base', '_overrides', '_hardcolumns', '_defaults',
                      'comm', '_size', '_csize']
            for key in d:
                if key not in nocopy:
                    self.__dict__[key] = d[key]

        return self

    def __iter__(self):
        return iter(self.columns)

    def __contains__(self, col):
        return col in self.columns

    def _get_slice(self, index):
        """
        Select a subset of ``self`` according to a boolean index array.

        Returns a new object of the same type as ``self`` holding only the
        data that satisfies the slice index.

        Parameters
        ----------
        index : array_like
            either a dask or numpy boolean array; this determines which
            rows are included in the returned object
        """
        if index is Ellipsis:
            return self
        elif isinstance(index, slice):
            start, stop, step = index.indices(self.size)
            # from https://stackoverflow.com/a/36188683
            size = max(0, (stop - start + (step - (1 if step > 0 else -1))) // step)
        else:
            # compute the index slice if needed and get the size
            index = CatalogSourceBase.make_column(index)

            if index.dtype == numpy.dtype('?'):
                # verify the index is a boolean array
                if len(index) != self.size:
                    raise KeyError("slice index has length %d; should be %d" % (len(index), self.size))
                # new size is just the number of True entries
                size = index.sum().compute()
            else:
                if len(index) > 0 and index.dtype != numpy.integer:
                    raise KeyError("slice index must be a boolean or integer array; got %s" % (index.dtype))
                size = len(index)

        # initialize a subset Source of the right size
        subset_data = {col: self[col][index] for col in self if not self[col].is_default}

        if size <= 0.51 * self.size:
            # if the subsample ratio is substantial, then always make
            # a copy to decouple from the original data
            subset_data = {col: subset_data[col].map_blocks(numpy.copy) for col in subset_data}

        cls = self.__class__ if self.base is None else self.base.__class__
        toret = cls._from_columns(size, self.comm, **subset_data)

        # attach the needed attributes
        toret.__finalize__(self)

        return toret

    def __getitem__(self, sel):
        """
        The following types of indexing are supported:

        #.  strings specifying a column in the CatalogSource; returns a dask
            array holding the column data
        #.  boolean arrays specifying a slice of the CatalogSource; returns a
            CatalogSource holding only the relevant slice
        #.  slice object specifying which particles to select
        #.  list of strings specifying column names; returns a CatalogSource
            holding only the selected columns

        Notes
        -----
        - Slicing is a **collective** operation
        - If the :attr:`base` attribute is set, columns will be returned from
          :attr:`base` instead of from ``self``.
        """
        # handle boolean array slices
        if not isinstance(sel, string_types):

            # size must be implemented
            if getattr(self, 'size', NotImplemented) is NotImplemented:
                raise ValueError("cannot perform selection due to NotImplemented size")

            # convert slices to boolean arrays
            if isinstance(sel, (list, da.Array, numpy.ndarray)):

                # select a subset from a list of string column names
                if len(sel) > 0 and all(isinstance(ss, string_types) for ss in sel):
                    invalid = set(sel) - set(self.columns)
                    if len(invalid):
                        msg = "cannot select subset of columns from "
                        msg += "CatalogSource due to invalid columns: %s" % str(invalid)
                        raise KeyError(msg)

                    # return a CatalogSource only holding the selected columns
                    subset_data = {col: self[col] for col in sel}
                    toret = CatalogSource._from_columns(self.size, self.comm, **subset_data)
                    toret.attrs.update(self.attrs)
                    return toret

            return self._get_slice(sel)

        else:
            # owner of the memory (either self or base)
            if self.base is None:

                # get the right column
                is_default = False
                if sel in self._overrides:
                    r = self._overrides[sel]
                elif sel in self.hardcolumns:
                    r = self.get_hardcolumn(sel)
                elif sel in self._defaults:
                    r = getattr(self, sel)()
                    is_default = True
                else:
                    raise KeyError("column `%s` is not defined in this source; " % sel +
                                   "try adding column via `source[column] = data`")

                # return a ColumnAccessor for pretty prints
                return ColumnAccessor(self, r, is_default=is_default)

            else:
                # chain to the memory owner;
                # this will not work if there are overrides
                return self.base.__getitem__(sel)

    def __setitem__(self, col, value):
        """
        Add new columns to the CatalogSource, overriding any existing columns
        with the name ``col``.

        .. note::
            If the :attr:`base` attribute is set, columns will be added to
            :attr:`base` instead of to ``self``.
        """
        if self.base is not None:
            return self.base.__setitem__(col, value)

        self._overrides[col] = self.make_column(value)

    def __delitem__(self, col):
        """
        Delete a column; cannot delete a "hard-coded" column.

        .. note::
            If the :attr:`base` attribute is set, columns will be deleted from
            :attr:`base` instead of from ``self``.
        """
        if self.base is not None:
            return self.base.__delitem__(col)

        if col not in self.columns:
            raise ValueError("no such column, cannot delete it")

        if col in self.hardcolumns:
            raise ValueError("cannot delete a hard-coded column")

        if col in self._overrides:
            del self._overrides[col]
            return

        raise ValueError("unable to delete column '%s' from CatalogSource" % col)

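    # A brief usage sketch of the dict-like interface defined above; the
    # catalog ``cat`` and the column name are hypothetical:
    #
    #   >>> cat['Mass'] = numpy.ones(cat.size)     # add / override a column
    #   >>> 'Mass' in cat                          # __contains__
    #   True
    #   >>> subcat = cat[cat['Mass'] > 0.5]        # boolean-array slice (collective)
    #   >>> del cat['Mass']                        # only override columns can be deleted
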
    @property
    def attrs(self):
        """
        A dictionary storing relevant meta-data about the CatalogSource.
        """
        try:
            return self._attrs
        except AttributeError:
            self._attrs = {}
            return self._attrs

    @property
    def hardcolumns(self):
        """
        A list of the hard-coded columns in the CatalogSource.

        These columns are usually member functions marked by the ``@column``
        decorator. Subclasses may override this method and use
        :func:`get_hardcolumn` to bypass the decorator logic.

        .. note::
            If the :attr:`base` attribute is set, the value of
            ``base.hardcolumns`` will be returned.
        """
        if self.base is not None:
            return self.base.hardcolumns

        # return the non-default, hard-coded columns, as determined by the
        # ColumnFinder metaclass
        return sorted(self._hardcolumns)

    @property
    def columns(self):
        """
        All columns in the CatalogSource, including those hard-coded into
        the class's definition and override columns provided by the user.

        .. note::
            If the :attr:`base` attribute is set, the value of
            ``base.columns`` will be returned.
        """
        if self.base is not None:
            return self.base.columns

        overrides = list(self._overrides)
        defaults = list(self._defaults)
        return sorted(set(self.hardcolumns + overrides + defaults))

    def copy(self):
        """
        Return a shallow copy of the object, where each column is a reference
        to the corresponding column in ``self``.

        .. note::
            No copy of data is made.

        .. note::
            This is different from :func:`view` in that the attributes
            dictionary of the copy is no longer related to ``self``.

        Returns
        -------
        CatalogSource :
            a new CatalogSource that holds all of the data columns of ``self``
        """
        # a new empty object with the proper size
        toret = CatalogSourceBase.create_instance(self.__class__, comm=self.comm)
        toret._size = self.size
        toret._csize = self.csize

        # attach attributes from self and return
        toret = toret.__finalize__(self)

        # finally, add the data columns from self
        for col in self.columns:
            toret[col] = self[col]

        # copy the attributes too, so they become decoupled;
        # this is different from view.
        toret._attrs = self._attrs.copy()

        return toret

    def get_hardcolumn(self, col):
        """
        Construct and return a hard-coded column.

        These are usually produced by calling member functions marked by the
        ``@column`` decorator.

        Subclasses may override this method and the :attr:`hardcolumns`
        attribute to bypass the decorator logic.

        .. note::
            If the :attr:`base` attribute is set, ``get_hardcolumn()`` will be
            called using :attr:`base` instead of ``self``.
        """
        if self.base is not None:
            return self.base.get_hardcolumn(col)

        if col in self._hardcolumns:
            return getattr(self, col)()
        else:
            raise ValueError("no such hard-coded column %s" % col)

    def compute(self, *args, **kwargs):
        """
        Our version of :func:`dask.compute` that computes multiple delayed
        dask collections at once.

        This should be called on the return value of :func:`read` to convert
        any dask arrays to numpy arrays.

        .. note::
            If the :attr:`base` attribute is set, ``compute()`` will be
            called using :attr:`base` instead of ``self``.

        Parameters
        ----------
        args : object
            Any number of objects. If an object is a dask collection, it is
            computed and the result is returned. Otherwise it is passed
            through unchanged.
        """
        import dask

        # return the base compute if it exists
        if self.base is not None:
            return self.base.compute(*args, **kwargs)

        toret = dask.compute(*args, **kwargs)

        # do not return tuples of length one
        if len(toret) == 1:
            toret = toret[0]

        return toret

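    # A small sketch of ``compute``: it accepts any number of dask collections
    # and returns concrete numpy results, unwrapping length-one tuples. The
    # column names are only examples.
    #
    #   >>> pos, vel = cat.compute(cat['Position'], cat['Velocity'])   # two arrays
    #   >>> pos = cat.compute(cat['Position'])                         # a single array, not a tuple
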
    def save(self, output, columns=None, dataset=None, datasets=None,
             header='Header', compute=True):
        """
        Save the CatalogSource to a :class:`bigfile.BigFile`.

        Only the selected columns are saved and :attr:`attrs` are saved in
        ``header``. The attrs of columns are stored in the datasets.

        Parameters
        ----------
        output : str
            the name of the file to write to
        columns : list of str
            the names of the columns to save in the file, or None to use all
            columns
        dataset : str, optional
            dataset to store the columns under
        datasets : list of str, optional
            names for the data set where each column is stored; defaults to
            the name of the column (deprecated)
        header : str, optional, or None
            the name of the data set holding the header information, where
            :attr:`attrs` is stored; if header is None, do not save the header
        compute : boolean, default True
            if True, wait until the store operations finish;
            if False, return a dictionary of column names and future objects
            for the store; use dask.compute() to wait for the store operations
            on the result
        """
        import bigfile
        import json
        from nbodykit.utils import JSONEncoder

        # trim out any default columns; these do not need to be saved as
        # they are automatically available to every Catalog
        if columns is None:
            columns = self.columns
        columns = [col for col in columns if not self[col].is_default]

        # also make sure there are no default columns in datasets
        if datasets is not None:
            import warnings
            warnings.warn("the datasets argument is deprecated; specify a single dataset prefix for all columns instead")
        else:
            if dataset is not None:
                datasets = [dataset + '/' + col for col in columns]
            else:
                datasets = columns

        if len(datasets) != len(columns):
            raise ValueError("`datasets` must have the same length as `columns`")

        # FIXME: merge this logic into bigfile;
        # the slice writing support in bigfile 0.1.47 does not
        # support tuple indices.
        class _ColumnWrapper:

            def __init__(self, bb):
                self.bb = bb

            def __setitem__(self, sl, value):
                assert len(sl) <= 2  # no array shall be of higher dimension.
                # use the regions argument to pick the offset.
                start, stop, step = sl[0].indices(self.bb.size)
                assert step == 1
                if len(sl) > 1:
                    start1, stop1, step1 = sl[1].indices(value.shape[1])
                    assert step1 == 1
                    assert start1 == 0
                    assert stop1 == value.shape[1]
                self.bb.write(start, value)

        with bigfile.FileMPI(comm=self.comm, filename=output, create=True) as ff:

            sources = []
            targets = []
            regions = []

            # save meta data and create blocks, preparing for the write
            for column, dataset in zip(columns, datasets):
                array = self[column]

                # ensure data is only chunked in the first dimension
                size = self.comm.allreduce(len(array))
                offset = numpy.sum(self.comm.allgather(len(array))[:self.comm.rank], dtype='i8')

                # sane value -- 32 million items per physical file
                sizeperfile = 32 * 1024 * 1024

                Nfile = (size + sizeperfile - 1) // sizeperfile

                dtype = numpy.dtype((array.dtype, array.shape[1:]))

                # save column attrs too:
                # first create the block on disk
                with ff.create(dataset, dtype, size, Nfile) as bb:
                    if hasattr(array, 'attrs'):
                        for key in array.attrs:
                            bb.attrs[key] = array.attrs[key]

                # then open it for writing
                bb = ff.open(dataset)

                # ensure only the first dimension is chunked, because bigfile
                # only supports writing with slices in the first dimension
                rechunk = dict([(ind, -1) for ind in range(1, array.ndim)])
                array = array.rechunk(rechunk)

                targets.append(_ColumnWrapper(bb))
                sources.append(array)
                regions.append((slice(offset, offset + len(array)),))

            # write the header afterwards, such that the header can be a block
            # that saves data.
            if header is not None:
                try:
                    bb = ff.open(header)
                except:
                    bb = ff.create(header)
                with bb:
                    for key in self.attrs:
                        try:
                            bb.attrs[key] = self.attrs[key]
                        except ValueError:
                            try:
                                json_str = 'json://' + json.dumps(self.attrs[key], cls=JSONEncoder)
                                bb.attrs[key] = json_str
                            except:
                                raise ValueError("cannot save '%s' key in attrs dictionary" % key)

            # lock=False to avoid dask pickling the lock with the object
            if compute:
                # write the blocks one by one
                for column, source, target, region in zip(columns, sources, targets, regions):
                    if self.comm.rank == 0:
                        self.logger.info("started writing column %s" % column)
                    source.store(target, regions=region, lock=False, compute=True)
                    target.bb.close()
                    if self.comm.rank == 0:
                        self.logger.info("finished writing column %s" % column)
                future = None
            else:
                # return a future that writes all blocks at the same time;
                # note that we must pass in lists, not tuples, or da.store is confused,
                # c.f. https://github.com/dask/dask/issues/4393
                future = da.store(sources, targets, regions=regions, lock=False, compute=False)

        return future

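    # A hedged example of saving to and re-reading a bigfile catalog; the file
    # name, column names, and dataset prefix are arbitrary choices:
    #
    #   >>> cat.save('catalog.bigfile', columns=['Position', 'Velocity'], dataset='1.0')
    #   >>> from nbodykit.lab import BigFileCatalog
    #   >>> cat2 = BigFileCatalog('catalog.bigfile', dataset='1.0')
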
    def read(self, columns):
        """
        Return the requested columns as dask arrays.

        Parameters
        ----------
        columns : list of str
            the names of the requested columns

        Returns
        -------
        list of :class:`dask.array.Array` :
            the list of column data, in the form of dask arrays
        """
        missing = set(columns) - set(self.columns)
        if len(missing) > 0:
            msg = "source does not contain columns: %s; " % str(missing)
            msg += "try adding columns via `source[column] = data`"
            raise ValueError(msg)

        return [self[col] for col in columns]

    def view(self, type=None):
        """
        Return a "view" of the CatalogSource object, with the returned type
        set by ``type``.

        This initializes a new empty class of type ``type`` and attaches
        attributes to it via the :func:`__finalize__` mechanism.

        Parameters
        ----------
        type : Python type
            the desired class type of the returned object.
        """
        # an empty class
        type = self.__class__ if type is None else type
        obj = CatalogSourceBase.create_instance(type, comm=self.comm)

        # propagate the size attributes
        obj._size = self.size
        obj._csize = self.csize

        # the new object's base points to self
        obj.base = self

        # attach the necessary attributes from self
        return obj.__finalize__(self)

    def to_subvolumes(self, domain=None, position='Position', columns=None):
        """
        Domain decompose a catalog, sending items to the ranks according to
        the supplied domain object, using the ``position`` column as the
        position of each object.

        This will read in the full position array and all of the requested
        columns.

        Parameters
        ----------
        domain : :class:`pmesh.domain.GridND` object, or None
            The domain to distribute the catalog. If None, try to divide
            evenly in space. An easy way to find a domain object is to use
            ``pm.domain``, where ``pm`` is a :class:`pmesh.pm.ParticleMesh`
            object.
        position : string_like
            the column to use to compute the position.
        columns : list of string_like
            the columns to include in the new catalog; if not supplied, all
            columns will be exchanged.

        Returns
        -------
        CatalogSource
            A decomposed catalog source, where each rank only contains
            objects belonging to the rank as claimed by the domain object.
            ``self.attrs`` are carried over as a shallow copy to the returned
            object.
        """
        from nbodykit.source.catalog import SubVolumesCatalog
        return SubVolumesCatalog(self, domain=domain, position=position, columns=columns)

    def to_mesh(self, Nmesh=None, BoxSize=None, dtype='f4', interlaced=False,
                compensated=False, resampler='cic', weight='Weight',
                value='Value', selection='Selection', position='Position',
                window=None):
        """
        Convert the CatalogSource to a MeshSource, using the specified
        parameters.

        Parameters
        ----------
        Nmesh : int, optional
            the number of cells per side on the mesh; must be provided if not
            stored in :attr:`attrs`
        BoxSize : scalar, 3-vector, optional
            the size of the box; must be provided if not stored in :attr:`attrs`
        dtype : string, optional
            the data type of the mesh array
        interlaced : bool, optional
            use the interlacing technique of Sefusatti et al. 2015 to reduce
            the effects of aliasing on Fourier space quantities computed
            from the mesh
        compensated : bool, optional
            whether to correct for the resampler window introduced by the
            grid interpolation scheme
        resampler : str, optional
            the string specifying which resampler interpolation scheme to use;
            see ``pmesh.resampler.methods``
        weight : str, optional
            the name of the column specifying the weight for each particle
        value : str, optional
            the name of the column specifying the field value for each particle
        selection : str, optional
            the name of the column that specifies which (if any) slice of the
            CatalogSource to take
        position : str, optional
            the name of the column that specifies the position data of the
            objects in the catalog
        window : str, deprecated
            use ``resampler`` instead

        Returns
        -------
        mesh : CatalogMesh
            a mesh object that provides an interface for gridding particle
            data onto a specified mesh
        """
        from nbodykit.source.mesh import CatalogMesh
        from pmesh.window import methods

        if window is not None:
            resampler = window
            import warnings
            warnings.warn("The window argument is deprecated. Use `resampler=` instead",
                          DeprecationWarning, stacklevel=2)

        # make sure all of the columns exist
        for col in [weight, selection]:
            if col not in self:
                raise ValueError("column '%s' missing; cannot create mesh" % col)

        if resampler not in methods:
            raise ValueError("valid resampler: %s" % str(methods))

        if BoxSize is None:
            try:
                BoxSize = self.attrs['BoxSize']
            except KeyError:
                raise ValueError(("cannot convert particle source to a mesh; "
                                  "'BoxSize' keyword is not supplied and the CatalogSource "
                                  "does not define one in 'attrs'."))
        if Nmesh is None:
            try:
                Nmesh = self.attrs['Nmesh']
            except KeyError:
                raise ValueError(("cannot convert particle source to a mesh; "
                                  "'Nmesh' keyword is not supplied and the CatalogSource "
                                  "does not define one in 'attrs'."))

        return CatalogMesh(self, Nmesh=Nmesh,
                                 BoxSize=BoxSize,
                                 dtype=dtype,
                                 Weight=self[weight],
                                 Selection=self[selection],
                                 Value=self[value],
                                 Position=self[position],
                                 interlaced=interlaced,
                                 compensated=compensated,
                                 resampler=resampler)

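# A hedged sketch of the catalog-to-mesh workflow enabled by ``to_mesh``; the
# Nmesh and BoxSize values are arbitrary and ``cat`` is any CatalogSource:
#
#   >>> mesh = cat.to_mesh(Nmesh=64, BoxSize=1.0, resampler='cic', compensated=True)
#   >>> rfield = mesh.paint(mode='real')     # paint the catalog onto a RealField
#   >>> rfield.cmean()                       # collective mean of the painted field
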

class CatalogSource(CatalogSourceBase):
    """
    An abstract base class representing a catalog of discrete particles.

    This object behaves like a structured numpy array -- it must have a
    well-defined size when initialized. The ``size`` here represents the
    number of particles in the source on the local rank.

    The information about each particle is stored as a series of columns in
    the format of dask arrays. These columns can be accessed in a dict-like
    fashion.

    All subclasses of this class contain the following default columns:

    #. ``Weight``
    #. ``Value``
    #. ``Selection``

    For a full description of these default columns, see
    :ref:`the documentation <catalog-source-default-columns>`.

    .. important::
        Subclasses of this class must set the ``_size`` attribute.

    Parameters
    ----------
    comm :
        the MPI communicator to use for this object
    """
    logger = logging.getLogger('CatalogSource')

    @classmethod
    def _from_columns(cls, size, comm, **columns):
        """
        An internal constructor to create a CatalogSource (or subclass) from
        a set of columns.

        This method is used internally by nbodykit to create views of
        catalogs based on existing catalogs.

        Use :class:`~nbodykit.source.catalog.array.ArrayCatalog` to adapt a
        structured array or dictionary to a CatalogSource.

        .. note::
            The returned object is of type ``cls`` and the only attributes
            set for the returned object are :attr:`size` and :attr:`csize`.
        """
        # the new empty object to return
        obj = CatalogSourceBase.create_instance(cls, comm=comm)

        # compute the sizes
        obj._size = size
        obj._csize = obj.comm.allreduce(obj._size)

        # add the columns in
        for name in columns:
            obj[name] = columns[name]

        return obj

    def __init__(self, comm):

        CatalogSourceBase.__init__(self, comm)

        # if size is implemented, compute the csize
        if self.size is not NotImplemented:
            self._csize = self.comm.allreduce(self.size)
        else:
            self._csize = NotImplemented

    def __repr__(self):
        size = "%d" % self.size if self.size is not NotImplemented else "NotImplemented"
        return "%s(size=%s)" % (self.__class__.__name__, size)

    def __len__(self):
        """
        The local size of the CatalogSource on a given rank.
        """
        return self.size

    def __setitem__(self, col, value):
        """
        Add columns to the CatalogSource, overriding any existing columns
        with the name ``col``.
        """
        # handle scalar values
        if numpy.isscalar(value):
            assert self.size is not NotImplemented, "size is not implemented! cannot set scalar array"
            value = ConstantArray(value, self.size, chunks=_global_options['dask_chunk_size'])

        # check for the correct size, if we know the size
        if self.size is not NotImplemented:
            args = (col, self.size, len(value))
            msg = "error setting '%s' column, data must be array of size %d, not %d" % args
            assert len(value) == self.size, msg

        # call the base __setitem__
        CatalogSourceBase.__setitem__(self, col, value)

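    # A short sketch of the scalar broadcasting above: assigning a scalar
    # creates a ConstantArray of the local size. The column name is
    # hypothetical.
    #
    #   >>> cat['Flag'] = 1.0                 # broadcast to a column of length cat.size
    #   >>> cat.compute(cat['Flag'].sum())    # equals cat.size (as a float)
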
    @property
    def size(self):
        """
        The number of objects in the CatalogSource on the local rank.

        If the :attr:`base` attribute is set, the ``base.size`` attribute
        will be returned.

        .. important::
            This property must be defined for all subclasses.
        """
        if self.base is not None:
            return self.base.size

        if not hasattr(self, '_size'):
            return NotImplemented
        return self._size

    @size.setter
    def size(self, value):
        raise RuntimeError(("Property size is read-only. Internally, ``_size`` "
                            "can be set during initialization."))

    @property
    def csize(self):
        """
        The total, collective size of the CatalogSource, i.e., summed across
        all ranks.

        It is the sum of :attr:`size` across all available ranks.

        If the :attr:`base` attribute is set, the ``base.csize`` attribute
        will be returned.
        """
        if self.base is not None:
            return self.base.csize

        if not hasattr(self, '_csize'):
            return NotImplemented
        return self._csize

    def gslice(self, start, stop, end=1, redistribute=True):
        """
        Execute a global slice of a CatalogSource.

        .. note::
            After the global slice is performed, the data is scattered
            evenly across all ranks.

        .. note::
            The current algorithm generates an index on the root rank and
            does not scale well.

        Parameters
        ----------
        start : int
            the start index of the global slice
        stop : int
            the stop index of the global slice
        end : int, optional
            the step size of the global slice
        redistribute : bool, optional
            if ``True``, evenly re-distribute the sliced data across all
            ranks, otherwise just return any local data that is part of the
            global slice
        """
        from nbodykit.utils import ScatterArray, GatherArray

        # determine the boolean index corresponding to the slice
        if self.comm.rank == 0:
            index = numpy.zeros(self.csize, dtype=bool)
            index[slice(start, stop, end)] = True
        else:
            index = None
        index = self.comm.bcast(index)

        # scatter the index back to all ranks
        counts = self.comm.allgather(self.size)
        index = ScatterArray(index, self.comm, root=0, counts=counts)

        # perform the needed local slice
        subset = self[index]

        # if we don't want to redistribute evenly, just return the slice
        if not redistribute:
            return subset

        # re-distribute each column from the sliced data
        # NOTE: currently Gather/ScatterArray requires numpy arrays, but
        # in principle we could pass dask arrays around between ranks and
        # avoid compute() calls
        data = self.compute(*[subset[col] for col in subset])

        # gather/scatter each column
        evendata = {}
        for i, col in enumerate(subset):
            alldata = GatherArray(data[i], self.comm, root=0)
            evendata[col] = ScatterArray(alldata, self.comm, root=0)

        # return a new CatalogSource holding the evenly distributed data
        size = len(evendata[col])
        toret = self.__class__._from_columns(size, self.comm, **evendata)
        return toret.__finalize__(self)

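    # A hedged usage sketch of ``gslice`` (the indices are arbitrary and we
    # assume the catalog holds at least 10 rows globally):
    #
    #   >>> first10 = cat.gslice(0, 10)                       # global rows 0-9, redistributed
    #   >>> local = cat.gslice(0, 10, redistribute=False)     # keep only the local part of the slice
    #   >>> first10.csize
    #   10
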
    def persist(self, columns=None):
        """
        Return a CatalogSource, where the selected columns are computed and
        persisted in memory.
        """
        import dask.array as da

        if columns is None:
            columns = self.columns

        r = {}
        for key in columns:
            r[key] = self[key]

        r = da.compute(r)[0]  # particularity of dask

        from nbodykit.source.catalog.array import ArrayCatalog
        c = ArrayCatalog(r, comm=self.comm)
        c.attrs.update(self.attrs)

        return c

    def sort(self, keys, reverse=False, usecols=None):
        """
        Return a CatalogSource, sorted globally across all MPI ranks in
        ascending order by the input keys.

        Sort columns must be floating or integer type.

        .. note::
            After the sort operation, the data is scattered evenly across
            all ranks.

        Parameters
        ----------
        keys : list, tuple
            the names of columns to sort by. If multiple columns are
            provided, the data is sorted consecutively in the order provided
        reverse : bool, optional
            if ``True``, perform descending sort operations
        usecols : list, optional
            the name of the columns to include in the returned CatalogSource
        """
        # single string passed as input
        if isinstance(keys, string_types):
            keys = [keys]

        # no duplicated keys
        if len(set(keys)) != len(keys):
            raise ValueError("duplicated sort keys")

        # all keys must be valid
        bad = set(keys) - set(self.columns)
        if len(bad):
            raise ValueError("invalid sort keys: %s" % str(bad))

        # check the usecols input
        if usecols is not None:
            if isinstance(usecols, string_types):
                usecols = [usecols]
            if not isinstance(usecols, (list, tuple)):
                raise ValueError("usecols should be a list or tuple of column names")

            bad = set(usecols) - set(self.columns)
            if len(bad):
                raise ValueError("invalid column names in usecols: %s" % str(bad))

        # sort the data
        data = _sort_data(self.comm, self, list(keys), reverse=reverse, usecols=usecols)

        # get a dictionary of the data
        cols = {}
        for col in data.dtype.names:
            cols[col] = data[col]

        # make the new CatalogSource from the data dict
        size = len(data)

        # NOTE: if we are slicing by columns too, we must return a bare
        # CatalogSource; we cannot guarantee the consistency of the hard
        # column definitions otherwise
        if usecols is None:
            toret = self.__class__._from_columns(size, self.comm, **cols)
            return toret.__finalize__(self)
        else:
            toret = CatalogSource._from_columns(size, self.comm, **cols)
            toret.attrs.update(self.attrs)
            return toret

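    # A hedged sketch of ``sort``; the column names are illustrative only:
    #
    #   >>> sorted_cat = cat.sort('Mass')                                 # ascending global sort
    #   >>> light = cat.sort(['Mass'], reverse=True, usecols=['Position', 'Mass'])
    #
    # When ``usecols`` is given, a bare CatalogSource is returned, since the
    # hard-coded columns of the original class can no longer be guaranteed.
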
    @column(is_default=True)
    def Selection(self):
        """
        A boolean column that selects a subset slice of the CatalogSource.

        By default, this column is set to ``True`` for all particles, and
        all CatalogSource objects will contain this column.
        """
        return ConstantArray(True, self.size, chunks=_global_options['dask_chunk_size'])

    @column(is_default=True)
    def Weight(self):
        """
        The column giving the weight to use for each particle on the mesh.

        The mesh field is a weighted average of ``Value``, with the weights
        given by ``Weight``.

        By default, this array is set to unity for all particles, and all
        CatalogSource objects will contain this column.
        """
        return ConstantArray(1.0, self.size, chunks=_global_options['dask_chunk_size'])

    @property
    def Index(self):
        """
        The attribute giving the global index rank of each particle in the
        list. It is an integer from 0 to ``self.csize``.

        Note that slicing changes this index value.
        """
        offset = numpy.sum(self.comm.allgather(self.size)[:self.comm.rank], dtype='intp')
        # do not use u8, because many numpy casting rules cast u8 to f8
        # automatically. it is ridiculous.
        return da.arange(offset, offset + self.size, dtype='i8',
                         chunks=_global_options['dask_chunk_size'])

    @column(is_default=True)
    def Value(self):
        """
        When interpolating a CatalogSource onto a mesh, the value of this
        array is used as the Value that each particle contributes to a given
        mesh cell.

        The mesh field is a weighted average of ``Value``, with the weights
        given by ``Weight``.

        By default, this array is set to unity for all particles, and all
        CatalogSource objects will contain this column.
        """
        return ConstantArray(1.0, self.size, chunks=_global_options['dask_chunk_size'])


def _sort_data(comm, cat, rankby, reverse=False, usecols=None):
    """
    Sort the input data by the specified columns.

    Parameters
    ----------
    comm :
        the MPI communicator
    cat : CatalogSource
        the catalog holding the data to sort
    rankby : list of str
        list of columns to sort by
    reverse : bool, optional
        if ``True``, sort in descending order
    usecols : list, optional
        only sort these data columns
    """
    import mpsort

    # determine which columns we need
    if usecols is None:
        usecols = cat.columns

    # remove duplicates from usecols
    usecols = list(set(usecols))

    # the columns we need in the sort steps
    columns = list(set(rankby) | set(usecols))

    # make the data to sort
    dtype = [('_sortkey', 'i8')]
    for col in cat:
        if col in columns:
            dt = (cat[col].dtype.char,)
            dt += cat[col].shape[1:]
            if len(dt) == 1:
                dt = dt[0]
            dtype.append((col, dt))
    dtype = numpy.dtype(dtype)

    data = numpy.empty(cat.size, dtype=dtype)
    for col in columns:
        data[col] = cat[col]

    # sort the particles by the specified columns and store the
    # correctly sorted index
    for col in reversed(rankby):

        dt = data.dtype[col]
        rankby_name = col

        # make an integer key for floating-point columns;
        # this assumes the lexical order of floats as integers is consistent
        if issubclass(dt.type, numpy.float32):
            data['_sortkey'] = numpy.frombuffer(data[col].tobytes(), dtype='i4')
            if reverse:
                data['_sortkey'] *= -1
            rankby_name = '_sortkey'
        elif issubclass(dt.type, numpy.float64):
            data['_sortkey'] = numpy.frombuffer(data[col].tobytes(), dtype='i8')
            if reverse:
                data['_sortkey'] *= -1
            rankby_name = '_sortkey'
        elif not issubclass(dt.type, numpy.integer):
            args = (col, str(dt))
            raise ValueError("cannot sort by column '%s' with dtype '%s'; must be integer or floating type" % args)

        # do the parallel sort
        mpsort.sort(data, orderby=rankby_name, comm=comm)

    return data[usecols]