Source code for nbodykit.utils

import numpy
from mpi4py import MPI
import warnings
import functools
import contextlib
import os, sys

def is_structured_array(arr):
    """
    Test if the input array is a structured array,
    by checking for a compound (void) dtype
    """
    if not isinstance(arr, numpy.ndarray) or not hasattr(arr, 'dtype'):
        return False
    return arr.dtype.char == 'V'
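# Illustrative usage (a sketch, not part of the original module):
#
#   >>> is_structured_array(numpy.zeros(4))
#   False
#   >>> is_structured_array(numpy.zeros(4, dtype=[('x', 'f8'), ('y', 'f8')]))
#   True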
def get_data_bounds(data, comm, selection=None):
    """
    Return the global minimum/maximum of a numpy/dask array along
    the first axis. This is computed in chunks to avoid memory
    errors on large data.

    Parameters
    ----------
    data : numpy.ndarray or dask.array.Array
        the data to find the bounds of
    comm :
        the MPI communicator
    selection : array_like, optional
        a boolean mask selecting the rows of ``data`` to include

    Returns
    -------
    min, max :
        the min/max of ``data``
    """
    import dask.array as da

    # local min/max on this rank
    dmin = numpy.ones(data.shape[1:]) * numpy.inf
    dmax = numpy.ones_like(dmin) * (-numpy.inf)

    # the maximum local data size across all ranks
    Nlocalmax = max(comm.allgather(len(data)))

    # compute in chunks to avoid memory errors
    chunksize = 1024 ** 2 * 8
    for i in range(0, Nlocalmax, chunksize):
        s = slice(i, i + chunksize)

        if len(data) != 0:

            # the selection has to be re-computed for each chunk when data is large
            if selection is not None:
                sel = selection[s]
                if isinstance(selection, da.Array):
                    sel = sel.compute()

            # be sure to compute from the source array
            d = data[s]
            if isinstance(data, da.Array):
                d = d.compute()

            # select
            if selection is not None:
                d = d[sel]

            # update the min/max on this rank
            dmin = numpy.min([d.min(axis=0), dmin], axis=0)
            dmax = numpy.max([d.max(axis=0), dmax], axis=0)

    # global min/max across all ranks
    dmin = numpy.asarray(comm.allgather(dmin)).min(axis=0)
    dmax = numpy.asarray(comm.allgather(dmax)).max(axis=0)

    return dmin, dmax
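# Illustrative usage (a sketch, not part of the original module; ``pos``
# stands in for any (N, 3) column distributed across the ranks):
#
#   >>> pos = numpy.random.uniform(size=(1000, 3))
#   >>> lo, hi = get_data_bounds(pos, MPI.COMM_WORLD)
#   >>> lo.shape, hi.shape
#   ((3,), (3,))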
def split_size_3d(s):
    """
    Split `s` into three integers, a, b, c, such that
    a * b * c == s, with the factors as close in size as the
    greedy search below allows (they are not guaranteed to be sorted)

    Parameters
    ----------
    s : int
        integer to split

    Returns
    -------
    a, b, c : int
        integers such that a * b * c == s
    """
    a = int(s ** 0.3333333) + 1
    while a > 1:
        if s % a == 0:
            s = s // a
            break
        a = a - 1
    b = int(s ** 0.5) + 1
    while b > 1:
        if s % b == 0:
            s = s // b
            break
        b = b - 1
    c = s
    return a, b, c
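# Illustrative usage (a sketch, not part of the original module), e.g. for
# decomposing an MPI communicator into a 3-d process mesh. Note the factors
# do not necessarily come out sorted:
#
#   >>> split_size_3d(24)
#   (3, 2, 4)
#   >>> split_size_3d(7)  # a prime cannot be split further
#   (1, 1, 7)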
def deprecate(name, alternative, alt_name=None):
    """
    A decorator which can be used to mark functions as deprecated.
    It results in a warning being emitted when the function is used.
    """
    alt_name = alt_name or alternative.__name__

    def wrapper(*args, **kwargs):
        warnings.warn("%s is deprecated. Use %s instead" % (name, alt_name),
                      FutureWarning, stacklevel=2)
        return alternative(*args, **kwargs)
    return wrapper
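# Illustrative usage (a sketch, not part of the original module; the function
# names are hypothetical):
#
#   >>> def new_func(x): return x + 1
#   >>> old_func = deprecate("old_func", new_func)
#   >>> old_func(1)  # emits a FutureWarning, then forwards to new_func
#   2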
def GatherArray(data, comm, root=0):
    """
    Gather the input data array from all ranks to the specified ``root``.

    This uses ``Gatherv``, which avoids mpi4py pickling, and also
    avoids the 2 GB mpi4py limit for bytes using a custom datatype.

    Parameters
    ----------
    data : array_like
        the data on each rank to gather
    comm : MPI communicator
        the MPI communicator
    root : int, or Ellipsis
        the rank number to gather the data to. If root is Ellipsis,
        broadcast the result to all ranks.

    Returns
    -------
    recvbuffer : array_like, None
        the gathered data on root, and `None` otherwise
    """
    if not isinstance(data, numpy.ndarray):
        raise ValueError("`data` must be a numpy array in GatherArray")

    # need C-contiguous order
    if not data.flags['C_CONTIGUOUS']:
        data = numpy.ascontiguousarray(data)
    local_length = data.shape[0]

    # check dtypes and shapes
    shapes = comm.allgather(data.shape)
    dtypes = comm.allgather(data.dtype)

    # check for structured data
    if dtypes[0].char == 'V':

        # check for structured data mismatch
        names = set(dtypes[0].names)
        if any(set(dt.names) != names for dt in dtypes[1:]):
            raise ValueError("mismatch between data type fields in structured data")

        # check for 'O' data types
        if any(dtypes[0][name] == 'O' for name in dtypes[0].names):
            raise ValueError("object data types ('O') not allowed in structured data in GatherArray")

        # compute the new shape for each rank
        newlength = comm.allreduce(local_length)
        newshape = list(data.shape)
        newshape[0] = newlength

        # the return array
        if root is Ellipsis or comm.rank == root:
            recvbuffer = numpy.empty(newshape, dtype=dtypes[0], order='C')
        else:
            recvbuffer = None

        # gather each field separately
        for name in dtypes[0].names:
            d = GatherArray(data[name], comm, root=root)
            if root is Ellipsis or comm.rank == root:
                recvbuffer[name] = d

        return recvbuffer

    # check for 'O' data types
    if dtypes[0] == 'O':
        raise ValueError("object data types ('O') not allowed in GatherArray")

    # check for bad dtypes and bad shapes
    if root is Ellipsis or comm.rank == root:
        bad_shape = any(s[1:] != shapes[0][1:] for s in shapes[1:])
        bad_dtype = any(dt != dtypes[0] for dt in dtypes[1:])
    else:
        bad_shape = None
        bad_dtype = None

    # broadcast the validation result from a rank that computed it
    bad_shape, bad_dtype = comm.bcast((bad_shape, bad_dtype),
                                      root=0 if root is Ellipsis else root)
    if bad_shape:
        raise ValueError("mismatch between shape[1:] across ranks in GatherArray")
    if bad_dtype:
        raise ValueError("mismatch between dtypes across ranks in GatherArray")

    shape = data.shape
    dtype = data.dtype

    # setup the custom dtype: one item spans all trailing dimensions
    duplicity = numpy.prod(numpy.array(shape[1:], 'intp'))
    itemsize = duplicity * dtype.itemsize
    dt = MPI.BYTE.Create_contiguous(itemsize)
    dt.Commit()

    # compute the new shape for each rank
    newlength = comm.allreduce(local_length)
    newshape = list(shape)
    newshape[0] = newlength

    # the return array
    if root is Ellipsis or comm.rank == root:
        recvbuffer = numpy.empty(newshape, dtype=dtype, order='C')
    else:
        recvbuffer = None

    # the recv counts
    counts = comm.allgather(local_length)
    counts = numpy.array(counts, order='C')

    # the recv offsets
    offsets = numpy.zeros_like(counts, order='C')
    offsets[1:] = counts.cumsum()[:-1]

    # gather to root
    if root is Ellipsis:
        comm.Allgatherv([data, dt], [recvbuffer, (counts, offsets), dt])
    else:
        comm.Gatherv([data, dt], [recvbuffer, (counts, offsets), dt], root=root)

    dt.Free()
    return recvbuffer
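# Illustrative usage (a sketch, not part of the original module; run under
# MPI, e.g. ``mpirun -n 4 python ...``):
#
#   >>> comm = MPI.COMM_WORLD
#   >>> local = numpy.arange(comm.rank + 1, dtype='f8')  # ranks hold different lengths
#   >>> full = GatherArray(local, comm, root=0)
#   >>> total = comm.allreduce(len(local))  # collective, called on every rank
#   >>> if comm.rank == 0: assert len(full) == total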
def ScatterArray(data, comm, root=0, counts=None):
    """
    Scatter the input data array across all ranks, assuming `data` is
    initially only on `root` (and `None` on other ranks).

    This uses ``Scatterv``, which avoids mpi4py pickling, and also
    avoids the 2 GB mpi4py limit for bytes using a custom datatype.

    Parameters
    ----------
    data : array_like or None
        on `root`, this gives the data to split and scatter
    comm : MPI communicator
        the MPI communicator
    root : int
        the rank number that initially has the data
    counts : list of int
        list of the lengths of data to send to each rank

    Returns
    -------
    recvbuffer : array_like
        the chunk of `data` that each rank gets
    """
    if counts is not None:
        counts = numpy.asarray(counts, order='C')
        if len(counts) != comm.size:
            raise ValueError("counts array has wrong length!")

    # check for bad input
    if comm.rank == root:
        bad_input = not isinstance(data, numpy.ndarray)
    else:
        bad_input = None
    bad_input = comm.bcast(bad_input, root=root)
    if bad_input:
        raise ValueError("`data` must be a numpy array on root in ScatterArray")

    if comm.rank == root:
        # need C-contiguous order
        if not data.flags['C_CONTIGUOUS']:
            data = numpy.ascontiguousarray(data)
        shape_and_dtype = (data.shape, data.dtype)
    else:
        shape_and_dtype = None

    # each rank needs shape/dtype of input data
    shape, dtype = comm.bcast(shape_and_dtype, root=root)

    # object dtype is not supported
    if dtype.char == 'V':
        fail = any(dtype[name] == 'O' for name in dtype.names)
    else:
        fail = dtype == 'O'
    if fail:
        raise ValueError("'object' data type not supported in ScatterArray; please specify specific data type")

    # initialize empty data on non-root ranks
    if comm.rank != root:
        np_dtype = numpy.dtype((dtype, shape[1:]))
        data = numpy.empty(0, dtype=np_dtype)

    # setup the custom dtype: one item spans all trailing dimensions
    duplicity = numpy.prod(numpy.array(shape[1:], 'intp'))
    itemsize = duplicity * dtype.itemsize
    dt = MPI.BYTE.Create_contiguous(itemsize)
    dt.Commit()

    # compute the new shape for each rank
    newshape = list(shape)
    if counts is None:
        newlength = shape[0] // comm.size
        if comm.rank < shape[0] % comm.size:
            newlength += 1
        newshape[0] = newlength
    else:
        if counts.sum() != shape[0]:
            raise ValueError("the sum of the `counts` array needs to be equal to data length")
        newshape[0] = counts[comm.rank]

    # the return array
    recvbuffer = numpy.empty(newshape, dtype=dtype, order='C')

    # the send counts, if not provided
    if counts is None:
        counts = comm.allgather(newlength)
        counts = numpy.array(counts, order='C')

    # the send offsets
    offsets = numpy.zeros_like(counts, order='C')
    offsets[1:] = counts.cumsum()[:-1]

    # do the scatter
    comm.Barrier()
    comm.Scatterv([data, (counts, offsets), dt], [recvbuffer, dt], root=root)
    dt.Free()
    return recvbuffer
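# Illustrative usage (a sketch, not part of the original module; run under MPI):
#
#   >>> comm = MPI.COMM_WORLD
#   >>> data = numpy.arange(100, dtype='f8') if comm.rank == 0 else None
#   >>> chunk = ScatterArray(data, comm, root=0)
#   >>> comm.allreduce(len(chunk))  # the chunks add back up to the total
#   100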
def FrontPadArray(array, front, comm):
    """
    Pad an array in the front with items from the ranks before this rank.
    """
    N = numpy.array(comm.allgather(len(array)), dtype='intp')
    offsets = numpy.cumsum(numpy.concatenate([[0], N], axis=0))
    mystart = offsets[comm.rank] - front
    torecv = (offsets[:-1] + N) - mystart

    torecv[torecv < 0] = 0  # before mystart
    torecv[torecv > front] = 0  # no more than needed
    torecv[torecv > N] = N[torecv > N]  # fully enclosed

    if comm.allreduce(torecv.sum() != front, MPI.LOR):
        raise ValueError("cannot work out a plan to pad items. Some `front` values are too large. %d %d"
                         % (torecv.sum(), front))

    tosend = comm.alltoall(torecv)
    sendbuf = [array[-items:] if items > 0 else array[0:0] for items in tosend]
    recvbuf = comm.alltoall(sendbuf)
    return numpy.concatenate(list(recvbuf) + [array], axis=0)
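# Illustrative usage (a sketch, not part of the original module; run under
# MPI with 2 ranks):
#
#   >>> comm = MPI.COMM_WORLD
#   >>> local = numpy.arange(comm.rank * 4, comm.rank * 4 + 4)
#   >>> front = 2 if comm.rank > 0 else 0  # rank 0 has no items before it
#   >>> FrontPadArray(local, front, comm)  # rank 1 gets [2, 3, 4, 5, 6, 7]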
def attrs_to_dict(obj, prefix):
    """
    Return a dictionary of the ``attrs`` of ``obj``, with each key
    prefixed by ``prefix``; an empty dictionary is returned if
    ``obj`` has no ``attrs``
    """
    if not hasattr(obj, 'attrs'):
        return {}

    d = {}
    for key, value in obj.attrs.items():
        d[prefix + key] = value
    return d

import json
from astropy.units import Quantity, Unit
from nbodykit.cosmology import Cosmology
class JSONEncoder(json.JSONEncoder):
    """
    A subclass of :class:`json.JSONEncoder` that can also handle numpy
    arrays, complex values, and :class:`astropy.units.Quantity` objects.
    """
    def default(self, obj):

        # Cosmology object
        if isinstance(obj, Cosmology):
            d = {}
            d['__cosmo__'] = obj.pars.copy()
            return d

        # astropy quantity
        if isinstance(obj, Quantity):
            d = {}
            d['__unit__'] = str(obj.unit)
            value = obj.value
            if obj.size > 1:
                d['__dtype__'] = value.dtype.str if value.dtype.names is None else value.dtype.descr
                d['__shape__'] = value.shape
                value = value.tolist()
            d['__data__'] = value
            return d

        # complex values
        elif isinstance(obj, complex):
            return {'__complex__': [obj.real, obj.imag]}

        # numpy arrays
        elif isinstance(obj, numpy.ndarray):
            value = obj
            dtype = obj.dtype
            d = {
                '__dtype__': dtype.str if dtype.names is None else dtype.descr,
                '__shape__': value.shape,
                '__data__': value.tolist(),
            }
            return d

        # explicitly convert numpy data types to python types
        # see: https://bugs.python.org/issue24313
        elif isinstance(obj, numpy.floating):
            return float(obj)
        elif isinstance(obj, numpy.integer):
            return int(obj)

        return json.JSONEncoder.default(self, obj)
class JSONDecoder(json.JSONDecoder):
    """
    A subclass of :class:`json.JSONDecoder` that can also handle numpy
    arrays, complex values, and :class:`astropy.units.Quantity` objects.
    """
    @staticmethod
    def hook(value):

        def fixdtype(dtype):
            # JSON roundtrips tuples as lists; convert a structured dtype
            # description back to the list-of-tuples form numpy expects
            if isinstance(dtype, list):
                true_dtype = []
                for field in dtype:
                    if len(field) == 3:
                        true_dtype.append((str(field[0]), str(field[1]), field[2]))
                    if len(field) == 2:
                        true_dtype.append((str(field[0]), str(field[1])))
                return true_dtype
            return dtype

        def fixdata(data, N, dtype):
            if not isinstance(dtype, list):
                return data

            # for a structured array, the last dimension shall be a tuple
            if N > 0:
                return [fixdata(i, N - 1, dtype) for i in data]
            else:
                assert len(data) == len(dtype)
                return tuple(data)

        d = None
        if '__dtype__' in value:
            dtype = fixdtype(value['__dtype__'])
            shape = value['__shape__']
            a = fixdata(value['__data__'], len(shape), dtype)
            d = numpy.array(a, dtype=dtype)

        if '__unit__' in value:
            if d is None:
                d = value['__data__']
            d = Quantity(d, Unit(value['__unit__']))

        if '__cosmo__' in value:
            d = Cosmology.from_dict(value['__cosmo__'])

        if d is not None:
            return d

        if '__complex__' in value:
            real, imag = value['__complex__']
            return real + 1j * imag

        return value

    def __init__(self, *args, **kwargs):
        kwargs['object_hook'] = JSONDecoder.hook
        json.JSONDecoder.__init__(self, *args, **kwargs)
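# Illustrative usage (a sketch, not part of the original module): numpy
# arrays survive a round trip through the encoder/decoder pair:
#
#   >>> s = json.dumps({'x': numpy.arange(3)}, cls=JSONEncoder)
#   >>> json.loads(s, cls=JSONDecoder)['x']
#   array([0, 1, 2])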
def timer(start, end):
    """
    Utility function to return a string representing the elapsed time,
    as computed from the input start and end times

    Parameters
    ----------
    start : float
        the start time in seconds
    end : float
        the end time in seconds

    Returns
    -------
    str :
        the elapsed time as a string, using the format `hours:minutes:seconds`
    """
    hours, rem = divmod(end - start, 3600)
    minutes, seconds = divmod(rem, 60)
    return "{:0>2}:{:0>2}:{:05.2f}".format(int(hours), int(minutes), seconds)
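# Illustrative usage (a sketch, not part of the original module):
#
#   >>> timer(0, 3725.5)
#   '01:02:05.50'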
@contextlib.contextmanager
def captured_output(comm, root=0):
    """
    Re-direct stdout and stderr to null for every rank but ``root``
    """
    # keep output on root
    if root is not None and comm.rank == root:
        yield sys.stdout, sys.stderr
    else:
        from six.moves import StringIO
        from nbodykit.extern.wurlitzer import sys_pipes

        # redirect stdout and stderr
        old_stdout, sys.stdout = sys.stdout, StringIO()
        old_stderr, sys.stderr = sys.stderr, StringIO()
        try:
            with sys_pipes() as (out, err):
                yield out, err
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr
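# Illustrative usage (a sketch, not part of the original module; run under
# MPI, so the message appears once instead of once per rank):
#
#   >>> with captured_output(MPI.COMM_WORLD, root=0):
#   ...     print("written once, from rank 0 only")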
import mpsort
class DistributedArray(object):
    """
    Distributed Array Object

    A distributed array is striped along ranks, along the first dimension

    Attributes
    ----------
    comm : :py:class:`mpi4py.MPI.Comm`
        the communicator
    local : array_like
        the local data
    """
    @staticmethod
    def _find_dtype(dtype, comm):
        # guess the global dtype from the per-rank dtypes
        dtypes = comm.allgather(dtype)
        dtypes = set([dtype for dtype in dtypes if dtype is not None])
        if len(dtypes) > 1:
            raise TypeError("Type of local array is inconsistent between ranks; got %s" % dtypes)
        return next(iter(dtypes))

    @staticmethod
    def _find_cshape(shape, comm):
        # guess the collective shape from the per-rank shapes
        shapes = comm.allgather(shape)
        shapes = set(shape[1:] for shape in shapes)
        if len(shapes) > 1:
            raise TypeError("Shape of local array is inconsistent between ranks; got %s" % shapes)

        clen = comm.allreduce(shape[0])
        cshape = tuple([clen] + list(shape[1:]))
        return cshape

    def __init__(self, local, comm):
        self.comm = comm

        shape = numpy.asarray(local).shape
        dtype = numpy.asarray(local).dtype if len(local) else None

        self.dtype = DistributedArray._find_dtype(dtype, comm)
        self.cshape = DistributedArray._find_cshape(shape, comm)

        # directly use the original local array
        self.local = local
        self.topology = LinearTopology(local, comm)
        self.coffset = sum(comm.allgather(shape[0])[:comm.rank])
    @classmethod
    def cempty(kls, cshape, dtype, comm):
        """
        Create an empty array collectively, dividing the collective
        shape ``cshape`` evenly between the ranks
        """
        dtype = DistributedArray._find_dtype(dtype, comm)

        cshape = tuple(cshape)
        llen = cshape[0] * (comm.rank + 1) // comm.size - cshape[0] * comm.rank // comm.size
        shape = tuple([llen] + list(cshape[1:]))

        cshape1 = DistributedArray._find_cshape(shape, comm)
        if cshape != cshape1:
            raise ValueError("input cshape is inconsistent: %s %s" % (cshape, cshape1))

        local = numpy.empty(shape, dtype=dtype)
        return DistributedArray(local, comm=comm)
    @classmethod
    def concat(kls, *args, **kwargs):
        """
        Concatenate several distributed arrays into one.

        Parameters
        ----------
        localsize : int, optional
            the desired local size of the result on this rank; if not
            given, use the sum of the local sizes of the input arrays
        """
        localsize = kwargs.pop('localsize', None)
        comm = args[0].comm

        localsize_in = sum([len(arg.local) for arg in args])
        if localsize is None:
            localsize = localsize_in

        eldtype = numpy.result_type(*[arg.local for arg in args])

        # sort by a global index to preserve the ordering of the inputs
        dtype = [('index', 'intp'), ('el', eldtype)]
        inp = numpy.empty(localsize_in, dtype=dtype)
        out = numpy.empty(localsize, dtype=dtype)

        go = 0  # global offset of the current input array
        o = 0   # local offset into ``inp``
        for arg in args:
            inp['index'][o:o + len(arg.local)] = go + arg.coffset + numpy.arange(len(arg.local), dtype='intp')
            inp['el'][o:o + len(arg.local)] = arg.local
            o = o + len(arg.local)
            go = go + arg.cshape[0]

        mpsort.sort(inp, orderby='index', out=out, comm=comm)

        return DistributedArray(out['el'].copy(), comm=comm)
    def sort(self, orderby=None):
        """
        Sort the array globally by the key ``orderby``.

        Due to a limitation of mpsort, ``self[orderby]`` must be of
        unsigned 8-byte integer ('u8') dtype.
        """
        mpsort.sort(self.local, orderby, comm=self.comm)
    def __getitem__(self, key):
        return DistributedArray(self.local[key], self.comm)
    def unique_labels(self):
        """
        Assign unique labels to sorted local data.

        .. warning ::

            local data must be globally sorted, and of simple type. (numpy.unique)

        Returns
        -------
        label : :py:class:`DistributedArray`
            the new labels, starting from 0
        """
        next = self.topology.next()

        junk, label = numpy.unique(self.local, return_inverse=True)
        if len(label) == 0:
            # work around a numpy bug (<= 1.13.3): when the input is empty,
            # numpy.unique returns a boolean array, which causes issues as
            # the type-casting rules in numpy are tightened up
            label = numpy.int64(label)

        if len(self.local) == 0:
            Nunique = 0
        else:
            # watch out: this is to make sure that, after shifting, the first
            # label on the next rank matches my last label when the same
            # value spills over the rank boundary
            if next == self.local[-1]:
                Nunique = len(junk) - 1
            else:
                Nunique = len(junk)

        label += numpy.sum(self.comm.allgather(Nunique)[:self.comm.rank], dtype='intp')
        return DistributedArray(label, self.comm)
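    # Illustrative example (a sketch, not part of the original module): with
    # two ranks and globally sorted local data
    #
    #     rank 0: local = [0, 0, 3]        rank 1: local = [3, 5]
    #
    # ``unique_labels().local`` is [0, 0, 1] on rank 0 and [1, 2] on rank 1:
    # equal values receive the same label, even across the rank boundary.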
    def bincount(self, weights=None, local=False, shared_edges=True):
        """
        Assign count numbers from sorted local data.

        .. warning ::

            local data must be globally sorted, and of integer type. (numpy.bincount)

        Parameters
        ----------
        weights : array_like, optional
            if given, count the total weight instead of the number of objects
        local : boolean
            if local is True, only count the local array
        shared_edges : boolean
            if True, keep the counts at edges that are shared between ranks
            on both ranks; if False, keep the counts at shared edges only on
            the rank to the left

        Returns
        -------
        N : :py:class:`DistributedArray`
            distributed counts array. If items of the same value span other
            chunks of the array, they are added to N as well.

        Examples
        --------
        If the local array is [0, 0] on rank 0 and [0, 1] on rank 1, then
        the counts array is [3] on rank 0 and [3, 1] on rank 1: the value 0
        appears three times globally, and the shared edge is kept on both ranks.
        """
        prev = self.topology.prev()
        if prev is not EmptyRank:
            offset = prev
            # two cases: either start counting from the last bin of the
            # previous rank, or from the next bin, depending on the first
            # value of my local data
            if len(self.local) > 0:
                if prev != self.local[0]:
                    offset = prev + 1
        else:
            offset = 0

        # locally, bincount from offset to wherever we end
        N = numpy.bincount(self.local - offset, weights)

        if local:
            return N

        heads = self.topology.heads()
        tails = self.topology.tails()

        distN = DistributedArray(N, self.comm)
        headsN, tailsN = distN.topology.heads(), distN.topology.tails()

        if len(N) > 0:
            anyshared = False
            # add in counts of the same value spilling over from ranks to the left
            for i in reversed(range(self.comm.rank)):
                if tails[i] == self.local[0]:
                    N[0] += tailsN[i]
                    anyshared = True
            # and from ranks to the right
            for i in range(self.comm.rank + 1, self.comm.size):
                if heads[i] == self.local[-1]:
                    N[-1] += headsN[i]
            if not shared_edges:
                # remove the shared edge from this rank, as it is already
                # counted on the rank to the left
                if anyshared:
                    N = N[1:]

        return DistributedArray(N, self.comm)
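# Illustrative usage of DistributedArray.bincount (a sketch, not part of the
# original module; run under MPI with 2 ranks):
#
#   >>> comm = MPI.COMM_WORLD
#   >>> local = numpy.array([0, 0]) if comm.rank == 0 else numpy.array([0, 1])
#   >>> d = DistributedArray(local, comm)
#   >>> d.bincount().local  # [3] on rank 0, [3, 1] on rank 1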
def _get_empty_rank():
    return EmptyRank

class EmptyRankType(object):
    """
    A picklable singleton used to mark ranks that hold no items
    """
    def __repr__(self):
        return "EmptyRank"

    def __reduce__(self):
        # unpickling always resolves back to the EmptyRank singleton
        return (_get_empty_rank, ())

EmptyRank = EmptyRankType()
class LinearTopology(object):
    """
    Helper object for the topology of a distributed array
    """
    def __init__(self, local, comm):
        self.local = local
        self.comm = comm
    def heads(self):
        """
        The first items on each rank.

        Returns
        -------
        heads : list
            a list of first items; EmptyRank is used for empty ranks
        """
        head = EmptyRank
        if len(self.local) > 0:
            head = self.local[0]
        return self.comm.allgather(head)
    def tails(self):
        """
        The last items on each rank.

        Returns
        -------
        tails : list
            a list of last items; EmptyRank is used for empty ranks
        """
        tail = EmptyRank
        if len(self.local) > 0:
            tail = self.local[-1]
        return self.comm.allgather(tail)
    def prev(self):
        """
        The item before the local data.

        This method fetches the last item before the local data.
        If the previous rank is empty, the rank before that is used.
        If there is no item before this rank, EmptyRank is returned.

        Returns
        -------
        prev : scalar
            the item before the local data, or EmptyRank if all ranks
            before this rank are empty
        """
        tails = self.tails()
        for prev in reversed(tails[:self.comm.rank]):
            if prev is not EmptyRank:
                return prev
        return EmptyRank
    def next(self):
        """
        The item after the local data.

        This method fetches the first item after the local data.
        If the next rank is empty, the item after that rank is used.
        If there is no item after the local data, EmptyRank is returned.

        Returns
        -------
        next : scalar
            the item after the local data, or EmptyRank if all ranks
            after this rank are empty
        """
        heads = self.heads()
        for next in heads[self.comm.rank + 1:]:
            if next is not EmptyRank:
                return next
        return EmptyRank
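# Illustrative example (a sketch, not part of the original module): with two
# ranks holding local = [1, 2] and local = [3, 4] respectively,
#
#     LinearTopology(local, comm).heads()  ->  [1, 3] on every rank
#     LinearTopology(local, comm).tails()  ->  [2, 4] on every rank
#
# so on rank 1, ``prev()`` returns 2, and on rank 0, ``next()`` returns 3.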