from nbodykit.transform import ConstantArray
from nbodykit import _global_options, CurrentMPIComm
from six import string_types
import numpy
import logging
import warnings
import dask.array as da
class ColumnAccessor(da.Array):
    """
    Provides access to a Column from a Catalog

    This is a thin subclass of :class:`dask.array.Array` to
    provide a reference to the catalog object,
    an additional ``attrs`` attribute (for recording the
    reproducible meta-data), and some pretty print support.

    Due to particularity of :mod:`dask`, any transformation
    that is not explicitly in-place will return
    a :class:`dask.array.Array`, and losing the pointer to
    the original catalog and the meta data attrs.
    """
    def __new__(cls, catalog, daskarray):
        # rebuild a dask Array from the pieces of an existing dask array,
        # then attach the owning catalog and an empty attrs dict
        self = da.Array.__new__(ColumnAccessor,
                daskarray.dask,
                daskarray.name,
                daskarray.chunks,
                daskarray.dtype,
                daskarray.shape)
        self.catalog = catalog
        self.attrs = {}
        return self

    def __getitem__(self, key):
        # compute dask index b/c they are not fully supported
        if isinstance(key, da.Array):
            key = self.catalog.compute(key)
        # base class behavior
        d = da.Array.__getitem__(self, key)
        # return a ColumnAccessor (okay b/c __setitem__ checks for circular references)
        toret = ColumnAccessor(self.catalog, d)
        toret.attrs.update(self.attrs)
        return toret

    def as_daskarray(self):
        """
        Return a plain :class:`dask.array.Array` version of this column,
        dropping the catalog reference and ``attrs``.
        """
        return da.Array(
                self.dask,
                self.name,
                self.chunks,
                self.dtype,
                self.shape)

    def compute(self):
        # delegate to the catalog's compute (which may use its cache)
        return self.catalog.compute(self)

    def __str__(self):
        # extend dask's default repr with the first/last values, which
        # requires computing those single elements
        r = da.Array.__str__(self)
        if len(self) > 0:
            r = r + " first: %s" % str(self[0].compute())
        if len(self) > 1:
            r = r + " last: %s" % str(self[-1].compute())
        return r
def column(name=None):
    """
    Decorator that defines a function as a column in a CatalogSource
    """
    # bare usage: ``@column`` with no arguments -- ``name`` is the
    # decorated function itself, and the column name is the function name
    if hasattr(name, '__call__'):
        func = name
        func.column_name = func.__name__
        return func

    # parameterized usage: ``@column("Name")`` -- return a decorator that
    # tags the getter with the provided column name
    def decorator(getter):
        getter.column_name = name
        return getter
    return decorator
def find_columns(cls):
    """
    Find all hard-coded column names associated with the input class

    Returns
    -------
    hardcolumns : set
        a set of the names of all hard-coded columns for the
        input class ``cls``
    """
    # collect column names tagged by the @column decorator on this class
    found = set()
    for value in cls.__dict__.values():
        if hasattr(value, 'column_name'):
            found.add(value.column_name)

    # recurse into the base classes as well
    for base in cls.__bases__:
        found.update(find_columns(base))

    return sorted(found)
def find_column(cls, name):
    """
    Find a specific column ``name`` of an input class, or raise
    an exception if it does not exist

    Parameters
    ----------
    cls : type
        the class to search for the decorated column getter
    name : str
        the column name to look up

    Returns
    -------
    column : callable
        the callable that returns the column data

    Raises
    ------
    AttributeError :
        if ``name`` is not found on ``cls`` or any of its bases
    """
    # first search through class
    for key, value in cls.__dict__.items():
        if not hasattr(value, 'column_name'): continue
        if value.column_name == name: return value

    # recurse into base classes; only swallow the "not found" error from
    # the recursive call (was a bare ``except`` that hid real bugs)
    for base in cls.__bases__:
        try:
            return find_column(base, name)
        except AttributeError:
            pass

    args = (name, str(cls))
    raise AttributeError("unable to find column '%s' for '%s'" % args)
