Source code for nbodykit.algorithms.threeptcf

from nbodykit import CurrentMPIComm
from nbodykit.binned_statistic import BinnedStatistic
from collections import OrderedDict
import numpy
import logging
import kdcount

class Base3PCF(object):
    """
    Base class for implementing common 3PCF calculations.

    Users should use :class:`SimulationBox3PCF` or :class:`SurveyData3PCF`.
    """
    def __init__(self, source, poles, edges, required_cols,
                 BoxSize=None, periodic=None):

        from .pair_counters.base import verify_input_sources

        # verify the input sources
        inspect = periodic is not None
        BoxSize = verify_input_sources(source, None, BoxSize, required_cols,
                                       inspect_boxsize=inspect)

        self.source = source
        self.comm = self.source.comm

        # save the meta-data
        self.attrs = {}
        self.attrs['poles'] = poles
        self.attrs['edges'] = edges

        # store periodic/BoxSize for SimulationBox
        if periodic is not None:
            self.attrs['BoxSize'] = BoxSize
            self.attrs['periodic'] = periodic

    def _run(self, pos, w, pos_sec, w_sec, boxsize=None, bunchsize=10000):
        """
        Internal function to run the 3PCF algorithm on the input data and
        weights.

        The input data/weights have already been domain-decomposed, and
        the loads should be balanced on all ranks.
        """
        # maximum radius
        rmax = numpy.max(self.attrs['edges'])

        # the array to hold output values
        nbins = len(self.attrs['edges']) - 1
        Nell = len(self.attrs['poles'])
        zeta = numpy.zeros((Nell, nbins, nbins), dtype='f8')
        alms = {}
        walms = {}

        # compute the Ylm expressions we need
        if self.comm.rank == 0:
            self.logger.info("computing Ylm expressions...")
        Ylm_cache = YlmCache(self.attrs['poles'], self.comm)
        if self.comm.rank == 0:
            self.logger.info("...done")

        # make the KD-tree holding the secondaries
        tree_sec = kdcount.KDTree(pos_sec, boxsize=boxsize).root

        def callback(r, i, j, iprim=None):

            # remove self pairs
            valid = r > 0.
            r = r[valid]
            i = i[valid]

            # normalized, re-centered position array (periodic)
            dpos = pos_sec[i] - pos[iprim]

            # enforce periodicity in dpos
            if boxsize is not None:
                for axis, col in enumerate(dpos.T):
                    col[col > boxsize[axis]*0.5] -= boxsize[axis]
                    col[col <= -boxsize[axis]*0.5] += boxsize[axis]
            recen_pos = dpos / r[:,numpy.newaxis]

            # find the mapping of r to rbins
            dig = numpy.searchsorted(self.attrs['edges'], r, side='left')

            # evaluate all Ylms
            Ylms = Ylm_cache(recen_pos[:,0] + 1j*recen_pos[:,1], recen_pos[:,2])

            # the weight of the primary (enters the estimator once, via walm)
            w0 = w[iprim]

            # loop over each (l,m) pair
            for (l, m) in Ylms:

                # the Ylm evaluated at galaxy positions
                weights = Ylms[(l, m)] * w_sec[i]

                # sum over each radial bin
                alm = alms.setdefault((l, m), numpy.zeros(nbins, dtype='c16'))
                walm = walms.setdefault((l, m), numpy.zeros(nbins, dtype='c16'))

                r1 = numpy.bincount(dig, weights=weights.real, minlength=nbins+2)[1:-1]
                alm[...] += r1
                walm[...] += w0 * r1
                if m != 0:
                    i1 = numpy.bincount(dig, weights=weights.imag, minlength=nbins+2)[1:-1]
                    alm[...] += 1j*i1
                    walm[...] += w0*1j*i1

        # determine the rank with the largest load
        loads = self.comm.allgather(len(pos))
        largest_load = numpy.argmax(loads)
        chunk_size = max(max(loads) // 10, 1)  # guard against zero for small loads

        # compute multipoles for each primary (s vector in the paper)
        for iprim in range(len(pos)):

            # alms must be clean for each primary particle;
            # (s) in eqs. 8 and 15 of arXiv:1506.02040v2
            alms.clear()
            walms.clear()

            tree_prim = kdcount.KDTree(numpy.atleast_2d(pos[iprim]), boxsize=boxsize).root
            tree_sec.enum(tree_prim, rmax, process=callback, iprim=iprim, bunch=bunchsize)

            if self.comm.rank == largest_load and iprim % chunk_size == 0:
                self.logger.info("%d%% done" % (10*iprim//chunk_size))

            # combine alms into zeta(s); this cannot be done in the callback
            # because it is a nonlinear function (outer product) of alm
            for (l, m) in alms:
                alm = alms[(l, m)]
                walm = walms[(l, m)]

                # compute walm * conjugate(alm)
                alm_w_alm = numpy.outer(walm, alm.conj())
                if m != 0:
                    alm_w_alm += alm_w_alm.T  # add in the -m contribution for m != 0
                zeta[Ylm_cache.ell_to_iell[l], ...] += alm_w_alm.real

        # sum across all ranks
        zeta = self.comm.allreduce(zeta)

        # normalize according to Eq. 15 of Slepian et al. 2015;
        # this differs by a factor of (4 pi)^2 / (2l+1) from the C++ code
        zeta /= (4*numpy.pi)

        # make a BinnedStatistic
        dtype = numpy.dtype([('corr_%d' % ell, zeta.dtype) for ell in self.attrs['poles']])
        data = numpy.empty(zeta.shape[-2:], dtype=dtype)
        for i, ell in enumerate(self.attrs['poles']):
            data['corr_%d' % ell] = zeta[i]

        # save the result
        edges = self.attrs['edges']
        poles = BinnedStatistic(['r1', 'r2'], [edges, edges], data)
        return poles

    def __getstate__(self):
        return {'poles': self.poles.data, 'attrs': self.attrs}

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.poles = BinnedStatistic(['r1', 'r2'], [self.attrs['edges']]*2, self.poles)
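
    # ------------------------------------------------------------------
    # For reference, the estimator assembled in _run above is, up to the
    # normalization noted in the code (cf. Eq. 15 of Slepian & Eisenstein
    # 2015):
    #
    #   zeta_l(r1, r2) = (1/4pi) sum_s w(s) sum_m a_lm(s; r1) a_lm(s; r2)*
    #
    # with a_lm(s; r) = sum over secondaries in bin r of w_sec * Y_lm(rhat),
    # where s runs over the primary particles; the m != 0 transpose in the
    # loop above supplies the negative-m terms of the real-valued field.
    # ------------------------------------------------------------------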
    def save(self, output):
        """
        Save the :attr:`poles` result to a JSON file with name ``output``.
        """
        import json
        from nbodykit.utils import JSONEncoder

        # only the master rank writes
        if self.comm.rank == 0:
            self.logger.info('measurement done; saving result to %s' % output)
            with open(output, 'w') as ff:
                json.dump(self.__getstate__(), ff, cls=JSONEncoder)
    @classmethod
    @CurrentMPIComm.enable
    def load(cls, filename, comm=None):
        """
        Load a result from ``filename`` that has been saved to disk
        with :func:`save`.
        """
        import json
        from nbodykit.utils import JSONDecoder

        if comm.rank == 0:
            with open(filename, 'r') as ff:
                state = json.load(ff, cls=JSONDecoder)
        else:
            state = None
        state = comm.bcast(state)

        self = object.__new__(cls)
        self.__setstate__(state)
        self.comm = comm

        return self
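
# --------------------------------------------------------------------------
# A minimal save/load round trip, as a sketch (the file name is hypothetical;
# ``result`` stands for any computed SimulationBox3PCF or SurveyData3PCF
# instance):
#
#   result.save('threeptcf.json')             # only rank 0 writes
#   r = SimulationBox3PCF.load('threeptcf.json')
#   r.poles['corr_0']                         # the ell=0 multipole in (r1, r2)
# --------------------------------------------------------------------------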

class SimulationBox3PCF(Base3PCF):
    r"""
    Compute the multipoles of the isotropic, three-point correlation function
    in configuration space for data in a simulation box.

    This uses the algorithm of Slepian and Eisenstein, 2015, which scales
    as :math:`\mathcal{O}(N^2)`, where :math:`N` is the number of objects.

    Results are computed when the object is initialized. See the
    documentation of :func:`run` for the attributes storing the results.

    .. note::
        The algorithm expects the positions of objects in a simulation box
        to be the Cartesian ``x``, ``y``, and ``z`` vectors. For survey data,
        in the form of right ascension, declination, and redshift, see
        :class:`~nbodykit.algorithms.SurveyData3PCF`.

    Parameters
    ----------
    source : CatalogSource
        the input source of particles providing the 'Position' column
    poles : list of int
        the list of multipole numbers to compute
    edges : array_like
        the edges of the bins of separation to use; length of nbins+1
    BoxSize : float, 3-vector, optional
        the size of the box; if periodic boundary conditions are used and
        'BoxSize' is not provided in the source :attr:`attrs`, it must be
        provided here
    periodic : bool, optional
        whether to use periodic boundary conditions when computing
        separations between objects
    weight : str, optional
        the name of the column in the source specifying the particle weights
    position : str, optional
        the name of the column in the source specifying the particle
        positions

    References
    ----------
    Slepian and Eisenstein, MNRAS 454, 4142-4158 (2015)
    """
    logger = logging.getLogger("SimulationBox3PCF")

    def __init__(self, source, poles, edges, BoxSize=None, periodic=True,
                 weight='Weight', position='Position'):

        # initialize the base class
        required_cols = [position, weight]
        Base3PCF.__init__(self, source, poles, edges, required_cols,
                          BoxSize=BoxSize, periodic=periodic)

        # save the weight and position column names
        self.attrs['weight'] = weight
        self.attrs['position'] = position

        # check the largest possible separation
        if periodic:
            min_box_side = 0.5*self.attrs['BoxSize'].min()
            if numpy.amax(edges) > min_box_side:
                raise ValueError("periodic pair counts cannot be computed for Rmax > BoxSize/2")

        # run the algorithm
        self.poles = self.run()
    def run(self, pedantic=False):
        r"""
        Compute the three-point CF multipoles. This attaches the following
        attributes to the class:

        - :attr:`poles`

        Attributes
        ----------
        poles : :class:`~nbodykit.binned_statistic.BinnedStatistic`
            a BinnedStatistic object holding the multipole results; it
            stores the multipoles as variables ``corr_0``, ``corr_1``, etc.
            for :math:`\ell=0,1,` etc. The coordinates of the binned
            statistic are ``r1`` and ``r2``, which give the separations
            between the three objects in the CF.
        """
        from .pair_counters.domain import decompose_box_data

        # the box size to use
        if self.attrs['periodic']:
            boxsize = self.attrs['BoxSize']
        else:
            boxsize = None

        # domain decompose the data
        smoothing = numpy.max(self.attrs['edges'])
        (pos, w), (pos_sec, w_sec) = decompose_box_data(self.source, None,
                                                        self.attrs,
                                                        self.logger,
                                                        smoothing)

        # run the algorithm
        if pedantic:
            return self._run(pos, w, pos_sec, w_sec, boxsize=boxsize, bunchsize=1)
        else:
            return self._run(pos, w, pos_sec, w_sec, boxsize=boxsize)
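
# --------------------------------------------------------------------------
# A minimal usage sketch, assuming a catalog with a 'Position' column in a
# periodic box (UniformCatalog is used here just to have concrete data; the
# multipole numbers and bin edges are arbitrary choices):
#
#   from nbodykit.lab import UniformCatalog
#   cat = UniformCatalog(nbar=1e-4, BoxSize=512., seed=42)
#   r = SimulationBox3PCF(cat, poles=[0, 1, 2],
#                         edges=numpy.linspace(10., 100., 11))
#   zeta0 = r.poles['corr_0']                 # (nbins, nbins) array in (r1, r2)
# --------------------------------------------------------------------------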

class SurveyData3PCF(Base3PCF):
    r"""
    Compute the multipoles of the isotropic, three-point correlation function
    in configuration space for observational survey data.

    This uses the algorithm of Slepian and Eisenstein, 2015, which scales
    as :math:`\mathcal{O}(N^2)`, where :math:`N` is the number of objects.

    Results are computed when the object is initialized. See the
    documentation of :func:`run` for the attributes storing the results.

    .. note::
        The algorithm expects the positions of objects from a survey catalog
        to be the sky coordinates, right ascension and declination, and
        redshift. For simulation box data in Cartesian coordinates, see
        :class:`~nbodykit.algorithms.SimulationBox3PCF`.

    .. warning::
        The right ascension and declination columns should be specified
        in degrees.

    Parameters
    ----------
    source : CatalogSource
        the input source of particles providing the right ascension,
        declination, and redshift columns
    poles : list of int
        the list of multipole numbers to compute
    edges : array_like
        the edges of the bins of separation to use; length of nbins+1
    cosmo : :class:`~nbodykit.cosmology.cosmology.Cosmology`
        the cosmology instance used to convert redshifts into comoving
        distances
    ra : str, optional
        the name of the column in the source specifying the right ascension
        coordinates in units of degrees; default is 'RA'
    dec : str, optional
        the name of the column in the source specifying the declination
        coordinates in units of degrees; default is 'DEC'
    redshift : str, optional
        the name of the column in the source specifying the redshift
        coordinates; default is 'Redshift'
    weight : str, optional
        the name of the column in the source specifying the object weights
    domain_factor : int, optional
        the integer value by which to oversubscribe the domain decomposition
        mesh before balancing loads; this number can affect the distribution
        of loads on the ranks -- an optimal value will lead to balanced loads

    References
    ----------
    Slepian and Eisenstein, MNRAS 454, 4142-4158 (2015)
    """
    logger = logging.getLogger("SurveyData3PCF")

    def __init__(self, source, poles, edges, cosmo, domain_factor=4,
                 ra='RA', dec='DEC', redshift='Redshift', weight='Weight'):

        # initialize the base class
        required_cols = [ra, dec, redshift, weight]
        Base3PCF.__init__(self, source, poles, edges, required_cols)

        # save meta-data
        self.attrs['cosmo'] = cosmo
        self.attrs['weight'] = weight
        self.attrs['ra'] = ra
        self.attrs['dec'] = dec
        self.attrs['redshift'] = redshift
        self.attrs['domain_factor'] = domain_factor

        # run the algorithm
        self.poles = self.run()
    def run(self):
        r"""
        Compute the three-point CF multipoles. This attaches the following
        attributes to the class:

        - :attr:`poles`

        Attributes
        ----------
        poles : :class:`~nbodykit.binned_statistic.BinnedStatistic`
            a BinnedStatistic object holding the multipole results; it
            stores the multipoles as variables ``corr_0``, ``corr_1``, etc.
            for :math:`\ell=0,1,` etc. The coordinates of the binned
            statistic are ``r1`` and ``r2``, which give the separations
            between the three objects in the CF.
        """
        from .pair_counters.domain import decompose_survey_data

        # domain decompose the data
        # NOTE: pos and pos_sec are Cartesian!
        smoothing = numpy.max(self.attrs['edges'])
        (pos, w), (pos_sec, w_sec) = decompose_survey_data(self.source, None,
                                                           self.attrs,
                                                           self.logger,
                                                           smoothing,
                                                           return_cartesian=True,
                                                           domain_factor=self.attrs['domain_factor'])

        # run the algorithm
        return self._run(pos, w, pos_sec, w_sec)
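
# --------------------------------------------------------------------------
# A minimal usage sketch, assuming a catalog with 'RA', 'DEC', and 'Redshift'
# columns (a RandomCatalog with mock columns is used here just to have
# concrete data; the cosmology, column values, and bin edges are arbitrary):
#
#   from nbodykit.lab import RandomCatalog, cosmology
#   cat = RandomCatalog(csize=1000, seed=42)
#   cat['RA'] = 360. * cat.rng.uniform()
#   cat['DEC'] = 60. * cat.rng.uniform() - 30.
#   cat['Redshift'] = 0.4 + 0.2 * cat.rng.uniform()
#   r = SurveyData3PCF(cat, poles=[0, 1, 2],
#                      edges=numpy.linspace(10., 100., 11),
#                      cosmo=cosmology.Planck15)
# --------------------------------------------------------------------------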

class YlmCache(object):
    r"""
    A class to compute spherical harmonics :math:`Y_{lm}` up to a specified
    maximum :math:`\ell`.

    During calculation, the necessary powers of the Cartesian unit vectors
    are cached in memory to avoid repeated calculations for separate
    harmonics.
    """
    def __init__(self, ells, comm):

        import sympy as sp
        from sympy.utilities.lambdify import implemented_function
        from sympy.parsing.sympy_parser import parse_expr
        from sympy.core import sympify

        self.ells = numpy.asarray(ells).astype(int)
        self.max_ell = max(self.ells)

        # look-up table from ell to iell, the index for accumulating results
        self.ell_to_iell = numpy.empty(self.max_ell + 1, dtype=int)
        for iell, ell in enumerate(self.ells):
            self.ell_to_iell[ell] = iell

        lms = [(l, m) for l in ells for m in range(0, l+1)]

        # compute the Ylm string expressions in parallel
        exprs = []
        for i in range(comm.rank, len(lms), comm.size):
            lm = lms[i]
            exprs.append((lm, str(self._get_Ylm(*lm))))
        exprs = [x for sublist in comm.allgather(exprs) for x in sublist]

        # determine the powers entering into each expression
        args = {}
        for lm, expr in exprs:
            matches = []
            for var in ['xpyhat', 'zhat']:
                for e in range(2, max(ells)+1):
                    name = var + '**' + str(e)
                    if name in expr:
                        matches.append((sympify(name), 'cached_'+var, str(e)))
            args[lm] = matches

        # define a function to return the cached power
        def from_cache(name, pow):
            return self._cache[str(name)+str(pow)]
        f = implemented_function(sp.Function('from_cache'), from_cache)

        # arguments to the sympy functions
        zhat = sp.Symbol('zhat', real=True, positive=True)
        xpyhat = sp.Symbol('xpyhat', complex=True)

        self._cache = {}

        # make the Ylm functions
        self._Ylms = OrderedDict()
        for lm, expr in exprs:
            expr = parse_expr(expr, local_dict={'zhat': zhat, 'xpyhat': xpyhat})
            for var in args[lm]:
                expr = expr.replace(var[0], sympify('from_cache(%s, %s)' % var[1:]))
            self._Ylms[lm] = sp.lambdify((xpyhat, zhat), expr)
    def __call__(self, xpyhat, zhat):
        """
        Return a dictionary holding Ylm for each required (l,m) combination.

        Parameters
        ----------
        xpyhat : array_like
            a complex array holding xhat + i * yhat, where xhat and yhat
            are two of the Cartesian unit vectors
        zhat : array_like
            the third Cartesian unit vector
        """
        # fill the cache first
        self._cache['cached_xpyhat2'] = xpyhat**2
        self._cache['cached_zhat2'] = zhat**2
        for name, x in zip(['cached_xpyhat', 'cached_zhat'], [xpyhat, zhat]):
            for i in range(3, self.max_ell+1):
                self._cache[name+str(i)] = self._cache[name+str(i-1)]*x

        # return a dictionary for each (l,m) tuple
        toret = {}
        for lm in self._Ylms:
            toret[lm] = self._Ylms[lm](xpyhat, zhat)
        return toret
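
    # ------------------------------------------------------------------
    # A sketch of direct use (normally this class is driven internally by
    # Base3PCF._run; the unit vectors below are hypothetical test values):
    #
    #   cache = YlmCache([0, 1, 2], CurrentMPIComm.get())
    #   nhat = numpy.array([[0., 0.6, 0.8]])        # unit vectors, shape (N, 3)
    #   Ylms = cache(nhat[:,0] + 1j*nhat[:,1], nhat[:,2])
    #   Ylms[(1, 0)]                                # Y_{10} at each unit vector
    # ------------------------------------------------------------------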
    def _get_Ylm(self, l, m):
        r"""
        Compute an expression for the spherical harmonic of order (l,m)
        in terms of the Cartesian unit vectors, :math:`\hat{z}` and
        :math:`\hat{x} + i \hat{y}`.

        Parameters
        ----------
        l : int
            the degree of the harmonic
        m : int
            the order of the harmonic; abs(m) <= l

        Returns
        -------
        expr :
            a sympy expression that corresponds to the requested Ylm

        References
        ----------
        https://en.wikipedia.org/wiki/Spherical_harmonics
        """
        import sympy as sp

        # the relevant Cartesian and spherical symbols
        x, y, z, r = sp.symbols('x y z r', real=True, positive=True)
        xhat, yhat, zhat = sp.symbols('xhat yhat zhat', real=True, positive=True)
        xpyhat = sp.Symbol('xpyhat', complex=True)
        phi, theta = sp.symbols('phi theta')
        defs = [(sp.sin(phi), y/sp.sqrt(x**2+y**2)),
                (sp.cos(phi), x/sp.sqrt(x**2+y**2)),
                (sp.cos(theta), z/sp.sqrt(x**2 + y**2 + z**2))]

        # the cos(theta) dependence encoded by the associated Legendre polynomial
        expr = sp.assoc_legendre(l, m, sp.cos(theta))

        # the exp(i*m*phi) dependence
        expr *= sp.expand_trig(sp.cos(m*phi)) + sp.I*sp.expand_trig(sp.sin(m*phi))

        # simplifying optimizations
        expr = sp.together(expr.subs(defs)).subs(x**2 + y**2 + z**2, r**2)
        expr = expr.expand().subs([(x/r, xhat), (y/r, yhat), (z/r, zhat)])
        expr = expr.factor().factor(extension=[sp.I]).subs(xhat+sp.I*yhat, xpyhat)
        expr = expr.subs(xhat**2 + yhat**2, 1-zhat**2).factor()

        # and finally add the normalization
        amp = sp.sqrt((2*l+1) / (4*numpy.pi) * sp.factorial(l-m) / sp.factorial(l+m))
        expr *= amp

        return expr
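
# --------------------------------------------------------------------------
# A sanity-check sketch (not part of the algorithm): for m >= 0 the symbolic
# expressions above use the same convention (including the Condon-Shortley
# phase) as scipy's spherical harmonics, so the two should agree numerically.
# Note that ``scipy.special.sph_harm(m, l, theta, phi)`` takes the azimuthal
# angle before the polar angle; the angles below are arbitrary test values:
#
#   import scipy.special
#   cache = YlmCache([2], CurrentMPIComm.get())
#   theta, phi = 1.0, 0.5                           # polar, azimuthal
#   zhat = numpy.array([numpy.cos(theta)])
#   xpyhat = numpy.array([numpy.sin(theta)*numpy.exp(1j*phi)])
#   ours = cache(xpyhat, zhat)[(2, 1)]
#   ref = scipy.special.sph_harm(1, 2, phi, theta)
#   numpy.allclose(ours, ref)                       # expected: True
# --------------------------------------------------------------------------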