import numpy
import logging
from six import string_types
from nbodykit import CurrentMPIComm
from nbodykit.binned_statistic import BinnedStatistic
class MissingCorrfuncError(Exception):
    """
    Exception raised when the :mod:`Corrfunc` package cannot be imported,
    carrying instructions on how to install it.
    """
    def __init__(self):
        msg = "use either ``conda install -c bccp corrfunc`` "
        msg += "or ``pip install git+git://github.com/nickhand/Corrfunc``"
        # initialize through the base class so ``args`` / ``str()`` behave
        # like a normal Exception
        super(MissingCorrfuncError, self).__init__(msg)
class CorrfuncResult(object):
    """
    A class used internally to hold the array-like result of a
    pair counting algorithm from :mod:`Corrfunc`.

    This class is useful for summing pair count results, accounting for
    columns that are pair-weighted.

    Parameters
    ----------
    data : numpy.ndarray
        the numpy structured array result from Corrfunc
    """
    # the columns copied from the Corrfunc result; all of these except
    # 'npairs' are averages weighted by the pair counts
    valid = ['weightavg', 'npairs', 'savg', 'ravg', 'thetaavg']

    def __init__(self, data):
        # copy over only the valid columns from the input result
        dtype = [(col, data.dtype[col]) for col in data.dtype.names if col in self.valid]
        self.data = numpy.zeros(data.shape, dtype=dtype)
        self.columns = self.data.dtype.names
        for col in self.columns:
            self.data[col] = data[col]

    def __getitem__(self, col):
        return self.data[col]

    @property
    def shape(self):
        return self.data.shape

    @property
    def dtype(self):
        return self.data.dtype

    def reshape(self, *args, **kwargs):
        # reshape the underlying array in place and return self for chaining
        self.data = self.data.reshape(*args, **kwargs)
        return self

    def __radd__(self, other):
        # support ``sum()`` and reductions that start from the integer 0
        if isinstance(other, int) and other == 0:
            return self
        return self + other

    def __add__(self, other):
        """
        Sum two results: "npairs" adds directly, while every other column
        is a pair-weighted average combined as ``(x1*n1 + x2*n2)/(n1 + n2)``.
        """
        data = numpy.empty_like(self.data)
        for col in self.columns:
            if col == 'npairs': continue
            data[col] = self[col]*self['npairs'] + other[col]*other['npairs']

        # just sum up "npairs"
        data['npairs'] = self['npairs'] + other['npairs']

        # normalize by total "npairs", skipping empty bins to avoid
        # division by zero
        idx = data['npairs'] > 0.
        for col in self.columns:
            if col == 'npairs': continue
            data[col][idx] /= data['npairs'][idx]

        return CorrfuncResult(data)
