Source code for nbodykit.algorithms.pair_counters.corrfunc.base

import numpy
import logging
from nbodykit import CurrentMPIComm
from nbodykit.binned_statistic import BinnedStatistic

[docs]class MissingCorrfuncError(Exception): def __init__(self): msg = "use either ``conda install -c bccp corrfunc`` " msg += "or ``pip install git+git://github.com/nickhand/Corrfunc``" self.args = (msg,)
[docs]class CorrfuncResult(object): """ A class used internally to hold the array-like result of a pair counting algorithm from :mod:`Corrfunc`. This class is useful for reducing pair count results in parallel while accounting for columns that are pair-weighted. Parameters ---------- data : numpy.ndarray the numpy structured array result from :mod:`Corrfunc` """ valid = ['weightavg', 'npairs', 'savg', 'ravg', 'thetaavg', 'rpavg'] def __init__(self, data): # copy over the valid colums from the input result dtype = [(col, data.dtype[col]) for col in data.dtype.names if col in self.valid] self.data = numpy.zeros(data.shape, dtype=dtype) self.columns = self.data.dtype.names for col in self.columns: self.data[col] = data[col] def __getitem__(self, col): return self.data[col] @property def shape(self): return self.data.shape @property def dtype(self): return self.data.dtype def reshape(self, *args, **kwargs): self.data = self.data.reshape(*args, **kwargs) return self def __radd__(self, other): return self + other def __add__(self, other): # all columns are pair-weighted except for "npairs" data = numpy.empty_like(self.data) for col in self.columns: if col == 'npairs': continue data[col] = self[col]*self['npairs'] + other[col]*other['npairs'] # just sum up "npairs" data['npairs'] = self['npairs'] + other['npairs'] # normalize by total "npairs" idx = data['npairs'] > 0. for col in self.columns: if col == 'npairs': continue data[col][idx] /= data['npairs'][idx] return CorrfuncResult(data)
def tonativeendian(arr): arr = arr.astype(arr.dtype.newbyteorder('=')) return arr
[docs]class MPICorrfuncCallable(object): """ A base class to represent an MPI-enabled :mod:`Corrfunc` callable. This class adds the following functionality to the Corrfunc code: - If ``show_progress`` is ``True``, execute the function in chunks, logging to screen the progress along the way. This is useful for potentially long running pair counting jobs. - When calling the function, capture stdout/stderr and C-level output and if an error occurs, raise an exception with all generated output for the user. The relevant subclasses of this class are in :mod:`mocks` and :mod:`theory`. """ binning_dims = None logger = logging.getLogger("MPICorrfuncCallable") def __init__(self, callable, comm, show_progress=True): self.callable = callable self.comm = comm self.show_progress = show_progress
[docs] def __call__(self, loads, kwargs, callback=None): """ Calls :attr:`callable` in iterations, optionally calling ``callback`` before each iteration. This allows the :mod:`Corrfunc` function :attr:`callable` to be called in chunks, giving the user a progress report after each iteration. Parameters ---------- loads : list of int the list of loads for every rank; this corresponds to the number of particles in the in ``A`` if we are correlating ``A`` x ``B`` kwargs : dict the dictionary of arguments to pass to ``func`` callback : callable, optional a callable takings ``kwargs`` as its first argument and a slice object as its second argument; this will be called first during each iteration Returns ------- result : BinnedStatistic the total binned pair counting result """ # the rank with the largest load largest_load = numpy.argmax(loads) # do the pair counting def run(chunk): if callback is not None: callback(kwargs, chunk) return self._run(self.callable, kwargs) # log the function start if self.comm.rank == 0: name = self.callable.__module__ + '.' + self.callable.__name__ self.logger.info("calling function '%s'" % name) # number of iterations N = 10 if self.show_progress else 1 # run in chunks pc = None chunks = numpy.array_split(numpy.arange(loads[self.comm.rank],dtype='intp'), N, axis=0) for i, chunk in enumerate(chunks): this_pc = run(chunk) if self.comm.rank == largest_load and self.show_progress: self.logger.info("%d%% done" % (N*(i+1))) # sum up the results pc = this_pc if pc is None else pc + this_pc # convert flattened 1D results to 2D array if len(self.edges) > 1: pc = pc.reshape((-1, len(self.edges[1])-1)) # reduce the result across all ranks pc = self.comm.allreduce(pc) # the dimension names (use "r" instead of "s") dims = list(self.binning_dims) # make a copy here if 's' in dims: dims[dims.index('s')] = 'r' # make a new structured array dtype = numpy.dtype([(dims[0], 'f8'), ('npairs', 'u8'), ('wnpairs', 'f8')]) data = numpy.zeros(pc.shape, dtype=dtype) # copy over main results data[dims[0]] = pc[self.binning_dims[0]+'avg'] data['npairs'] = pc['npairs'] data['wnpairs'] = pc['weightavg'] * pc['npairs'] # patch up when there is no pair, the weighted value shall be zero as well. data['wnpairs'][pc['npairs'] == 0] = 0 # return the BinnedStatistic return BinnedStatistic(dims, self.edges, data, fields_to_sum=['npairs', 'wnpairs'])
def _run(self, func, kws): """ Internal function to run the wrapped :mod:`Corrfunc` function :attr:`func`, passing in the keywords specified by ``kws``. .. note:: This hides all output from the Corrfunc function (stdout, stderr, and C-level output), unless an exception occurs. """ from nbodykit.utils import captured_output kws = kws.copy() # cast the array items to the native endianness # because corrfunc doesn't understand otherwise. # https://github.com/bccp/nbodykit/issues/467 for key, value in kws.items(): if isinstance(value, numpy.ndarray): kws[key] = tonativeendian(value) try: # record progress capture output for everything but root with captured_output(self.comm, root=None) as (out, err): result = func(**kws) except Exception as e: # get the values Corrfunc logged to stdout and stderr stdout = out.getvalue(); stderr = err.getvalue() # log all of the output in a new exception name = func.__module__ + '.' + func.__name__ msg = "calling the function '%s' failed, " % name msg += "likely due to issues with input data/parameters. " msg += "Open at issue at https://github.com/bccp/nbodykit/issues for further help.\n" msg += "exception: %s\n" % str(e) msg += "stdout: %s\n" % stdout msg += "stderr: %s" % stderr raise RuntimeError(msg) return CorrfuncResult(result)