class CatalogSourceBase(object):
    """
    An abstract base class that implements most of the functionality in
    :class:`CatalogSource`.

    The main difference between this class and :class:`CatalogSource` is that
    this base class does not assume the object has a :attr:`size` attribute.

    .. note::
        See the docstring for :class:`CatalogSource`. Most often, users should
        implement custom sources as subclasses of :class:`CatalogSource`.

    Parameters
    ----------
    comm :
        the MPI communicator to use for this object
    use_cache : bool, optional
        whether to cache intermediate dask task results; default is ``False``
    """
    # class-level logger shared by all instances
    logger = logging.getLogger('CatalogSourceBase')
[docs] @staticmethod
def make_column(array):
"""
Utility function to convert an array-like object to a
:class:`dask.array.Array`.
.. note::
The dask array chunk size is controlled via the ``dask_chunk_size``
global option. See :class:`~nbodykit.set_options`.
Parameters
----------
array : array_like
an array-like object; can be a dask array, numpy array,
ColumnAccessor, or other non-scalar array-like object
Returns
-------
:class:`dask.array.Array` :
a dask array initialized from ``array``
"""
if isinstance(array, da.Array):
return array
elif isinstance(array, ColumnAccessor):
# important to get the accessor as a dask array to avoid circular
# references
return array.as_daskarray()
else:
return da.from_array(array, chunks=_global_options['dask_chunk_size'])
def __new__(cls, *args, **kwargs):
obj = object.__new__(cls)
# ensure self.comm is set, though usually already set by the child.
obj.comm = kwargs.get('comm', CurrentMPIComm.get())
# initialize a cache
obj.use_cache = kwargs.get('use_cache', False)
# user-provided overrides for columns
obj._overrides = {}
# stores memory owner
obj.base = None
return obj
[docs] def __finalize__(self, other):
"""
Finalize the creation of a CatalogSource object by copying over
any additional attributes from a second CatalogSource.
The idea here is to only copy over attributes that are similar
to meta-data, so we do not copy some of the core attributes of the
:class:`CatalogSource` object.
Parameters
----------
other :
the second object to copy over attributes from; it needs to be
a subclass of CatalogSourcBase for attributes to be copied
Returns
-------
CatalogSource :
return ``self``, with the added attributes
"""
if isinstance(other, CatalogSourceBase):
d = other.__dict__.copy()
nocopy = ['base', '_overrides', 'comm', '_cache', '_use_cache', '_size', '_csize']
for key in d:
if key not in nocopy:
self.__dict__[key] = d[key]
return self
    def __iter__(self):
        # iterating over a catalog iterates over its column names
        return iter(self.columns)
    def __contains__(self, col):
        # membership tests check against the available column names
        return col in self.columns
