Source code for nbodykit.source.catalog.uniform

from nbodykit.base.catalog import CatalogSource, column
from nbodykit import CurrentMPIComm

from numpy.random import RandomState
import numpy
import functools
import contextlib

N_PER_SEED = 100000

def _mpi_enabled_rng(func):
    A decorator that handles generating random numbers in parallel
    in a manner that is independent of the number of ranks

    Designed to be used with :class:`MPIRandomState`
    def func_wrapper(*args, **kwargs):

        self = func.__self__
        size = kwargs.get('size', None)
        if size is not None and isinstance(size, int):
            size = (size,)

        # do nothing if size not provided or wrong
        if size is None or size[0] != self.size:
            return func(*args, **kwargs)

        # size matches the "size" attribute
            toret = []

            # loop through chunks that this rank is responsible for
            for chunk, (start, stop) in zip(self._chunks, self._slices):
                with self.seeded_context(self._seeds[chunk]):

                    kwargs['size'] = (N_PER_SEED,) + size[1:]
                    if chunk == self.N // N_PER_SEED:
                        kwargs['size'] = (self.N % N_PER_SEED,) + size[1:]
                    toret.append(func(*args, **kwargs)[start:stop])
            return numpy.concatenate(toret, axis=0)

    func_wrapper.mpi_enabled = True
    return func_wrapper

[docs]class MPIRandomState(RandomState): """ A wrapper around :class:`numpy.random.RandomState` that can return random numbers in parallel, independent of the number of ranks. Parameters ---------- comm : MPI communicator the MPI communicator seed : int the global seed that seeds all other random seeds N : int the total size of the random numbers to generate; we return chunks of the total on each CPU, based on the CPU's rank """ def __init__(self, comm, seed, N): RandomState.__init__(self, seed=seed) # the number of seeds to generate N particles n_seeds = N // N_PER_SEED if N % N_PER_SEED: n_seeds += 1 self.comm = comm self.global_seed = seed self.N = N start = N * comm.rank // comm.size stop = N * (comm.rank + 1) // comm.size self.size = stop - start # generate the full set of seeds from the global seed rng = numpy.random.RandomState(seed=seed) self._seeds = rng.randint(0, high=0xffffffff, size=n_seeds) # sizes of each chunk sizes = [N_PER_SEED]*(N//N_PER_SEED) if N % N_PER_SEED: sizes.append(N % N_PER_SEED) # the local chunks this rank is responsible for cumsizes = numpy.insert(numpy.cumsum(sizes), 0, 0) chunk_range = numpy.searchsorted(cumsizes[1:], [start, stop]) self._chunks = list(range(chunk_range[0], chunk_range[1]+1)) # and the slices for each local chunk self._slices = [] for chunk in self._chunks: start_size = cumsizes[chunk] sl = (max(start-start_size, 0), min(stop-start_size, sizes[chunk])) self._slices.append(sl)
[docs] def __dir__(self): """ Explicitly set the attributes as those of the RandomState class too """ d1 = set(RandomState().__dir__()) d2 = set(RandomState.__dir__(self)) return list(d1|d2)
[docs] def __getattribute__(self, name): """ Decorate callable functions of RandomState such that they return chunks of the total `N` random numbers generated """ attr = RandomState.__getattribute__(self, name) if callable(attr) and not getattr(attr, 'mpi_enabled', False): attr = _mpi_enabled_rng(attr) return attr
[docs] @contextlib.contextmanager def seeded_context(self, seed): """ A context manager to set and then restore the random seed """ startstate = self.get_state() self.seed(seed) yield self.set_state(startstate)
[docs]class RandomCatalog(CatalogSource): """ A CatalogSource that can have columns added via a collective random number generator. The random number generator stored as :attr:`rng` behaves as :class:`numpy.random.RandomState` but generates random numbers only on the local rank in a manner independent of the number of ranks. Parameters ---------- csize : int the desired collective size of the Source seed : int, optional the global seed for the random number generator comm : MPI communicator the MPI communicator; set automatically if None """ def __repr__(self): args = (self.size, self.attrs['seed']) return "RandomCatalog(size=%d, seed=%s)" % args @CurrentMPIComm.enable def __init__(self, csize, seed=None, comm=None, use_cache=False): self.comm = comm # set the seed randomly if it is None if seed is None: if self.comm.rank == 0: seed = numpy.random.randint(0, 4294967295) seed = self.comm.bcast(seed) self.attrs['seed'] = seed # generate the seeds from the global seed if csize == 0: raise ValueError("no random particles generated!") self._rng = MPIRandomState(comm, seed, csize) self._size = self.rng.size # init the base class CatalogSource.__init__(self, comm=comm, use_cache=use_cache) @property def size(self): """The number of particles on the local rank""" return self._size @property def rng(self): """ A :class:`MPIRandomState` that behaves as :class:`numpy.random.RandomState` but generates random numbers in a manner independent of the number of ranks. """ return self._rng
[docs]class UniformCatalog(RandomCatalog): """ A CatalogSource that has uniformly-distributed ``Position`` and ``Velocity`` columns. The random numbers generated do not depend on the number of available ranks. Parameters ---------- nbar : float the desired number density of particles in the box BoxSize : float, 3-vector the size of the box seed : int, optional the random seed comm : the MPI communicator use_cache : bool, optional whether to cache data on disk """ def __repr__(self): args = (self.size, self.attrs['seed']) return "UniformCatalog(size=%d, seed=%s)" % args @CurrentMPIComm.enable def __init__(self, nbar, BoxSize, seed=None, comm=None, use_cache=False): self.comm = comm _BoxSize = numpy.empty(3, dtype='f8') _BoxSize[:] = BoxSize self.attrs['BoxSize'] = _BoxSize rng = numpy.random.RandomState(seed) N = rng.poisson(nbar *['BoxSize'])) if N == 0: raise ValueError("no uniform particles generated, try increasing `nbar` parameter") RandomCatalog.__init__(self, N, seed=seed, comm=comm, use_cache=use_cache) self._pos = self.rng.uniform(size=(self._size, 3)) * self.attrs['BoxSize'] self._vel = self.rng.uniform(size=(self._size, 3)) * self.attrs['BoxSize'] * 0.01
[docs] @column def Position(self): """ The position of particles, uniformly distributed in :attr:`BoxSize` """ return self.make_column(self._pos)
[docs] @column def Velocity(self): """ The velocity of particles, uniformly distributed in ``0.01 x BoxSize`` """ return self.make_column(self._vel)