from nbodykit import CurrentMPIComm
from nbodykit.binned_statistic import BinnedStatistic
import numpy
[docs]class PairCountBase(object):
"""
An abstract base class for pair counting algorithms.
Users should use one of the subclasses of this class.
"""
def __init__(self, mode, edges, first, second, Nmu, pimax, weight, show_progress=False):
# check input 'mode'
valid_modes = ['1d', '2d', 'projected', 'angular']
if mode not in valid_modes:
raise ValueError("allowed 'mode' values are: %s" % valid_modes)
# check min edge
if numpy.min(edges) <= 0.:
raise ValueError(("the lower edge of the 1st separation bin must "
"greater than zero (no self-pairs)"))
# check mode requirements
if mode == '2d' and Nmu is None:
raise ValueError("'Nmu' keyword is required when 'mode' is '2d'")
if Nmu is not None and mode != '2d':
raise ValueError("mode should be '2d' if 'Nmu' is specified")
if mode == 'projected' and pimax is None:
raise ValueError("'pimax' keyword is required when 'mode' is 'projected'")
if pimax is not None and mode != 'projected':
raise ValueError("mode should be 'projected' if 'projected' is specified")
if mode == 'projected' and pimax < 1.0:
raise ValueError("'pimax' must be at least 1.0 when 'mode' is 'projected'")
self.first = first
self.second = second
self.comm = first.comm
# store the meta-data
self.attrs = {}
self.attrs['mode'] = mode
self.attrs['edges'] = edges
self.attrs['Nmu'] = Nmu
self.attrs['pimax'] = pimax
self.attrs['show_progress'] = show_progress
# store the total size of the sources
self.attrs['N1'] = first.csize
self.attrs['N2'] = second.csize if second is not None else None
if second is None or second is first:
wpairs1, wpairs2 = self.comm.allreduce(first.compute(first[weight].sum())), self.comm.allreduce(first.compute((first[weight]**2).sum()))
# for auto excluding self pairs to avoid a biased estimator. The factor 0.5 is by convention.
# In the end it will cancel out in two point function estimators.
self.attrs['total_wnpairs'] = 0.5*(wpairs1**2-wpairs2)
self.attrs['is_cross'] = False
else:
wpairs1, wpairs2 = self.comm.allreduce(first.compute(first[weight].sum())), self.comm.allreduce(second.compute(second[weight].sum()))
self.attrs['total_wnpairs'] = 0.5*wpairs1*wpairs2
self.attrs['is_cross'] = True
def __getstate__(self):
return {'pairs':self.pairs.data, 'attrs':self.attrs}
def __setstate__(self, state):
self.__dict__.update(state)
edges = self.attrs['edges']
# reconstruct the result based on mode
kws = {'fields_to_sum' : ['npairs', 'wnpairs']}
if self.attrs['mode'] == '1d':
dims, edges = ['r'], [edges]
elif self.attrs['mode'] == '2d':
muedges = numpy.linspace(0, 1., self.attrs['Nmu']+1)
dims, edges = ['r', 'mu'], [edges, muedges]
elif self.attrs['mode'] == 'projected':
piedges = numpy.linspace(0, self.attrs['pimax'], int(self.attrs['pimax']+1))
dims, edges = ['rp', 'pi'], [edges, piedges]
elif self.attrs['mode'] == 'angular':
dims, edges = ['theta'], [edges]
else:
valid = ['1d','2d','angular','projected']
args = (self.attrs['mode'], valid)
raise ValueError("mode = '%s' should be one of %s" % args)
# save the result as a BinnedStatistic
self.pairs = BinnedStatistic(dims, edges, self.pairs, **kws)
[docs] def save(self, output):
"""
Save result as a JSON file with name ``output``
"""
import json
from nbodykit.utils import JSONEncoder
# only the master rank writes
if self.comm.rank == 0:
self.logger.info('measurement done; saving result to %s' % output)
with open(output, 'w') as ff:
json.dump(self.__getstate__(), ff, cls=JSONEncoder)
[docs] @classmethod
@CurrentMPIComm.enable
def load(cls, output, comm=None):
"""
Load a result has been saved to disk with :func:`save`.
"""
import json
from nbodykit.utils import JSONDecoder
if comm.rank == 0:
with open(output, 'r') as ff:
state = json.load(ff, cls=JSONDecoder)
else:
state = None
state = comm.bcast(state)
self = object.__new__(cls)
self.__setstate__(state)
self.comm = comm
return self