[docs] def __slice__(self, index):
"""
Select a subset of ``self`` according to a boolean index array.
Returns a new object of the same type as ``selff`` holding only the
data that satisfies the slice index.
Parameters
----------
index : array_like
either a dask or numpy boolean array; this determines which
rows are included in the returned object
"""
# compute the index slice if needed and get the size
if isinstance(index, da.Array):
index = self.compute(index)
elif isinstance(index, list):
index = numpy.array(index)
if getattr(self, 'size', NotImplemented) is NotImplemented:
raise ValueError("cannot make catalog subset; self catalog doest not have a size")
# verify the index is a boolean array
if len(index) != self.size:
raise ValueError("slice index has length %d; should be %d" %(len(index), self.size))
if getattr(index, 'dtype', None) != '?':
raise ValueError("index used to slice CatalogSource must be boolean and array-like")
# new size is just number of True entries
size = index.sum()
# if collective size is unchanged, just return self
if self.comm.allreduce(size) == self.csize:
return self.base if self.base is not None else self
# initialize subset Source of right size
subset_data = {col:self[col][index] for col in self}
cls = self.__class__ if self.base is None else self.base.__class__
toret = cls._from_columns(size, self.comm, use_cache=self.use_cache, **subset_data)
# attach the needed attributes
toret.__finalize__(self)
return toret
[docs] def __getitem__(self, sel):
"""
The following types of indexing are supported:
#. strings specifying a column in the CatalogSource; returns
a dask array holding the column data
#. boolean arrays specifying a slice of the CatalogSource;
returns a CatalogSource holding only the revelant slice
#. slice object specifying which particles to select
#. list of strings specifying column names; returns a CatalogSource
holding only the selected columns
Notes
-----
- Slicing with a boolean array is a **collective** operation
- If the :attr:`base` attribute is set, columns will be returned
from :attr:`base` instead of from ``self``.
"""
# handle boolean array slices
if not isinstance(sel, string_types):
# size must be implemented
if getattr(self, 'size', NotImplemented) is NotImplemented:
raise ValueError("cannot perform selection due to NotImplemented size")
# convert slices to boolean arrays
if isinstance(sel, (slice, list)):
# select a subset of list of string column names
if isinstance(sel, list) and all(isinstance(ss, string_types) for ss in sel):
invalid = set(sel) - set(self.columns)
if len(invalid):
msg = "cannot select subset of columns from "
msg += "CatalogSource due to invalid columns: %s" % str(invalid)
raise KeyError(msg)
# return a CatalogSource only holding the selected columns
subset_data = {col:self[col] for col in sel}
toret = CatalogSource._from_columns(self.size, self.comm, use_cache=self.use_cache, **subset_data)
toret.attrs.update(self.attrs)
return toret
# list must be all integers
if isinstance(sel, list) and not numpy.array(sel).dtype == numpy.integer:
raise KeyError("array like indexing via a list should be a list of integers")
# convert into slice into boolean array
index = numpy.zeros(self.size, dtype='?')
index[sel] = True; sel = index
# do the slicing
if not numpy.isscalar(sel):
return self.__slice__(sel)
else:
raise KeyError("strings and boolean arrays are the only supported indexing methods")
# owner of the memory (either self or base)
memowner = self if self.base is None else self.base
# get the right column
if sel in memowner._overrides:
r = memowner._overrides[sel]
elif sel in memowner.hardcolumns:
r = memowner.get_hardcolumn(sel)
else:
raise KeyError("column `%s` is not defined in this source; " %sel + \
"try adding column via `source[column] = data`")
# evaluate callables if we need to
if callable(r): r = r()
# return a ColumnAccessor for pretty prints
return ColumnAccessor(memowner, r)
[docs] def __setitem__(self, col, value):
"""
Add new columns to the CatalogSource, overriding any existing columns
with the name ``col``.
.. note::
If the :attr:`base` attribute is set, columns will be added to
:attr:`base` instead of to ``self``.
"""
if self.base is not None: return self.base.__setitem__(col, value)
self._overrides[col] = self.make_column(value)
[docs] def __delitem__(self, col):
"""
Delete a column; cannot delete a "hard-coded" column.
.. note::
If the :attr:`base` attribute is set, columns will be deleted
from :attr:`base` instead of from ``self``.
"""
if self.base is not None: return self.base.__delitem__(col)
if col not in self.columns:
raise ValueError("no such column, cannot delete it")
if col in self.hardcolumns:
raise ValueError("cannot delete a hard-coded column")
if col in self._overrides:
del self._overrides[col]
return
raise ValueError("unable to delete column '%s' from CatalogSource" %col)
    @property
    def use_cache(self):
        """
        If set to ``True``, use the built-in caching features of ``dask``
        to cache data in memory.
        """
        # backing attribute is assigned by the ``use_cache`` setter
        return self._use_cache
