Source code for nbodykit.source.mesh.catalog

from nbodykit.base.mesh import MeshSource
from nbodykit import _global_options
import numpy
import logging
import warnings

# for converting from particle to mesh
from pmesh import window
from pmesh.pm import RealField, ComplexField

[docs]class CatalogMesh(MeshSource): """ A mesh generated by resampling a Catalog with the given parameters. The original CatalogSource object is stored as the :attr:`base` attribute. Parameters ---------- source : CatalogSource the input catalog that we are viewing as a mesh BoxSize : the size of the box Nmesh : int, 3-vector the number of cells per mesh side dtype : str the data type of the values stored on mesh Weight : array_like, Column, None column that specifies the weight value for each particle in the ``source`` to use when gridding Value : array_like, Column, None column that specifies the field value for each particle; the mesh stores a weighted average of this column Selection : array_like, Column, None column that selects the subset of particles to grid to the mesh Position : array_like, Column, None column in ``source`` specifying the position coordinates; default is ``Position`` interlaced : bool, optional use the interlacing technique of Sefusatti et al. 2015 to reduce the effects of aliasing on Fourier space quantities computed from the mesh compensated : bool, optional whether to correct for the window introduced by the grid interpolation scheme window : str, optional the string specifying which window interpolation scheme to use; see ``pmesh.window.methods`` 'Weight', 'Value', 'Selection', 'Position' are items of the collection that can be reassigned after the creation of the object. """ logger = logging.getLogger('CatalogMesh') def __repr__(self): return "(%s as CatalogMesh)" % repr(self.source) def __init__(self, source, Nmesh, BoxSize, Position, dtype='f4', resampler='cic', compensated=False, interlaced=False, Value=None, Selection=None, Weight=None, **kwargs): from nbodykit.base.catalog import CatalogSourceBase assert isinstance(source, CatalogSourceBase) self._columns = {} # copy meta-data from source too self.attrs.update(source.attrs) MeshSource.__init__(self, source.comm, Nmesh, BoxSize, dtype) self.source = source # store others as straight attributes self.dtype = dtype self.Position = Position self.Weight = Weight self.Value = Value self.Selection = Selection self.attrs['interlaced'] = interlaced self.attrs['compensated'] = compensated self.attrs['resampler'] = str(resampler) @property def interlaced(self): """ Whether to use interlacing when interpolating the density field. See :ref:`the documentation <interlacing>` for further details. See also: Section 3.1 of `Sefusatti et al. 2015 <https://arxiv.org/abs/1512.07295>`_ """ return self.attrs['interlaced'] @interlaced.setter def interlaced(self, interlaced): self.attrs['interlaced'] = interlaced @property def window(self): return self.attrs['resampler'] @window.setter def window(self, value): self.resampler = value @property def resampler(self): """ String specifying the name of the interpolation kernel when gridding the density field. See :ref:`the documentation <resampler-kernel>` for further details. .. note:: Valid values must be in :attr:`pmesh.resampler.methods` """ return self.attrs['resampler'] @resampler.setter def resampler(self, value): assert value in window.methods self.attrs['resampler'] = value.lower() # lower to compare with compensation @property def compensated(self): """ Boolean flag to indicate whether to correct for the windowing kernel introduced when interpolating the discrete particles to a continuous field. See :ref:`the documentation <compensation>` for further details. """ return self.attrs['compensated'] @compensated.setter def compensated(self, value): self.attrs['compensated'] = value
[docs] def to_real_field(self, out=None, normalize=True): """ Paint the density field, by interpolating the position column on to the mesh. This computes the following meta-data attributes in the process of painting, returned in the :attr:`attrs` attributes of the returned RealField object: - N : int the (unweighted) total number of objects painted to the mesh - W : float the weighted number of total objects, equal to the collective sum of the 'weight' column - shotnoise : float the Poisson shot noise, equal to the volume divided by ``N`` - num_per_cell : float the mean number of weighted objects per cell .. note:: The density field on the mesh is normalized as :math:`1+\delta`, such that the collective mean of the field is unity. See the :ref:`documentation <painting-mesh>` on painting for more details on painting catalogs to a mesh. Returns ------- real : :class:`pmesh.pm.RealField` the painted real field; this has a ``attrs`` dict storing meta-data """ pm = self.pm Nlocal = 0 # (unweighted) number of particles read on local rank Wlocal = 0 # (weighted) number of particles read on local rank W2local = 0 # sum of weight square. This is used to estimate shotnoise. # the paint brush window resampler = window.methods[self.resampler] # initialize the RealField to return if out is not None: assert isinstance(out, RealField), "output of to_real_field must be a RealField" numpy.testing.assert_array_equal(out.pm.Nmesh, pm.Nmesh) toret = out else: toret = RealField(pm) toret[:] = 0 # for interlacing, we need two empty meshes if out was provided # since out may have non-zero elements, messing up our interlacing sum if self.interlaced: real1 = RealField(pm) real1[:] = 0 # the second, shifted mesh (always needed) real2 = RealField(pm) real2[:] = 0 Position = self.Position Weight = self.Weight Value = self.Value Selection = self.Selection # ensure the slices are synced, since decomposition is collective Nlocalmax = max(pm.comm.allgather(len(Position))) H = pm.BoxSize / pm.Nmesh # paint data in chunks on each rank; # we do this by chunk 8 million is pretty big anyways. max_chunksize = _global_options['paint_chunk_size'] # use a local scope to avoid having two copies of data in memory def dochunk(s): if len(Position) != 0: # selection has to be computed many times when data is `large`. columns = [Position[s]] if Weight is not None: columns.append(Weight[s]) if Value is not None: columns.append(Value[s]) if Selection is not None: columns.append(Selection[s]) # be sure to use the source to compute data = self.source.compute(columns) sel = Ellipsis if Selection is None else data.pop() value = None if Value is None else data.pop()[sel] weight = None if Weight is None else data.pop()[sel] position = data.pop()[sel] else: # workaround a potential dask issue on empty dask arrays position = numpy.empty((0, 3), dtype=Position.dtype) weight = None value = None if weight is None: weight = numpy.ones(len(position)) if value is None: value = numpy.ones(len(position)) # track total (selected) number and sum of weights Nlocal = len(position) Wlocal = weight.sum() W2local = (weight ** 2).sum() # no interlacing if not self.interlaced: lay = pm.decompose(position, smoothing=0.5 * resampler.support) else: lay = pm.decompose(position, smoothing=1.0 * resampler.support) # if we are receiving too many particles, abort and retry with a smaller chunksize recvlengths = pm.comm.allgather(lay.recvlength) if any([recvlength > 2 * max_chunksize for recvlength in recvlengths]): if pm.comm.rank == 0: self.logger.info("Throttling chunksize as some ranks will receive too many particles. (%d > %d)" % (max(recvlengths), max_chunksize * 2)) raise StopIteration p = lay.exchange(position) w = lay.exchange(weight) v = lay.exchange(value) if not self.interlaced: pm.paint(p, mass=w * v, resampler=resampler, hold=True, out=toret) # interlacing: use 2 meshes separated by 1/2 cell size else: # in mesh units shifted = pm.affine.shift(0.5) # paint to two shifted meshes pm.paint(p, mass=w * v, resampler=resampler, hold=True, out=real1) pm.paint(p, mass=w * v, resampler=resampler, transform=shifted, hold=True, out=real2) return Nlocal, Wlocal, W2local import gc i = 0 chunksize = max_chunksize while i < Nlocalmax: s = slice(i, i + chunksize) if pm.comm.rank == 0: self.logger.info("Chunk %d ~ %d / %d " % (i, i + chunksize, Nlocalmax)) try: Nlocal1, Wlocal1, W2local1 = dochunk(s) except StopIteration: chunksize = chunksize // 2 if chunksize < 1: raise RuntimeError("Cannot find a chunksize that fits into memory.") continue finally: # collect unfreed items gc.collect() Nlocal += Nlocal1 Wlocal += Wlocal1 W2local += W2local1 Nglobal = pm.comm.allreduce(Nlocal) if pm.comm.rank == 0: self.logger.info("painted %d out of %d objects to mesh" % (Nglobal, self.source.csize)) i = i + chunksize chunksize = min(max_chunksize, int(chunksize * 1.5)) # now the loop over particles is done if not self.interlaced: # nothing to do, toret is already filled. pass else: # compose the two interlaced fields into the final result. c1 = real1.r2c() c2 = real2.r2c() # and then combine for k, s1, s2 in zip(c1.slabs.x, c1.slabs, c2.slabs): kH = sum(k[i] * H[i] for i in range(3)) s1[...] = s1[...] * 0.5 + s2[...] * 0.5 * numpy.exp(0.5 * 1j * kH) # FFT back to real-space # NOTE: cannot use "toret" here in case user supplied "out" c1.c2r(real1) # need to add to the returned mesh if user supplied "out" toret[:] += real1[:] # unweighted number of objects N = pm.comm.allreduce(Nlocal) # weighted number of objects W = pm.comm.allreduce(Wlocal) # weighted number of objects W2 = pm.comm.allreduce(W2local) # weighted number density (objs/cell) nbar = 1. * W / numpy.prod(pm.Nmesh) # make sure we painted something or nbar is nan; in which case # we set the density to uniform everywhere. if N == 0: warnings.warn(("trying to paint particle source to mesh, " "but no particles were found!"), RuntimeWarning ) # shot noise is volume / un-weighted number shotnoise = numpy.prod(pm.BoxSize) * W2 / W ** 2 # save some meta-data toret.attrs = {} toret.attrs['shotnoise'] = shotnoise toret.attrs['N'] = N toret.attrs['W'] = W toret.attrs['W2'] = W2 toret.attrs['num_per_cell'] = nbar csum = toret.csum() if pm.comm.rank == 0: self.logger.info("painted %d out of %d objects to mesh" %(N, self.source.csize)) self.logger.info("mean particles per cell is %g", nbar) self.logger.info("sum is %g ", csum) if normalize: if nbar > 0: toret[...] /= nbar else: toret[...] = 1 if pm.comm.rank == 0: self.logger.info("normalized the convention to 1 + delta") return toret
@property def actions(self): """ The actions to apply to the interpolated density field, optionally included the compensation correction. """ actions = MeshSource.actions.fget(self) if self.compensated: actions = self._get_compensation() + actions return actions def _get_compensation(self): return get_compensation(self.interlaced, self.resampler)
[docs]def get_compensation(interlaced, resampler): """ Return the compensation function, which corrects for the windowing kernel. The compensation function is computed as: - if ``interlaced = True``: - :func:`CompensateCIC` if using CIC window - :func:`CompensateTSC` if using TSC window - :func:`CompensatePCS` if using PCS window - if ``interlaced = False``: - :func:`CompensateCICShotnoise` if using CIC window - :func:`CompensateTSCShotnoise` if using TSC window - :func:`CompensatePCSShotnoise` if using PCS window """ if interlaced: d = {'cic' : CompensateCIC, 'tsc' : CompensateTSC, 'pcs' : CompensatePCS, } else: d = {'cic' : CompensateCICShotnoise, 'tsc' : CompensateTSCShotnoise, 'pcs' : CompensatePCSShotnoise, } if not resampler in d: raise ValueError("compensation for window %s is not defined" % resampler) filter = d[resampler] return [('complex', filter, "circular")]
[docs]def CompensateTSC(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the TSC window function in configuration space. .. note:: see equation 18 (with p=3) of `Jing et al 2005 <https://arxiv.org/abs/astro-ph/0409240>`_ Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] tmp = (numpy.sinc(0.5 * wi / numpy.pi) ) ** 3 v = v / tmp return v
[docs]def CompensatePCS(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the PCS window function in configuration space. .. note:: see equation 18 (with p=4) of `Jing et al 2005 <https://arxiv.org/abs/astro-ph/0409240>`_ Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] tmp = (numpy.sinc(0.5 * wi / numpy.pi) ) ** 4 v = v / tmp return v
[docs]def CompensateCIC(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the CIC window function in configuration space .. note:: see equation 18 (with p=2) of `Jing et al 2005 <https://arxiv.org/abs/astro-ph/0409240>`_ Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] tmp = (numpy.sinc(0.5 * wi / numpy.pi) ) ** 2 tmp[wi == 0.] = 1. v = v / tmp return v
[docs]def CompensateTSCShotnoise(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the TSC window function in configuration space, as well as the approximate aliasing correction to the first order .. note:: see equation 20 of `Jing et al 2005 <https://arxiv.org/abs/astro-ph/0409240>`_ Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] s = numpy.sin(0.5 * wi)**2 v = v / (1 - s + 2./15 * s**2) ** 0.5 return v
[docs]def CompensatePCSShotnoise(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the PCS window function in configuration space, as well as the approximate aliasing correction to first order .. note:: YF: I derived this by fitting the result to s ** 3 according to the form given in equation 20 of Jing et al. It must be possible to derive this manually. Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] s = numpy.sin(0.5 * wi)**2 v = v / (1 - 4./3. * s + 2./5. * s**2 - 4./315. * s**3) ** 0.5 return v
[docs]def CompensateCICShotnoise(w, v): """ Return the Fourier-space kernel that accounts for the convolution of the gridded field with the CIC window function in configuration space, as well as the approximate aliasing correction to the first order .. note:: see equation 20 of `Jing et al 2005 <https://arxiv.org/abs/astro-ph/0409240>`_ Parameters ---------- w : list of arrays the list of "circular" coordinate arrays, ranging from :math:`[-\pi, \pi)`. v : array_like the field array """ for i in range(3): wi = w[i] v = v / (1 - 2. / 3 * numpy.sin(0.5 * wi) ** 2) ** 0.5 return v