Source code for nbodykit.base.catalogmesh

from nbodykit.base.mesh import MeshSource
from nbodykit.base.catalog import CatalogSource, CatalogSourceBase
import numpy
import logging
import warnings

# for converting from particle to mesh
from pmesh import window
from pmesh.pm import RealField, ComplexField

[docs]class CatalogMesh(CatalogSource, MeshSource): """ A view of a CatalogSource object which knows how to create a MeshSource object from itself. The original CatalogSource object is stored as the :attr:`base` attribute. Parameters ---------- source : CatalogSource the input catalog that we are viewing as a mesh BoxSize : the size of the box Nmesh : int, 3-vector the number of cells per mesh side dtype : str the data type of the values stored on mesh weight : str column in ``source`` that specifies the weight value for each particle in the ``source`` to use when gridding value : str column in ``source`` that specifies the field value for each particle; the mesh stores a weighted average of this column selection : str column in ``source`` that selects the subset of particles to grid to the mesh position : str, optional column in ``source`` specifying the position coordinates; default is ``Position`` interlaced : bool, optional use the interlacing technique of Sefusatti et al. 2015 to reduce the effects of aliasing on Fourier space quantities computed from the mesh compensated : bool, optional whether to correct for the window introduced by the grid interpolation scheme window : str, optional the string specifying which window interpolation scheme to use; see ``pmesh.window.methods`` """ logger = logging.getLogger('CatalogMesh') def __repr__(self): if isinstance(self.base, CatalogMesh): return repr(self.base) else: return "(%s as CatalogMesh)" % repr(self.base) def __new__(cls, source, BoxSize, Nmesh, dtype, weight, value, selection, position='Position', interlaced=False, compensated=False, window='cic', **kwargs): # source here must be a CatalogSource assert isinstance(source, CatalogSourceBase) # new, empty CatalogSource obj = CatalogSourceBase.__new__(cls, source.comm, source.use_cache) # copy over size from the CatalogSource obj._size = source.size obj._csize = source.csize # copy over the necessary meta-data to attrs obj.attrs['BoxSize'] = BoxSize obj.attrs['Nmesh'] = Nmesh obj.attrs['interlaced'] = interlaced obj.attrs['compensated'] = compensated obj.attrs['window'] = window # copy meta-data from source too obj.attrs.update(source.attrs) # store others as straight attributes obj.dtype = dtype obj.weight = weight obj.value = value obj.selection = selection obj.position = position # add in the Mesh Source attributes MeshSource.__init__(obj, obj.comm, Nmesh, BoxSize, dtype) # finally set the base as the input CatalogSource # NOTE: set this AFTER MeshSource.__init__() obj.base = source return obj
[docs] def gslice(self, start, stop, end=1, redistribute=True): """ Execute a global slice of a CatalogMesh. .. note:: After the global slice is performed, the data is scattered evenly across all ranks. As CatalogMesh objects are views of a CatalogSource, this simply globally slices the underlying CatalogSource. Parameters ---------- start : int the start index of the global slice stop : int the stop index of the global slice step : int, optional the default step size of the global size redistribute : bool, optional if ``True``, evenly re-distribute the sliced data across all ranks, otherwise just return any local data part of the global slice """ # sort the base object newbase = self.base.gslice(start, stop, end=end, redistribute=redistribute) # view this base class as a CatalogMesh (with default CatalogMesh parameters) toret = newbase.view(self.__class__) # attach the meta-data from self to returned sliced CatalogMesh return toret.__finalize__(self)
[docs] def sort(self, keys, reverse=False, usecols=None): """ Sort the CatalogMesh object globally across all MPI ranks in ascending order by the input keys. Sort columns must be floating or integer type. As CatalogMesh objects are views of a CatalogSource, this simply sorts the underlying CatalogSource. Parameters ---------- *keys : the names of columns to sort by. If multiple columns are provided, the data is sorted consecutively in the order provided reverse : bool, optional if ``True``, perform descending sort operations usecols : list, optional the name of the columns to include in the returned CatalogSource """ # sort the base object newbase = self.base.sort(keys, reverse=reverse, usecols=usecols) # view this base class as a CatalogMesh (with default CatalogMesh parameters) toret = newbase.view(self.__class__) # attach the meta-data from self to returned sliced CatalogMesh return toret.__finalize__(self)
[docs] def __slice__(self, index): """ Return a slice of a CatalogMesh object. This slices the CatalogSource object stored as the :attr:`base` attribute, and then views that sliced object as a CatalogMesh. Parameters ---------- index : array_like either a dask or numpy boolean array; this determines which rows are included in the returned object Returns ------- subset the particle source with the same meta-data as ``self``, and with the sliced data arrays """ # this slice of the CatalogSource will be the base of the mesh base = super(CatalogMesh, self).__slice__(index) # view this base class as a CatalogMesh (with default CatalogMesh parameters) toret = base.view(self.__class__) # attach the meta-data from self to returned sliced CatalogMesh return toret.__finalize__(self)
[docs] def copy(self): """ Return a shallow copy of ``self``. .. note:: No copy of data is made. Returns ------- CatalogMesh : a new CatalogMesh that holds all of the data columns of ``self`` """ # copy the base and view it as a CatalogMesh toret = self.base.copy().view(self.__class__) # attach the meta-data from self to returned sliced CatalogMesh return toret.__finalize__(self)
[docs] def __finalize__(self, other): """ Finalize the creation of a CatalogMesh object by copying over attributes from a second CatalogMesh. This also copies over the relevant MeshSource attributes via a call to :func:`MeshSource.__finalize__`. Parameters ---------- obj : CatalogMesh the second CatalogMesh to copy over attributes from """ if isinstance(other, CatalogSourceBase): self = CatalogSourceBase.__finalize__(self, other) if isinstance(other, MeshSource): self = MeshSource.__finalize__(self, other) return self
@property def interlaced(self): """ Whether to use interlacing when interpolating the density field. See :ref:`the documentation <interlacing>` for further details. See also: Section 3.1 of `Sefusatti et al. 2015 <https://arxiv.org/abs/1512.07295>`_ """ return self.attrs['interlaced'] @interlaced.setter def interlaced(self, interlaced): self.attrs['interlaced'] = interlaced @property def window(self): """ String specifying the name of the interpolation kernel when gridding the density field. See :ref:`the documentation <window-kernel>` for further details. .. note:: Valid values must be in :attr:`pmesh.window.methods` """ return self.attrs['window'] @window.setter def window(self, value): assert value in window.methods self.attrs['window'] = value.lower() # lower to compare with compensation @property def compensated(self): """ Boolean flag to indicate whether to correct for the windowing kernel introduced when interpolating the discrete particles to a continuous field. See :ref:`the documentation <compensation>` for further details. """ return self.attrs['compensated'] @compensated.setter def compensated(self, value): self.attrs['compensated'] = value
[docs] def to_real_field(self, out=None, normalize=True): """ Paint the density field, by interpolating the position column on to the mesh. This computes the following meta-data attributes in the process of painting, returned in the :attr:`attrs` attributes of the returned RealField object: - N : int the (unweighted) total number of objects painted to the mesh - W : float the weighted number of total objects, equal to the collective sum of the 'weight' column - shotnoise : float the Poisson shot noise, equal to the volume divided by ``N`` - num_per_cell : float the mean number of weighted objects per cell .. note:: The density field on the mesh is normalized as :math:`1+\delta`, such that the collective mean of the field is unity. See the :ref:`documentation <painting-mesh>` on painting for more details on painting catalogs to a mesh. Returns ------- real : :class:`pmesh.pm.RealField` the painted real field; this has a ``attrs`` dict storing meta-data """ # check for 'Position' column if self.position not in self: msg = "in order to paint a CatalogSource to a RealField, add a " msg += "column named '%s', representing the particle positions" %self.position raise ValueError(msg) pm = self.pm Nlocal = 0 # (unweighted) number of particles read on local rank Wlocal = 0 # (weighted) number of particles read on local rank # the paint brush window paintbrush = window.methods[self.window] # initialize the RealField to return if out is not None: assert isinstance(out, RealField), "output of to_real_field must be a RealField" numpy.testing.assert_array_equal(out.pm.Nmesh, pm.Nmesh) toret = out else: toret = RealField(pm) toret[:] = 0 # for interlacing, we need two empty meshes if out was provided # since out may have non-zero elements, messing up our interlacing sum if self.interlaced: real1 = RealField(pm) real1[:] = 0 # the second, shifted mesh (always needed) real2 = RealField(pm) real2[:] = 0 # read the necessary data (as dask arrays) columns = [self.position, self.weight, self.value, self.selection] Position, Weight, Value, Selection = self.read(columns) # compute first, so we avoid repeated computes sel = self.base.compute(Selection) Position = Position[sel] Weight = Weight[sel] Value = Value[sel] # compute position, weight, value = self.base.compute(Position, Weight, Value) # ensure the slices are synced, since decomposition is collective N = max(pm.comm.allgather(len(Position))) # paint data in chunks on each rank chunksize = 1024 ** 2 for i in range(0, N, chunksize): s = slice(i, i + chunksize) if len(Position) != 0: # be sure to use the source to compute position, weight, value = \ self.base.compute(Position[s], Weight[s], Value[s]) else: # workaround a potential dask issue on empty dask arrays position = numpy.empty((0, 3), dtype=Position.dtype) weight = None value = None selection = None if weight is None: weight = numpy.ones(len(position)) if value is None: value = numpy.ones(len(position)) # track total (selected) number and sum of weights Nlocal += len(position) Wlocal += weight.sum() # no interlacing if not self.interlaced: lay = pm.decompose(position, smoothing=0.5 * paintbrush.support) p = lay.exchange(position) w = lay.exchange(weight) v = lay.exchange(value) pm.paint(p, mass=w * v, resampler=paintbrush, hold=True, out=toret) # interlacing: use 2 meshes separated by 1/2 cell size else: lay = pm.decompose(position, smoothing=1.0 * paintbrush.support) p = lay.exchange(position) w = lay.exchange(weight) v = lay.exchange(value) H = pm.BoxSize / pm.Nmesh # in mesh units shifted = pm.affine.shift(0.5) # paint to two shifted meshes pm.paint(p, mass=w * v, resampler=paintbrush, hold=True, out=real1) pm.paint(p, mass=w * v, resampler=paintbrush, transform=shifted, hold=True, out=real2) # now the loop over particles is done if not self.interlaced: # nothing to do, toret is already filled. pass else: # compose the two interlaced fields into the final result. c1 = real1.r2c() c2 = real2.r2c() # and then combine for k, s1, s2 in zip(c1.slabs.x, c1.slabs, c2.slabs): kH = sum(k[i] * H[i] for i in range(3)) s1[...] = s1[...] * 0.5 + s2[...] * 0.5 * numpy.exp(0.5 * 1j * kH) # FFT back to real-space # NOTE: cannot use "toret" here in case user supplied "out" c1.c2r(real1) # need to add to the returned mesh if user supplied "out" toret[:] += real1[:] # unweighted number of objects N = pm.comm.allreduce(Nlocal) # weighted number of objects W = pm.comm.allreduce(Wlocal) # weighted number density (objs/cell) nbar = 1. * W / numpy.prod(pm.Nmesh) # make sure we painted something! if N == 0: warnings.warn(("trying to paint particle source to mesh, " "but no particles were found!"), RuntimeWarning ) # shot noise is volume / un-weighted number shotnoise = numpy.prod(pm.BoxSize) / N # save some meta-data toret.attrs = {} toret.attrs['shotnoise'] = shotnoise toret.attrs['N'] = N toret.attrs['W'] = W toret.attrs['num_per_cell'] = nbar csum = toret.csum() if pm.comm.rank == 0: self.logger.info("painted %d out of %d objects to mesh" %(N,self.base.csize)) self.logger.info("mean particles per cell is %g", nbar) self.logger.info("sum is %g ", csum) self.logger.info("normalized the convention to 1 + delta") if normalize: if nbar > 0: toret[...] /= nbar else: toret[...] = 1 return toret
@property def actions(self): """ The actions to apply to the interpolated density field, optionally included the compensation correction. """ actions = MeshSource.actions.fget(self) if self.compensated: actions = self._get_compensation() + actions return actions def _get_compensation(self): """ Return the compensation function, which corrects for the windowing kernel. The compensation function is computed as: - if ``interlaced = True``: - :func:`CompensateCIC` if using CIC window - :func:`CompensateTSC` if using TSC window - if ``interlaced = False``: - :func:`CompensateCICAliasing` if using CIC window - :func:`CompensateTSCAliasing` if using TSC window """ if self.interlaced: d = {'cic' : self.CompensateCIC, 'tsc' : self.CompensateTSC} else: d = {'cic' : self.CompensateCICAliasing, 'tsc' : self.CompensateTSCAliasing} if not self.window in d: raise ValueError("compensation for window %s is not defined" % self.window) filter = d[self.window] return [('complex', filter, "circular")]
[docs] @staticmethod def CompensateTSC(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the TSC window function in configuration space. .. note:: see equation 18 (with p=3) of `Jing et al 2005 <https://arxiv.org/abs/astro-ph/0409240>`_ Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] tmp = (numpy.sinc(0.5 * wi / numpy.pi) ) ** 3 v = v / tmp return v
[docs] @staticmethod def CompensateCIC(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the CIC window function in configuration space .. note:: see equation 18 (with p=2) of `Jing et al 2005 <https://arxiv.org/abs/astro-ph/0409240>`_ Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] tmp = (numpy.sinc(0.5 * wi / numpy.pi) ) ** 2 tmp[wi == 0.] = 1. v = v / tmp return v
[docs] @staticmethod def CompensateTSCAliasing(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the TSC window function in configuration space, as well as the approximate aliasing correction .. note:: see equation 20 of `Jing et al 2005 <https://arxiv.org/abs/astro-ph/0409240>`_ Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] s = numpy.sin(0.5 * wi)**2 v = v / (1 - s + 2./15 * s**2) ** 0.5 return v
[docs] @staticmethod def CompensateCICAliasing(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the CIC window function in configuration space, as well as the approximate aliasing correction .. note:: see equation 20 of `Jing et al 2005 <https://arxiv.org/abs/astro-ph/0409240>`_ Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] v = v / (1 - 2. / 3 * numpy.sin(0.5 * wi) ** 2) ** 0.5 return v
[docs] def save(self, output, dataset='Field', mode='real'): """ Save the mesh as a :class:`~nbodykit.source.mesh.bigfile.BigFileMesh` on disk, either in real or complex space. Parameters ---------- output : str name of the bigfile file dataset : str, optional name of the bigfile data set where the field is stored mode : str, optional real or complex; the form of the field to store """ return MeshSource.save(self, output, dataset=dataset, mode=mode)