@use_cache.setter
def use_cache(self, val):
"""
Initialize a Cache object of size set by the ``dask_cache_size``
global configuration option, which is 1 GB by default.
See :class:`~nbodykit.set_options` to control the value of
``dask_cache_size``.
"""
if val:
try:
from dask.cache import Cache
if not hasattr(self, '_cache'):
self._cache = Cache(_global_options['dask_cache_size'])
except ImportError:
warnings.warn("caching of CatalogSource requires ``cachey`` module; turning cache off")
else:
if hasattr(self, '_cache'): delattr(self, '_cache')
self._use_cache = val
@property
def attrs(self):
"""
A dictionary storing relevant meta-data about the CatalogSource.
"""
try:
return self._attrs
except AttributeError:
self._attrs = {}
return self._attrs
@property
def hardcolumns(self):
"""
A list of the hard-coded columns in the CatalogSource.
These columns are usually member functions marked by ``@column``
decorator. Subclasses may override this method and use
:func:`get_hardcolumn` to bypass the decorator logic.
.. note::
If the :attr:`base` attribute is set, the value of
``base.hardcolumns`` will be returned.
"""
if self.base is not None: return self.base.hardcolumns
try:
self._hardcolumns
except AttributeError:
self._hardcolumns = find_columns(self.__class__)
return self._hardcolumns
@property
def columns(self):
"""
All columns in the CatalogSource, including those hard-coded into
the class's defintion and override columns provided by the user.
.. note::
If the :attr:`base` attribute is set, the value of
``base.columns`` will be returned.
"""
if self.base is not None: return self.base.columns
hcolumns = list(self.hardcolumns)
overrides = list(self._overrides)
return sorted(set(hcolumns + overrides))
    def copy(self):
        """
        Return a shallow copy of the object, where each column is a reference
        of the corresponding column in ``self``.

        .. note::
            No copy of data is made.

        .. note::
            This is different from view in that the attributes dictionary
            of the copy no longer related to ``self``.

        Returns
        -------
        CatalogSource :
            a new CatalogSource that holds all of the data columns of ``self``
        """
        # a new empty object with proper size
        # NOTE(review): CatalogSourceBase.__new__ reads only keyword args, so
        # the positional comm/use_cache here appear to be silently ignored and
        # the new object falls back to defaults -- confirm this is intended
        toret = CatalogSourceBase.__new__(self.__class__, self.comm, self.use_cache)
        toret._size = self.size
        toret._csize = self.csize

        # attach attributes from self and return
        toret = toret.__finalize__(self)

        # finally, add the data columns from self
        for col in self.columns:
            toret[col] = self[col]

        # copy the attributes too, so they become decoupled
        # this is different from view.
        # NOTE(review): assumes ``self._attrs`` already exists (i.e. ``attrs``
        # has been accessed at least once) -- confirm
        toret._attrs = self._attrs.copy()

        return toret
