Source code for nbodykit.algorithms.survey_paircount

from .sim_paircount import PairCountBase, verify_input_sources, MissingCorrfuncError
from nbodykit.binned_statistic import BinnedStatistic

import logging
import numpy

class SurveyPairCountBase(PairCountBase):
    """
    An abstract base class for pair count algorithms from surveys.
    The input data is assumed to be of the form (ra, dec, redshift)
    for these algorithms.
    """
    def _decompose(self, rmax, angular=False):
        """
        An internal function to perform a domain decomposition on the two
        sources we wish to correlate, returning the position/weights for
        each object in the correlating pair.

        The domain decomposition is based on the Cartesian coordinates of
        the input data (assumed to be in sky coordinates).

        The implementation follows:

        1. Decompose the first source and balance the particle load, such
           that the first source is evenly distributed across all ranks
           and the objects are spatially tight on a given rank.
        2. Decompose the second source, ensuring a given rank holds all
           particles within the desired maximum separation.

        Parameters
        ----------
        rmax : float
            the maximum Cartesian separation implied by the user's binning
        angular : bool, optional
            if ``True``, the Cartesian positions used in the domain
            decomposition are on the unit sphere

        Returns
        -------
        (pos1, w1), (pos2, w2) : array_like
            the (decomposed) set of positions and weights to correlate
        """
        from nbodykit.transform import StackColumns
        from pmesh.domain import GridND
        from nbodykit.utils import split_size_3d

        # either (ra,dec) or (ra,dec,redshift)
        poscols = [self.attrs['ra'], self.attrs['dec']]
        if not angular:
            poscols += [self.attrs['redshift']]

        # determine processor division for domain decomposition
        np = split_size_3d(self.comm.size)
        if self.comm.rank == 0:
            self.logger.info("using cpu grid decomposition: %s" % str(np))

        # stack position and compute
        pos1 = StackColumns(*[self.first[col] for col in poscols])
        pos1, w1 = self.first.compute(pos1, self.first[self.attrs['weight']])
        N1 = self.comm.allreduce(len(pos1))

        # get comoving dist and boxsize for first
        cosmo = self.attrs.get('cosmo', None)
        cpos1, boxsize1, rdist1 = get_cartesian(self.comm, pos1, cosmo=cosmo)

        # pass in comoving dist to Corrfunc instead of redshift
        if not angular:
            pos1[:,2] = rdist1

        # set up position for second too
        if self.second is not None:

            # stack position and compute for "second"
            pos2 = StackColumns(*[self.second[col] for col in poscols])
            pos2, w2 = self.second.compute(pos2, self.second[self.attrs['weight']])
            N2 = self.comm.allreduce(len(pos2))

            # get comoving dist and boxsize
            cpos2, boxsize2, rdist2 = get_cartesian(self.comm, pos2, cosmo=cosmo)

            # pass in comoving distance instead of redshift
            if not angular:
                pos2[:,2] = rdist2
        else:
            pos2 = pos1
            w2 = w1
            N2 = N1
            boxsize2 = boxsize1
            cpos2 = cpos1

        # determine global boxsize
        if self.second is None:
            boxsize = boxsize1
        else:
            boxsizes = numpy.vstack([boxsize1, boxsize2])
            argmax = numpy.argmax(boxsizes, axis=0)
            boxsize = boxsizes[argmax, [0,1,2]]

        # initialize the domain
        # NOTE: over-decompose by factor of 2 to trigger load balancing
        grid = [
            numpy.linspace(0, boxsize[0], 2*np[0] + 1, endpoint=True),
            numpy.linspace(0, boxsize[1], 2*np[1] + 1, endpoint=True),
            numpy.linspace(0, boxsize[2], 2*np[2] + 1, endpoint=True),
        ]
        domain = GridND(grid, comm=self.comm)

        # balance the load
        domain.loadbalance(domain.load(cpos1))

        # decompose based on cartesian positions
        layout = domain.decompose(cpos1, smoothing=0)
        pos1 = layout.exchange(pos1)
        w1 = layout.exchange(w1)

        # get the position/weight of the secondaries
        if rmax > boxsize.max() * 0.25:
            pos2 = numpy.concatenate(self.comm.allgather(pos2), axis=0)
            w2 = numpy.concatenate(self.comm.allgather(w2), axis=0)
        else:
            layout = domain.decompose(cpos2, smoothing=rmax)
            pos2 = layout.exchange(pos2)
            w2 = layout.exchange(w2)

        # log the sizes of the trees
        self._log(N1, N2, pos1, pos2)

        return (pos1, w1), (pos2, w2)
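
# A minimal sketch (not part of the original module) of the two-step strategy
# documented in ``_decompose`` above: balance the primary positions across the
# ranks, then ghost the secondaries out to ``rmax`` so every rank sees all of
# its potential pairs. Variable names are illustrative; ``cpos1``/``cpos2``
# stand for Cartesian positions as returned by ``get_cartesian`` below.
def _example_decompose_sketch(comm, cpos1, cpos2, boxsize, rmax):
    from pmesh.domain import GridND
    from nbodykit.utils import split_size_3d
    import numpy

    # split the ranks into a 3D mesh and over-decompose each axis by 2x,
    # giving the load balancer spare cells to shuffle between ranks
    np3 = split_size_3d(comm.size)
    grid = [numpy.linspace(0, boxsize[i], 2*np3[i] + 1, endpoint=True)
            for i in range(3)]
    domain = GridND(grid, comm=comm)

    # step 1: balance the primaries; no ghosting (smoothing=0) is needed
    domain.loadbalance(domain.load(cpos1))
    pos1 = domain.decompose(cpos1, smoothing=0).exchange(cpos1)

    # step 2: ghost the secondaries by the maximum pair separation
    pos2 = domain.decompose(cpos2, smoothing=rmax).exchange(cpos2)
    return pos1, pos2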

class SurveyDataPairCount(SurveyPairCountBase):
    """
    Count (weighted) pairs of objects from a survey data catalog using the
    :mod:`Corrfunc` package.

    This uses the :func:`Corrfunc.mocks.DDsmu_mocks.DDsmu_mocks` function
    to count pairs.

    Results are computed when the object is initialized. See the
    documentation of :func:`~SurveyDataPairCount.run` for the attributes
    storing the results.

    .. note::
        The algorithm expects the positions of particles from a survey
        catalog to be given in sky coordinates: right ascension,
        declination, and redshift. To compute pair counts in a simulation
        box, using the Cartesian coordinate vectors, see
        :class:`SimulationBoxPairCount`.

    .. warning::
        The right ascension and declination columns should be specified
        in degrees.

    Parameters
    ----------
    mode : {'1d', '2d'}
        compute pair counts as a function of ``r`` and ``mu`` or just ``r``
    first : CatalogSource
        the first source of particles, providing the 'Position' column
    redges : array_like
        the radius bin edges; length of ``nbins+1``
    cosmo : :class:`~nbodykit.cosmology.core.Cosmology`
        the cosmology instance used to convert redshift into comoving
        distance
    second : CatalogSource, optional
        the second source of particles to cross-correlate
    Nmu : int, optional
        the number of ``mu`` bins, ranging from 0 to 1
    ra : str, optional
        the name of the column in the source specifying the right ascension
        coordinates in units of degrees; default is 'RA'
    dec : str, optional
        the name of the column in the source specifying the declination
        coordinates; default is 'DEC'
    redshift : str, optional
        the name of the column in the source specifying the redshift
        coordinates; default is 'Redshift'
    weight : str, optional
        the name of the column in the source specifying the object weights
    show_progress : bool, optional
        if ``True``, perform the pair counting calculation in 10 iterations,
        logging the progress after each iteration; this is useful for
        understanding the scaling of the code
    **config : key/value pairs
        additional keywords to pass to the :mod:`Corrfunc` function
    """
    logger = logging.getLogger('SurveyDataPairCount')

    def __init__(self, mode, first, redges, cosmo, second=None, Nmu=5,
                 ra='RA', dec='DEC', redshift='Redshift', weight='Weight',
                 show_progress=True, **config):

        # check input 'mode'
        assert mode in ['1d', '2d'], "PairCount mode must be '1d' or '2d'"

        # check rmin
        if numpy.min(redges) <= 0.:
            raise ValueError(("the lower edge of the 1st separation bin must "
                              "be greater than zero (no self-pairs)"))

        # verify the input sources
        required_cols = [ra, dec, redshift, weight]
        verify_input_sources(first, second, None, required_cols, inspect_boxsize=False)

        # init the base class
        SurveyPairCountBase.__init__(self, first, second, show_progress)

        # save the meta-data
        self.attrs['mode'] = mode
        self.attrs['redges'] = redges
        self.attrs['Nmu'] = Nmu
        self.attrs['cosmo'] = cosmo
        self.attrs['weight'] = weight
        self.attrs['ra'] = ra
        self.attrs['dec'] = dec
        self.attrs['redshift'] = redshift
        self.attrs['config'] = config

        # run the algorithm
        self.run()

    def run(self):
        """
        Calculate the 3D pair counts of a survey data catalog as a function
        of separation ``r`` or separation and angle to the line-of-sight
        (``r``, ``mu``). This adds the following attribute:

        - :attr:`SurveyDataPairCount.result`

        Attributes
        ----------
        result : :class:`~nbodykit.binned_statistic.BinnedStatistic`
            a BinnedStatistic object holding the pair count results.
            The coordinate grid is either ``r`` or ``r`` and ``mu``.
            It stores the following variables:

            - ``r``: the mean separation value in the bin
            - ``npairs``: the number of pairs in the bin
            - ``weightavg``: the average weight value in the bin; each pair
              contributes the product of the individual weight values
        """
        try:
            from Corrfunc.mocks import DDsmu_mocks
        except ImportError:
            raise MissingCorrfuncError()

        # some setup
        redges = self.attrs['redges']
        Nmu = 1 if self.attrs['mode'] == '1d' else self.attrs['Nmu']

        # maximum separation value
        rmax = numpy.max(self.attrs['redges'])

        # domain decompose the data
        # NOTE: pos has 3 columns: (ra, dec, redshift)
        (pos1, w1), (pos2, w2) = self._decompose(rmax, angular=False)

        # do the pair counting
        kws = {}
        kws['autocorr'] = 0
        kws['cosmology'] = 1
        kws['nthreads'] = 1
        kws['nmu_bins'] = Nmu
        kws['mu_max'] = 1.0
        kws['binfile'] = self.attrs['redges']
        kws['RA2'] = pos2[:,0]
        kws['DEC2'] = pos2[:,1]
        kws['CZ2'] = pos2[:,2]
        kws['weights2'] = w2.astype(pos2.dtype)
        kws['weight_type'] = 'pair_product'
        kws['output_savg'] = True
        kws['is_comoving_dist'] = True
        kws.update(self.attrs['config'])

        def callback(kws, chunk):
            kws['RA1'] = pos1[chunk][:,0]   # ra
            kws['DEC1'] = pos1[chunk][:,1]  # dec
            kws['CZ1'] = pos1[chunk][:,2]   # comoving distance
            kws['weights1'] = w1[chunk].astype(pos1.dtype)

        # run
        sizes = self.comm.allgather(len(pos1))
        pc = self._run(DDsmu_mocks, kws, sizes, callback=callback)
        pc = pc.reshape((-1, Nmu))

        # sum results over all ranks
        pc = self.comm.allreduce(pc)

        # make a new structured array
        dtype = numpy.dtype([('r', 'f8'), ('npairs', 'u8'), ('weightavg', 'f8')])
        data = numpy.zeros(pc.shape, dtype=dtype)

        # copy over main results
        data['r'] = pc['savg']
        data['npairs'] = pc['npairs']
        data['weightavg'] = pc['weightavg']

        # make the BinnedStatistic
        if self.attrs['mode'] == '1d':
            self.result = BinnedStatistic(['r'], [redges], numpy.squeeze(data),
                                          fields_to_sum=['npairs'])
        else:
            muedges = numpy.linspace(0, 1., Nmu+1)
            self.result = BinnedStatistic(['r','mu'], [redges, muedges], data,
                                          fields_to_sum=['npairs'])
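
# Illustrative usage (not part of the original module): build a toy catalog of
# (RA, DEC, Redshift) columns and count pairs on an (r, mu) grid. The catalog
# construction assumes ``nbodykit.lab.RandomCatalog`` and the built-in Planck15
# cosmology; adapt the column assignments and bin edges to your own data.
def _example_survey_paircount():
    from nbodykit.lab import RandomCatalog, cosmology
    import numpy

    # a small mock survey footprint with uniform RA/DEC/redshift
    cat = RandomCatalog(csize=10000, seed=42)
    cat['RA'] = cat.rng.uniform(low=0, high=40)         # degrees
    cat['DEC'] = cat.rng.uniform(low=-10, high=10)      # degrees
    cat['Redshift'] = cat.rng.uniform(low=0.3, high=0.7)

    # separation bins in Mpc/h; the lower edge must be > 0
    redges = numpy.linspace(10., 150., 15)
    pc = SurveyDataPairCount('2d', cat, redges, cosmology.Planck15, Nmu=10)

    # results live in a BinnedStatistic on the (r, mu) coordinate grid
    return pc.result['r'], pc.result['npairs']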

class AngularPairCount(SurveyPairCountBase):
    """
    Count (weighted) angular pairs of objects from a survey data catalog
    using the :mod:`Corrfunc` package.

    This uses the :func:`Corrfunc.mocks.DDtheta_mocks.DDtheta_mocks` function
    to count pairs.

    Results are computed when the object is initialized. See the
    documentation of :func:`~AngularPairCount.run` for the attributes
    storing the results.

    .. note::
        The algorithm expects the positions of particles from the survey
        catalog to be the right ascension and declination angular
        coordinates.

    .. warning::
        The right ascension and declination columns should be specified
        in degrees.

    Parameters
    ----------
    first : CatalogSource
        the first source of particles, providing the 'Position' column
    edges : array_like
        the angular separation bin edges (in degrees); length of ``nbins+1``
    second : CatalogSource, optional
        the second source of particles to cross-correlate
    ra : str, optional
        the name of the column in the source specifying the right ascension
        coordinates in units of degrees; default is 'RA'
    dec : str, optional
        the name of the column in the source specifying the declination
        coordinates; default is 'DEC'
    weight : str, optional
        the name of the column in the source specifying the object weights
    show_progress : bool, optional
        if ``True``, perform the pair counting calculation in 10 iterations,
        logging the progress after each iteration; this is useful for
        understanding the scaling of the code
    **config : key/value pairs
        additional keywords to pass to the :mod:`Corrfunc` function
    """
    logger = logging.getLogger('AngularPairCount')

    def __init__(self, first, edges, second=None, ra='RA', dec='DEC',
                 weight='Weight', show_progress=True, **config):

        # check theta min
        if numpy.min(edges) <= 0.:
            raise ValueError(("the lower edge of the 1st separation bin must "
                              "be greater than zero (no self-pairs)"))

        # verify the input sources
        verify_input_sources(first, second, None, [ra, dec, weight], inspect_boxsize=False)

        # init the base class
        SurveyPairCountBase.__init__(self, first, second, show_progress)

        # save the meta-data
        self.attrs['edges'] = edges
        self.attrs['ra'] = ra
        self.attrs['dec'] = dec
        self.attrs['weight'] = weight
        self.attrs['config'] = config

        # run the algorithm
        self.run()

    def __setstate__(self, state):
        self.__dict__.update(state)
        edges = self.attrs['edges']
        self.result = BinnedStatistic(['theta'], [edges], self.result,
                                      fields_to_sum=['npairs'])

    def run(self):
        """
        Calculate the angular pair counts of a survey data catalog as a
        function of separation angle on the sky. This adds the following
        attribute:

        - :attr:`AngularPairCount.result`

        Attributes
        ----------
        result : :class:`~nbodykit.binned_statistic.BinnedStatistic`
            a BinnedStatistic object holding the pair count results.
            The coordinate grid is ``theta``, the angular separation bins.
            It stores the following variables:

            - ``theta``: the mean separation value in the bin
            - ``npairs``: the number of pairs in the bin
            - ``weightavg``: the average weight value in the bin; each pair
              contributes the product of the individual weight values
        """
        try:
            from Corrfunc.mocks import DDtheta_mocks
        except ImportError:
            raise MissingCorrfuncError()

        # maximum R separation from angular separation
        theta_max = numpy.max(self.attrs['edges'])
        rmax = 2 * numpy.sin(0.5 * numpy.deg2rad(theta_max))

        # domain decompose the data
        # NOTE: pos is angular and only has 2 columns: (ra, dec)
        (pos1, w1), (pos2, w2) = self._decompose(rmax, angular=True)

        # do the pair counting
        kws = {}
        kws['autocorr'] = 0
        kws['nthreads'] = 1
        kws['binfile'] = self.attrs['edges']
        kws['RA2'] = pos2[:,0]
        kws['DEC2'] = pos2[:,1]
        kws['weights2'] = w2.astype(pos2.dtype)
        kws['weight_type'] = 'pair_product'
        kws['output_thetaavg'] = True
        kws.update(self.attrs['config'])

        def callback(kws, chunk):
            kws['RA1'] = pos1[chunk][:,0]   # ra
            kws['DEC1'] = pos1[chunk][:,1]  # dec
            kws['weights1'] = w1[chunk].astype(pos1.dtype)

        # run
        sizes = self.comm.allgather(len(pos1))
        pc = self._run(DDtheta_mocks, kws, sizes, callback=callback)

        # sum results over all ranks
        pc = self.comm.allreduce(pc)

        # make a new structured array
        dtype = numpy.dtype([('theta', 'f8'), ('npairs', 'u8'), ('weightavg', 'f8')])
        data = numpy.zeros(pc.shape, dtype=dtype)

        # copy over main results
        data['theta'] = pc['thetaavg']
        data['npairs'] = pc['npairs']
        data['weightavg'] = pc['weightavg']

        # make the BinnedStatistic
        edges = [self.attrs['edges']]
        self.result = BinnedStatistic(['theta'], edges, data, fields_to_sum=['npairs'])
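
# Illustrative usage (not part of the original module): angular pair counts
# only need RA and DEC columns plus bin edges in degrees. This assumes the
# same toy ``RandomCatalog`` setup as the sketch above; the lower bin edge
# must be above zero.
def _example_angular_paircount():
    from nbodykit.lab import RandomCatalog
    import numpy

    cat = RandomCatalog(csize=10000, seed=42)
    cat['RA'] = cat.rng.uniform(low=0, high=40)      # degrees
    cat['DEC'] = cat.rng.uniform(low=-10, high=10)   # degrees

    # angular bins from 0.1 to 10 degrees
    theta_edges = numpy.logspace(-1, 1, 11)
    pc = AngularPairCount(cat, theta_edges)
    return pc.result['theta'], pc.result['npairs']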

def get_cartesian(comm, pos, cosmo=None):
    """
    Convert sky coordinates to Cartesian coordinates and return the
    implied box size from the position bounds.

    If ``cosmo`` is not provided, return coordinates on the unit sphere.
    """
    from nbodykit.utils import get_data_bounds

    # convert RA, DEC from degrees to radians
    ra, dec = numpy.deg2rad(pos[:,0]), numpy.deg2rad(pos[:,1])

    # cartesian position on the unit sphere
    x = numpy.cos(dec) * numpy.cos(ra)
    y = numpy.cos(dec) * numpy.sin(ra)
    z = numpy.sin(dec)
    cpos = numpy.vstack([x, y, z]).T

    # multiply by comoving distance?
    if cosmo is not None:
        assert pos.shape[-1] == 3
        rdist = cosmo.comoving_distance(pos[:,2])  # in Mpc/h
        cpos = rdist[:,None] * cpos
    else:
        rdist = None

    # min/max of position
    cpos_min, cpos_max = get_data_bounds(cpos, comm)
    boxsize = numpy.ceil(abs(cpos_max - cpos_min))

    return cpos, boxsize, rdist
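
# A minimal sketch (not part of the original module) showing what
# ``get_cartesian`` returns for a handful of sky positions. It needs an MPI
# communicator (here ``MPI.COMM_WORLD`` from mpi4py is assumed) and,
# optionally, a cosmology to scale the unit-sphere positions by comoving
# distance.
def _example_get_cartesian():
    from mpi4py import MPI
    from nbodykit.cosmology import Planck15
    import numpy

    # columns are (RA [deg], DEC [deg], redshift)
    pos = numpy.array([[10., -5., 0.5],
                       [12.,  3., 0.6],
                       [35.,  8., 0.4]])

    # without a cosmology: unit-sphere coordinates, rdist is None
    cpos, boxsize, rdist = get_cartesian(MPI.COMM_WORLD, pos)

    # with a cosmology: positions scaled by comoving distance in Mpc/h
    cpos, boxsize, rdist = get_cartesian(MPI.COMM_WORLD, pos, cosmo=Planck15)
    return cpos, boxsize, rdist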