class PairCountBase(object):
    """
    An abstract base class for pair counting algorithms, either for a
    simulation box or survey data.

    Users should use one of the subclasses of this class.

    Parameters
    ----------
    first : CatalogSource
        the first source of particles
    second : CatalogSource, optional
        the second source of particles to cross-correlate
    show_progress : bool, optional
        if ``True``, run the pair count in chunks and log the progress
        after each chunk
    """
    def __init__(self, first, second, show_progress=True):
        self.first = first
        self.second = second
        self.comm = first.comm
        self.attrs = {}
        self.attrs['show_progress'] = show_progress

    def __getstate__(self):
        # the raw structured array plus the meta-data fully determine the state
        return {'result':self.result.data, 'attrs':self.attrs}

    def __setstate__(self, state):
        self.__dict__.update(state)

        # rebuild the BinnedStatistic result from the raw structured array
        redges = self.attrs['redges']
        if self.attrs['mode'] == '1d':
            self.result = BinnedStatistic(['r'], [redges], self.result, fields_to_sum=['npairs'])
        else:
            muedges = numpy.linspace(0, 1., self.attrs['Nmu']+1)
            self.result = BinnedStatistic(['r', 'mu'], [redges, muedges], self.result, fields_to_sum=['npairs'])

    def _log(self, N1, N2, pos1, pos2):
        """
        Internal function to log the distribution of particles to correlate
        across ranks.

        Parameters
        ----------
        N1 : int
            the global number of objects in the first source
        N2 : int
            the global number of objects in the second source
        pos1 : array_like
            the first set of particle positions this rank is correlating
        pos2 : array_like
            the second set of particle positions this rank is correlating
        """
        # global counts
        if self.comm.rank == 0:
            self.logger.info('correlating %d x %d objects in total' %(N1, N2))

        # sizes for all ranks
        sizes1 = self.comm.gather(len(pos1), root=0)
        sizes2 = self.comm.gather(len(pos2), root=0)

        # rank 0 logs the load balance across ranks
        if self.comm.rank == 0:
            args = (numpy.median(sizes1), numpy.median(sizes2))
            self.logger.info("correlating A x B = %d x %d objects (median) per rank" % args)

            global_min = numpy.min(sizes1)
            self.logger.info("min A load per rank = %d" % global_min)

            global_max = numpy.max(sizes1)
            self.logger.info("max A load per rank = %d" % global_max)

            args = (N1//self.comm.size, N2)
            self.logger.info("(even distribution would result in %d x %d)" % args)

    def _decompose(self):
        """
        An internal function to perform a domain decomposition on the two
        sources we wish to correlate, returning the position/weights for each
        object in the correlating pair.

        The particles are assumed to be in a cubical box, and no load
        balancing is required.

        The implementation follows:

        1. Decompose the first source such that the objects are spatially
           tight on a given rank.
        2. Decompose the second source, ensuring a given rank holds all
           particles within the desired maximum separation.

        Returns
        -------
        (pos1, w1), (pos2, w2) : array_like
            the (decomposed) set of positions and weights to correlate
        """
        from pmesh.domain import GridND
        from nbodykit.utils import split_size_3d

        # determine processor division for domain decomposition
        np = split_size_3d(self.comm.size)
        if self.comm.rank == 0:
            self.logger.info("using cpu grid decomposition: %s" %str(np))

        # get the (periodic-enforced) position for first
        pos1 = self.first['Position']
        if self.attrs['periodic']:
            pos1 %= self.attrs['BoxSize']
        pos1, w1 = self.first.compute(pos1, self.first[self.attrs['weight']])
        N1 = self.comm.allreduce(len(pos1))

        # get the (periodic-enforced) position for second
        if self.second is not None:
            pos2 = self.second['Position']
            if self.attrs['periodic']:
                pos2 %= self.attrs['BoxSize']
            pos2, w2 = self.second.compute(pos2, self.second[self.attrs['weight']])
            N2 = self.comm.allreduce(len(pos2))
        else:
            # auto-correlation: re-use the first source
            pos2 = pos1
            w2 = w1
            N2 = N1

        # domain decomposition grid covering the box
        grid = [
            numpy.linspace(0, self.attrs['BoxSize'][0], np[0] + 1, endpoint=True),
            numpy.linspace(0, self.attrs['BoxSize'][1], np[1] + 1, endpoint=True),
            numpy.linspace(0, self.attrs['BoxSize'][2], np[2] + 1, endpoint=True),
        ]
        domain = GridND(grid, comm=self.comm)

        # exchange first particles (no smoothing: spatially tight per rank)
        layout = domain.decompose(pos1, smoothing=0)
        pos1 = layout.exchange(pos1)
        w1 = layout.exchange(w1)

        # exchange second particles
        rmax = numpy.max(self.attrs['redges'])
        if rmax > self.attrs['BoxSize'].max() * 0.25:
            # the smoothing radius would cover most of the box, so simply
            # gather the full second source onto every rank
            pos2 = numpy.concatenate(self.comm.allgather(pos2), axis=0)
            w2 = numpy.concatenate(self.comm.allgather(w2), axis=0)
        else:
            layout = domain.decompose(pos2, smoothing=rmax)
            pos2 = layout.exchange(pos2)
            w2 = layout.exchange(w2)

        # log the sizes of the trees
        self._log(N1, N2, pos1, pos2)

        return (pos1, w1), (pos2, w2)

    def _run(self, func, kwargs, loads, callback=None):
        """
        Internal function that calls ``func`` in ``N`` iterations, optionally
        calling ``callback`` before each iteration.

        This allows the Corrfunc function ``func`` to be called in chunks,
        giving the user a progress report after each iteration.

        Parameters
        ----------
        func : callable
            the Corrfunc callable -- ``kwargs`` will be passed to this function
        kwargs : dict
            the dictionary of arguments to pass to ``func``
        loads : list of int
            the list of loads for every rank; this corresponds to the number
            of particles in ``A`` if we are correlating ``A`` x ``B``
        callback : callable, optional
            a callable taking ``kwargs`` as its first argument and a slice
            object as its second argument; this will be called first during
            each iteration

        Returns
        -------
        pc : CorrfuncResult
            the total pair count result, summed over all iterations run
        """
        # the rank with the largest load reports the progress
        largest_load = numpy.argmax(loads)

        # do the pair counting for a single chunk of the first source
        def run(chunk):
            if callback is not None:
                callback(kwargs, chunk)
            return self._run_corrfunc(func, **kwargs)

        # log the function start
        if self.comm.rank == 0:
            name = func.__module__ + '.' + func.__name__
            self.logger.info("calling function '%s'" % name)

        # number of iterations
        N = 10 if self.attrs['show_progress'] else 1

        # run in chunks
        pc = None
        chunks = numpy.array_split(range(loads[self.comm.rank]), N, axis=0)
        for i, chunk in enumerate(chunks):
            this_pc = run(chunk)
            if self.comm.rank == largest_load and self.attrs['show_progress']:
                # report the percentage complete independent of N
                # (the original ``N*(i+1)`` was only correct for N == 10)
                self.logger.info("%d%% done" % (100*(i+1)//N))

            # sum up the results
            pc = this_pc if pc is None else pc + this_pc

        return pc

    def _run_corrfunc(self, func, **kws):
        """
        Run the wrapped Corrfunc function ``func``, passing in the keywords
        specified by ``kws``.

        This hides all output from the Corrfunc function (stdout, stderr,
        and C-level output), unless an exception occurs.
        """
        from nbodykit.utils import captured_output
        try:
            # record progress capture output for everything but root
            with captured_output(self.comm, root=None) as (out, err):
                result = func(**kws)
        except Exception as e:
            # get the values Corrfunc logged to stdout and stderr
            stdout = out.getvalue(); stderr = err.getvalue()

            # log all of the output in a new exception
            name = func.__module__ + '.' + func.__name__
            msg = "calling the function '%s' failed:\n" % name
            msg += "exception: %s\n" % str(e)
            msg += "stdout: %s\n" % stdout
            msg += "stderr: %s" % stderr
            raise RuntimeError(msg)

        return CorrfuncResult(result)

    def save(self, output):
        """
        Save result as a JSON file with name ``output``
        """
        import json
        from nbodykit.utils import JSONEncoder

        # only the master rank writes
        if self.comm.rank == 0:
            self.logger.info('measurement done; saving result to %s' % output)
            with open(output, 'w') as ff:
                json.dump(self.__getstate__(), ff, cls=JSONEncoder)

    @classmethod
    @CurrentMPIComm.enable
    def load(cls, output, comm=None):
        """
        Load a result that has been saved to disk with :func:`save`.
        """
        import json
        from nbodykit.utils import JSONDecoder

        # rank 0 reads and broadcasts to the other ranks
        if comm.rank == 0:
            with open(output, 'r') as ff:
                state = json.load(ff, cls=JSONDecoder)
        else:
            state = None
        state = comm.bcast(state)

        self = object.__new__(cls)
        self.__setstate__(state)
        self.comm = comm
        return self
class SimulationBoxPairCount(PairCountBase):
    """
    Count (weighted) pairs of objects in a simulation box using the
    :mod:`Corrfunc` package.

    This uses the :func:`Corrfunc.theory.DD.DD` and
    :func:`Corrfunc.theory.DDsmu.DDsmu` functions to count pairs.

    Results are computed when the object is initialized. See the documentation
    of :func:`~SimulationBoxPairCount.run` for the attributes storing the
    results.

    .. note::
        The algorithm expects the positions of particles in a simulation box to
        be the Cartesian ``x``, ``y``, and ``z`` vectors. To compute
        pair counts on survey data, using right ascension, declination, and
        redshift, see :class:`SurveyDataPairCount`.

    Parameters
    ----------
    mode : {'1d', '2d'}
        compute pair counts as a function of ``r`` and ``mu`` or just ``r``
    first : CatalogSource
        the first source of particles, providing the 'Position' column
    redges : array_like
        the radius bin edges; length of nbins+1
    BoxSize : float, 3-vector, optional
        the size of the box; if 'BoxSize' is not provided in the source
        'attrs', it must be provided here
    second : CatalogSource, optional
        the second source of particles to cross-correlate
    Nmu : int, optional
        the number of ``mu`` bins, ranging from 0 to 1
    los : {'x', 'y', 'z'}, int, optional
        the axis of the simulation box to treat as the line-of-sight direction;
        this can be provided as string identifying one of 'x', 'y', 'z' or
        the equivalent integer number of the axis
    periodic : bool, optional
        whether to use periodic boundary conditions
    weight : str, optional
        the name of the column in the source specifying the particle weights
    show_progress : bool, optional
        if ``True``, perform the pair counting calculation in 10 iterations,
        logging the progress after each iteration; this is useful for
        understanding the scaling of the code
    **config : key/value pairs
        additional keywords to pass to the :mod:`Corrfunc` function
    """
    logger = logging.getLogger('SimulationBoxPairCount')

    def __init__(self, mode, first, redges, BoxSize=None, periodic=True,
                    second=None, Nmu=5, los='z', weight='Weight',
                    show_progress=True, **config):

        # check input 'mode'
        assert mode in ['1d', '2d'], "PairCount mode must be '1d' or '2d'"

        # check rmin: self-pairs (zero separation) are not allowed
        if numpy.min(redges) <= 0.:
            raise ValueError(("the lower edge of the 1st separation bin must be "
                              "greater than zero (no self-pairs)"))

        # check input 'los': normalize to an integer axis in [0, 2]
        if isinstance(los, string_types):
            assert los in 'xyz', "``los`` should be one of 'x', 'y', 'z'"
            los = 'xyz'.index(los)
        elif isinstance(los, int):
            if los < 0: los += 3
            assert los in [0,1,2], "``los`` should be one of 0, 1, 2"
        else:
            raise ValueError("``los`` should be either ['x', 'y', 'z'] or [0,1,2]")

        # verify the input sources
        BoxSize = verify_input_sources(first, second, BoxSize, ['Position', weight])

        # init the base class
        PairCountBase.__init__(self, first, second, show_progress)

        # save the meta-data
        self.attrs['mode'] = mode
        self.attrs['redges'] = redges
        self.attrs['Nmu'] = Nmu
        self.attrs['BoxSize'] = BoxSize
        self.attrs['periodic'] = periodic
        self.attrs['weight'] = weight
        self.attrs['config'] = config
        self.attrs['los'] = los

        # test Rmax for PBC: periodic wrapping is ambiguous beyond half a box
        if periodic and numpy.amax(redges) > 0.5*self.attrs['BoxSize'].min():
            raise ValueError(("periodic pair counts cannot be computed for "
                              "Rmax > 0.5 * BoxSize"))

        # run the algorithm
        self.run()

    def run(self):
        """
        Calculate the 3D pair-counts in a simulation box as a function
        of separation ``r`` or separation and angle to line-of-sight
        (``r``, ``mu``). This adds the following attributes to the class:

        - :attr:`SimulationBoxPairCount.result`

        Attributes
        ----------
        result : :class:`~nbodykit.binned_statistic.BinnedStatistic`
            a BinnedStatistic object holding the pair count and correlation
            function results. The coordinate grid is either ``r`` or
            ``r`` and ``mu``. It stores the following variables:

            - ``r``: the mean separation value in the bin
            - ``xi``: the mean correlation function value in the bin, computed as
              :math:`DD/RR - 1`, where :math:`RR` is the number of random pairs
              in the bin
            - ``npairs``: the number of pairs in the bin
            - ``weightavg``: the average weight value in the bin; each pair
              contributes the product of the individual weight values
        """
        try:
            from Corrfunc.theory import DD, DDsmu
        except ImportError:
            raise MissingCorrfuncError()

        # some setup; coerce the bin edges to an array so ``redges**3`` below
        # also works when a plain list was passed in
        redges = numpy.asarray(self.attrs['redges'])
        boxsize = self.attrs['BoxSize']
        L = None

        # determine the axes perpendicular and parallel to the line-of-sight
        los = self.attrs['los']
        perp_axes = [0,1,2]
        perp_axes.remove(los)

        # a cubic box is required by Corrfunc for periodic wrapping
        if self.attrs['periodic']:
            if numpy.all(boxsize == boxsize[0]):
                L = boxsize[0]
            else:
                raise ValueError(("``Corrfunc`` does not currently support "
                                  "periodic wrapping with non-cubic boxes"))

        # domain decompose the data
        (pos1, w1), (pos2, w2) = self._decompose()

        # total sizes for first and second
        # NOTE(review): these are computed AFTER the decomposition; the second
        # source may hold duplicated particles across ranks (smoothing/allgather
        # in ``_decompose``), which would inflate N2 and hence RR on more than
        # one rank -- confirm against the intended normalization
        N1 = self.comm.allreduce(len(pos1))
        N2 = self.comm.allreduce(len(pos2))

        # initialize the keywords to pass to Corrfunc
        kws = {}
        kws['autocorr'] = 0
        kws['nthreads'] = 1
        kws['binfile'] = redges
        kws['X2'] = pos2[:,perp_axes[0]]
        kws['Y2'] = pos2[:,perp_axes[1]]
        kws['Z2'] = pos2[:,los]
        kws['weights2'] = w2.astype(pos2.dtype)
        kws['weight_type'] = 'pair_product'
        kws['periodic'] = self.attrs['periodic']
        if L is not None: kws['boxsize'] = L

        # 1D calculation
        if self.attrs['mode'] == '1d':
            kws['output_ravg'] = True
            func = DD
            rcol = 'ravg'
        # 2D calculation
        else:
            kws['output_savg'] = True
            kws['mu_max'] = 1.0
            kws['nmu_bins'] = self.attrs['Nmu']
            func = DDsmu
            rcol = 'savg'

        # update with user config
        kws.update(self.attrs['config'])

        # per-chunk callback: set the "first" particles for this chunk
        def callback(kws, chunk):
            kws['X1'] = pos1[chunk][:,perp_axes[0]]
            kws['Y1'] = pos1[chunk][:,perp_axes[1]]
            kws['Z1'] = pos1[chunk][:,los] # LOS defined with respect to this axis
            kws['weights1'] = w1[chunk].astype(pos1.dtype)

        # run
        sizes = self.comm.allgather(len(pos1))
        pc = self._run(func, kws, sizes, callback=callback)

        # make 2D results shape of (Nr, Nmu)
        if self.attrs['mode'] == '2d':
            pc = pc.reshape((-1, self.attrs['Nmu']))

        # sum results over all ranks
        pc = self.comm.allreduce(pc)

        # the output data
        dtype = numpy.dtype([('r', 'f8'), ('xi', 'f8'), ('npairs', 'u8'), ('weightavg', 'f8')])
        data = numpy.empty(pc.shape, dtype=dtype)

        # copy over main results
        data['r'] = pc[rcol]
        data['npairs'] = pc['npairs']
        data['weightavg'] = pc['weightavg']

        # compute the random pairs from the fractional volume of each bin
        RR = 1.*N1*N2 / boxsize.prod()
        if self.attrs['mode'] == '2d':
            dr3 = numpy.diff(redges**3)
            muedges = numpy.linspace(0, 1, self.attrs['Nmu']+1)
            dmu = numpy.diff(muedges)
            RR *= 2. / 3. * numpy.pi * dr3[:,None] * dmu[None,:]
        else:
            RR *= 4. / 3. * numpy.pi * numpy.diff(redges**3)

        # add in the correlation function value
        data['xi'] = (1. * data['npairs'] / RR) - 1.0

        # make the BinnedStatistic
        if self.attrs['mode'] == '1d':
            self.result = BinnedStatistic(['r'], [redges], data, fields_to_sum=['npairs'])
        else:
            self.result = BinnedStatistic(['r','mu'], [redges,muedges], data, fields_to_sum=['npairs'])