[docs] def get_hardcolumn(self, col):
"""
Construct and return a hard-coded column.
These are usually produced by calling member functions marked by the
``@column`` decorator.
Subclasses may override this method and the hardcolumns attribute to
bypass the decorator logic.
.. note::
If the :attr:`base` attribute is set, ``get_hardcolumn()``
will called using :attr:`base` instead of ``self``.
"""
if self.base is not None: return self.base.get_hardcolumn(col)
return find_column(self.__class__, col)(self)
[docs] def compute(self, *args, **kwargs):
"""
Our version of :func:`dask.compute` that computes
multiple delayed dask collections at once.
This should be called on the return value of :func:`read`
to converts any dask arrays to numpy arrays.
If :attr:`use_cache` is ``True``, this internally caches data, using
dask's built-in cache features.
.. note::
If the :attr:`base` attribute is set, ``compute()``
will called using :attr:`base` instead of ``self``.
Parameters
-----------
args : object
Any number of objects. If the object is a dask
collection, it's computed and the result is returned.
Otherwise it's passed through unchanged.
Notes
-----
The dask default optimizer induces too many (unnecesarry)
IO calls -- we turn this off feature off by default. Eventually we
want our own optimizer probably.
"""
if self.base is not None: return self.base.compute(*args, **kwargs)
import dask
# do not optimize graph (can lead to slower optimizations)
kwargs.setdefault('optimize_graph', False)
# use a cache?
if self.use_cache and hasattr(self, '_cache'):
with self._cache:
toret = dask.compute(*args, **kwargs)
else:
toret = dask.compute(*args, **kwargs)
# do not return tuples of length one
if len(toret) == 1: toret = toret[0]
return toret
[docs] def save(self, output, columns, datasets=None, header='Header'):
"""
Save the CatalogSource to a :class:`bigfile.BigFile`.
Only the selected columns are saved and :attr:`attrs` are saved in
``header``. The attrs of columns are stored in the datasets.
Parameters
----------
output : str
the name of the file to write to
columns : list of str
the names of the columns to save in the file
datasets : list of str, optional
names for the data set where each column is stored; defaults to
the name of the column
header : str, optional
the name of the data set holding the header information, where
:attr:`attrs` is stored
"""
import bigfile
import json
from nbodykit.utils import JSONEncoder
if datasets is None: datasets = columns
if len(datasets) != len(columns):
raise ValueError("`datasets` must have the same length as `columns`")
with bigfile.BigFileMPI(comm=self.comm, filename=output, create=True) as ff:
try:
bb = ff.open(header)
except:
bb = ff.create(header)
with bb :
for key in self.attrs:
try:
bb.attrs[key] = self.attrs[key]
except ValueError:
try:
json_str = 'json://'+json.dumps(self.attrs[key], cls=JSONEncoder)
bb.attrs[key] = json_str
except:
raise ValueError("cannot save '%s' key in attrs dictionary" % key)
for column, dataset in zip(columns, datasets):
c = self[column]
# save column attrs too
with ff.create_from_array(dataset, c) as bb:
if hasattr(c, 'attrs'):
for key in c.attrs:
bb.attrs[key] = c.attrs[key]
[docs] def read(self, columns):
"""
Return the requested columns as dask arrays.
Parameters
----------
columns : list of str
the names of the requested columns
Returns
-------
list of :class:`dask.array.Array` :
the list of column data, in the form of dask arrays
"""
missing = set(columns) - set(self.columns)
if len(missing) > 0:
msg = "source does not contain columns: %s; " %str(missing)
msg += "try adding columns via `source[column] = data`"
raise ValueError(msg)
return [self[col] for col in columns]
[docs] def view(self, type=None):
"""
Return a "view" of the CatalogSource object, with the returned
type set by ``type``.
This initializes a new empty class of type ``type`` and attaches
attributes to it via the :func:`__finalize__` mechanism.
Parameters
----------
type : Python type
the desired class type of the returned object.
"""
# an empty class
type = self.__class__ if type is None else type
obj = CatalogSourceBase.__new__(type, self.comm, self.use_cache)
# propagate the size attributes
obj._size = self.size
obj._csize = self.csize
# the new object's base points to self
obj.base = self
# attach the necessary attributes from self
return obj.__finalize__(self)
[docs] def to_mesh(self, Nmesh=None, BoxSize=None, dtype='f4', interlaced=False,
compensated=False, window='cic', weight='Weight',
value='Value', selection='Selection', position='Position'):
"""
Convert the CatalogSource to a MeshSource, using the specified
parameters.
Parameters
----------
Nmesh : int, optional
the number of cells per side on the mesh; must be provided if
not stored in :attr:`attrs`
BoxSize : scalar, 3-vector, optional
the size of the box; must be provided if
not stored in :attr:`attrs`
dtype : string, optional
the data type of the mesh array
interlaced : bool, optional
use the interlacing technique of Sefusatti et al. 2015 to reduce
the effects of aliasing on Fourier space quantities computed
from the mesh
compensated : bool, optional
whether to correct for the window introduced by the grid
interpolation scheme
window : str, optional
the string specifying which window interpolation scheme to use;
see `pmesh.window.methods`
weight : str, optional
the name of the column specifying the weight for each particle
value: str, optional
the name of the column specifying the field value for each particle
selection : str, optional
the name of the column that specifies which (if any) slice
of the CatalogSource to take
position : str, optional
the name of the column that specifies the position data of the
objects in the catalog
Returns
-------
mesh : CatalogMesh
a mesh object that provides an interface for gridding particle
data onto a specified mesh
"""
from nbodykit.base.catalogmesh import CatalogMesh
from pmesh.window import methods
# make sure all of the columns exist
for col in [weight, selection]:
if col not in self:
raise ValueError("column '%s' missing; cannot create mesh" %col)
if window not in methods:
raise ValueError("valid window methods: %s" %str(methods))
if BoxSize is None:
try:
BoxSize = self.attrs['BoxSize']
except KeyError:
raise ValueError(("cannot convert particle source to a mesh; "
"'BoxSize' keyword is not supplied and the CatalogSource "
"does not define one in 'attrs'."))
if Nmesh is None:
try:
Nmesh = self.attrs['Nmesh']
except KeyError:
raise ValueError(("cannot convert particle source to a mesh; "
"'Nmesh' keyword is not supplied and the CatalogSource "
"does not define one in 'attrs'."))
return CatalogMesh(self, Nmesh=Nmesh,
BoxSize=BoxSize,
dtype=dtype,
weight=weight,
selection=selection,
value=value,
position=position,
interlaced=interlaced,
compensated=compensated,
window=window)
class CatalogSource(CatalogSourceBase):
    """
    An abstract base class representing a catalog of discrete particles.

    This object behaves like a structured numpy array -- it must have a
    well-defined size when initialized. The ``size`` here represents the
    number of particles in the source on the local rank.

    The information about each particle is stored as a series of
    columns in the format of dask arrays. These columns can be accessed
    in a dict-like fashion.

    All subclasses of this class contain the following default columns:

    #. ``Weight``
    #. ``Value``
    #. ``Selection``

    For a full description of these default columns, see
    :ref:`the documentation <catalog-source-default-columns>`.

    .. important::
        Subclasses of this class must set the ``_size`` attribute.

    Parameters
    ----------
    comm :
        the MPI communicator to use for this object
    use_cache : bool, optional
        whether to cache intermediate dask task results; default is ``False``
    """
    # class-level logger shared by all instances
    logger = logging.getLogger('CatalogSource')
@classmethod
def _from_columns(cls, size, comm, use_cache=False, **columns):
"""
An internal constructor to create a CatalogSource (or subclass)
from a set of columns.
This method is used internally by nbodykit to create
views of catalogs based on existing catalogs.
Use :class:`~nbodykit.source.catalog.array.ArrayCatalog` to adapt
a structured array or dictionary to a CatalogSource.
.. note::
The returned object is of type ``cls`` and the only attributes
set for the returned object are :attr:`size` and :attr:`csize`.
"""
# the new empty object to return
obj = CatalogSourceBase.__new__(cls, comm, use_cache)
# compute the sizes
obj._size = size
obj._csize = obj.comm.allreduce(obj._size)
# add the columns in
for name in columns:
obj[name] = columns[name]
return obj
    def __init__(self, *args, **kwargs):
        # if size is implemented (set by the subclass), reduce it across all
        # ranks to obtain the collective size; otherwise propagate the
        # NotImplemented marker to csize as well
        if self.size is not NotImplemented:
            self._csize = self.comm.allreduce(self.size)
        else:
            self._csize = NotImplemented
def __repr__(self):
size = "%d" %self.size if self.size is not NotImplemented else "NotImplemented"
return "%s(size=%s)" %(self.__class__.__name__, size)
    def __len__(self):
        """
        The local size of the CatalogSource on a given rank.
        """
        # equivalent to :attr:`size`; may be NotImplemented for subclasses
        # that have not set ``_size`` yet
        return self.size
[docs] def __setitem__(self, col, value):
"""
Add columns to the CatalogSource, overriding any existing columns
with the name ``col``.
"""
# handle scalar values
if numpy.isscalar(value):
assert self.size is not NotImplemented, "size is not implemented! cannot set scalar array"
value = ConstantArray(value, self.size, chunks=_global_options['dask_chunk_size'])
# check the correct size, if we know the size
if self.size is not NotImplemented:
args = (col, self.size, len(value))
msg = "error setting '%s' column, data must be array of size %d, not %d" % args
assert len(value) == self.size, msg
# call the base __setitem__
CatalogSourceBase.__setitem__(self, col, value)
    @property
    def size(self):
        """
        The number of objects in the CatalogSource on the local rank.

        If the :attr:`base` attribute is set, the ``base.size`` attribute
        will be returned.

        .. important::
            This property must be defined for all subclasses.
        """
        if self.base is not None: return self.base.size

        # subclasses set ``_size``; report NotImplemented until they do
        if not hasattr(self, '_size'):
            return NotImplemented
        return self._size
    @size.setter
    def size(self, value):
        # direct assignment is disallowed; subclasses set ``_size`` instead
        raise RuntimeError(("Property size is read-only. Internally, ``_size`` "
                            "can be set during initialization."))
    @property
    def csize(self):
        """
        The total, collective size of the CatalogSource, i.e., summed across
        all ranks.

        It is the sum of :attr:`size` across all available ranks.

        If the :attr:`base` attribute is set, the ``base.csize`` attribute
        will be returned.
        """
        if self.base is not None: return self.base.csize

        # set by __init__ / _from_columns via an MPI allreduce of ``size``
        if not hasattr(self, '_csize'):
            return NotImplemented
        return self._csize
    def gslice(self, start, stop, end=1, redistribute=True):
        """
        Execute a global slice of a CatalogSource.

        .. note::
            After the global slice is performed, the data is scattered
            evenly across all ranks.

        Parameters
        ----------
        start : int
            the start index of the global slice
        stop : int
            the stop index of the global slice
        end : int, optional
            the step size of the global slice; despite its name, this
            parameter is used as the ``step`` argument of the slice
        redistribute : bool, optional
            if ``True``, evenly re-distribute the sliced data across all
            ranks, otherwise just return any local data part of the global
            slice
        """
        from nbodykit.utils import ScatterArray, GatherArray

        # determine the boolean index corresponding to the slice
        if self.comm.rank == 0:
            index = numpy.zeros(self.csize, dtype=bool)
            index[slice(start, stop, end)] = True
        else:
            index = None
        # NOTE(review): this bcast looks redundant since ScatterArray below
        # only uses the root's data -- confirm before removing
        index = self.comm.bcast(index)

        # scatter the index back to all ranks
        counts = self.comm.allgather(self.size)
        index = ScatterArray(index, self.comm, root=0, counts=counts)

        # perform the needed local slice
        subset = self[index]

        # if we don't want to redistribute evenly, just return the slice
        if not redistribute:
            return subset

        # re-distribute each column from the sliced data
        # NOTE: currently Gather/ScatterArray requires numpy arrays, but
        # in principle we could pass dask arrays around between ranks and
        # avoid compute() calls
        data = self.compute(*[subset[col] for col in subset])

        # gather/scatter each column
        evendata = {}
        for i, col in enumerate(subset):
            alldata = GatherArray(data[i], self.comm, root=0)
            evendata[col] = ScatterArray(alldata, self.comm, root=0)

        # return a new CatalogSource holding the evenly distributed data
        size = len(evendata[col])
        toret = self.__class__._from_columns(size, self.comm, **evendata)
        return toret.__finalize__(self)
[docs] def sort(self, keys, reverse=False, usecols=None):
"""
Return a CatalogSource, sorted globally across all MPI ranks
in ascending order by the input keys.
Sort columns must be floating or integer type.
.. note::
After the sort operation, the data is scattered evenly across
all ranks.
Parameters
----------
keys : list, tuple
the names of columns to sort by. If multiple columns are provided,
the data is sorted consecutively in the order provided
reverse : bool, optional
if ``True``, perform descending sort operations
usecols : list, optional
the name of the columns to include in the returned CatalogSource
"""
# single string passed as input
if isinstance(keys, string_types):
keys = [keys]
# no duplicated keys
if len(set(keys)) != len(keys):
raise ValueError("duplicated sort keys")
# all keys must be valid
bad = set(keys) - set(self.columns)
if len(bad):
raise ValueError("invalid sort keys: %s" %str(bad))
# check usecols input
if usecols is not None:
if isinstance(usecols, string_types):
usecols = [usecols]
if not isinstance(usecols, (list, tuple)):
raise ValueError("usecols should be a list or tuple of column names")
bad = set(usecols) - set(self.columns)
if len(bad):
raise ValueError("invalid column names in usecols: %s" %str(bad))
# sort the data
data = _sort_data(self.comm, self, list(keys), reverse=reverse, usecols=usecols)
# get a dictionary of data
cols = {}
for col in data.dtype.names:
cols[col] = data[col]
# make the new CatalogSource from the data dict
size = len(data)
# NOTE: if we are slicing by columns too, we must return a bare CatalogSource
# we cannot guarantee the consistency of the hard column definitions otherwise
if usecols is None:
toret = self.__class__._from_columns(size, self.comm, **cols)
return toret.__finalize__(self)
else:
toret = CatalogSource._from_columns(size, self.comm, **cols)
toret.attrs.update(self.attrs)
return toret
    @column
    def Selection(self):
        """
        A boolean column that selects a subset slice of the CatalogSource.

        By default, this column is set to ``True`` for all particles.
        """
        # constant True array of the local size
        return ConstantArray(True, self.size, chunks=_global_options['dask_chunk_size'])
    @column
    def Weight(self):
        """
        The column giving the weight to use for each particle on the mesh.

        The mesh field is a weighted average of ``Value``, with the weights
        given by ``Weight``.

        By default, this array is set to unity for all particles.
        """
        # constant unity array of the local size
        return ConstantArray(1.0, self.size, chunks=_global_options['dask_chunk_size'])
    @property
    def Index(self):
        """
        The attribute giving the global index rank of each particle in the
        list. It is an integer from 0 to ``self.csize``.

        Note that slicing changes this index value.
        """
        # offset of this rank's first particle: sum of the sizes of all
        # lower ranks
        offset = sum(self.comm.allgather(self.size)[:self.comm.rank])
        # do not use u8, because many numpy casting rules case u8 to f8 automatically.
        # it is ridiculous.
        return da.arange(offset, offset + self.size, dtype='i8',
                chunks=_global_options['dask_chunk_size'])
    @column
    def Value(self):
        """
        When interpolating a CatalogSource on to a mesh, the value of this
        array is used as the Value that each particle contributes to a given
        mesh cell.

        The mesh field is a weighted average of ``Value``, with the weights
        given by ``Weight``.

        By default, this array is set to unity for all particles.
        """
        # constant unity array of the local size
        return ConstantArray(1.0, self.size, chunks=_global_options['dask_chunk_size'])
def _sort_data(comm, cat, rankby, reverse=False, usecols=None):
    """
    Sort the input data by the specified columns

    Parameters
    ----------
    comm :
        the mpi communicator
    cat : CatalogSource
        the catalog holding the data to sort
    rankby : list of str
        list of columns to sort by
    reverse : bool, optional
        if ``True``, sort in descending order
    usecols : list, optional
        only sort these data columns

    Returns
    -------
    numpy.ndarray :
        structured array holding the globally sorted data, restricted to
        the fields in ``usecols``
    """
    import mpsort

    # determine which columns we need
    if usecols is None:
        usecols = cat.columns

    # remove duplicates from usecols
    usecols = list(set(usecols))

    # the columns we need in the sort steps
    columns = list(set(rankby)|set(usecols))

    # make the structured array holding the data to sort, plus an extra
    # integer field used as the sort key for floating-point columns
    dtype = [('_sortkey', 'i8')]
    for col in cat:
        if col in columns:
            dt = (cat[col].dtype.char,)
            dt += cat[col].shape[1:]
            if len(dt) == 1: dt = dt[0]
            dtype.append((col, dt))
    dtype = numpy.dtype(dtype)

    data = numpy.empty(cat.size, dtype=dtype)
    for col in columns:
        data[col] = cat[col]

    # sort the particles by the specified columns and store the
    # corrected sorted index
    for col in reversed(rankby):

        dt = data.dtype[col]
        rankby_name = col

        # make an integer key for floating columns by reinterpreting the
        # raw float bytes as 64-bit integers
        # NOTE(review): this bit-level trick orders positive floats correctly,
        # but the relative order of negative floats under a signed integer
        # comparison is reversed -- confirm inputs are non-negative or that
        # this is the intended behavior
        if issubclass(dt.type, numpy.floating):
            # numpy.fromstring is deprecated for binary data; frombuffer is
            # the supported equivalent
            data['_sortkey'] = numpy.frombuffer(data[col].tobytes(), dtype='i8')
            if reverse:
                data['_sortkey'] *= -1
            rankby_name = '_sortkey'
        elif not issubclass(dt.type, numpy.integer):
            args = (col, str(dt))
            raise ValueError("cannot sort by column '%s' with dtype '%s'; must be integer or floating type" %args)

        # do the parallel sort
        mpsort.sort(data, orderby=rankby_name, comm=comm)

    return data[usecols]