from pmesh.domain import GridND
from nbodykit.utils import split_size_3d
import numpy
[docs]def log_decomposition(comm, logger, N1, N2, pos1, pos2):
"""
Log the distribution of particles to correlate across ranks that
resulted after doing the domain decomposition.
Parameters
----------
comm :
the MPI communicator
logger :
the current logger being used
N1 : int
the total number of objects in the first source being correlated
N2 : int
the total number of objects in the second source being correlated
pos1 : array_like
the domain-decomposed data of the first source
pos2 : array_like
the domain-decomposed data of the second source
"""
# global counts
if comm.rank == 0:
logger.info('correlating %d x %d objects in total' %(N1, N2))
# sizes for all ranks
sizes1 = comm.gather(len(pos1), root=0)
sizes2 = comm.gather(len(pos2), root=0)
# rank 0 logs
if comm.rank == 0:
args = (numpy.median(sizes1), numpy.median(sizes2))
logger.info("correlating A x B = %d x %d objects (median) per rank" % args)
global_min = numpy.min(sizes1)
logger.info("min A load per rank = %d" % global_min)
global_max = numpy.max(sizes1)
logger.info("max A load per rank = %d" % global_max)
args = (N1//comm.size, N2)
logger.info("(even distribution would result in %d x %d)" % args)
[docs]def decompose_box_data(first, second, attrs, logger, smoothing):
"""
Perform a domain decomposition on simulation box data, returning the
domain-demposed position and weight arrays for each object in the
correlating pair.
No load balancing is required since the particles in are assumed to
be in a box.
The implementation follows:
1. Decompose the first source such that the objects are spatially
tight on a given rank.
2. Decompose the second source, ensuring a given rank holds all
particles within the desired maximum separation.
Parameters
----------
first : CatalogSource
the first source we are correlating
second : CatalogSource
the second source we are correlating
attrs : dict
dict of parameters from the pair counting algorithm
logger :
the current active logger
smoothing :
the maximum Cartesian separation implied by the user's binning
Returns
-------
(pos1, w1), (pos2, w2) : array_like
the (decomposed) set of positions and weights to correlate
"""
comm = first.comm
# determine processor division for domain decomposition
np = split_size_3d(comm.size)
if comm.rank == 0:
logger.info("using cpu grid decomposition: %s" %str(np))
# get the (periodic-enforced) position for first
pos1 = first[attrs['position']]
if attrs['periodic']:
pos1 %= attrs['BoxSize']
pos1, w1 = first.compute(pos1, first[attrs['weight']])
N1 = comm.allreduce(len(pos1))
# get the (periodic-enforced) position for second
if second is not None:
pos2 = second[attrs['position']]
if attrs['periodic']:
pos2 %= attrs['BoxSize']
pos2, w2 = second.compute(pos2, second[attrs['weight']])
N2 = comm.allreduce(len(pos2))
else:
pos2 = pos1
w2 = w1
N2 = N1
# domain decomposition
grid = [
numpy.linspace(0, attrs['BoxSize'][0], np[0] + 1, endpoint=True),
numpy.linspace(0, attrs['BoxSize'][1], np[1] + 1, endpoint=True),
numpy.linspace(0, attrs['BoxSize'][2], np[2] + 1, endpoint=True),
]
domain = GridND(grid, comm=comm)
# exchange first particles
layout = domain.decompose(pos1, smoothing=0)
pos1 = layout.exchange(pos1)
w1 = layout.exchange(w1)
# exchange second particles
if smoothing > attrs['BoxSize'].max() * 0.25:
pos2 = numpy.concatenate(comm.allgather(pos2), axis=0)
w2 = numpy.concatenate(comm.allgather(w2), axis=0)
else:
layout = domain.decompose(pos2, smoothing=smoothing)
pos2 = layout.exchange(pos2)
w2 = layout.exchange(w2)
# log the decomposition breakdown
log_decomposition(comm, logger, N1, N2, pos1, pos2)
return (pos1, w1), (pos2, w2)
[docs]def decompose_survey_data(first, second, attrs, logger, smoothing, domain_factor=2,
angular=False, return_cartesian=False):
"""
Perform a domain decomposition on survey data, returning the
domain-demposed position and weight arrays for each object in the
correlating pair.
The domain decomposition is based on the Cartesian coordinates of
the input data (assumed to be in sky coordinates).
Load balancing is required since the distribution in Cartesian space
will likely not be uniform.
The implementation follows:
1. Decompose the first source and balance the particle load, such that
the first source is evenly distributed across all ranks and the
objects are spatially tight on a given rank.
2. Decompose the second source, ensuring a given rank holds all
particles within the desired maximum separation.
Parameters
----------
first : CatalogSource
the first source we are correlating
second : CatalogSource
the second source we are correlating
attrs : dict
dict of parameters from the pair counting algorithm
logger :
the current active logger
smoothing :
the maximum Cartesian separation implied by the user's binning
domain_factor : int, optional
the factor by which we over-sample the mesh with cells in a given
direction; higher values can lead to better performance
angular : bool, optional
if ``True``, the Cartesian positions used in the domain
decomposition are on the unit sphere
return_cartesian : bool, optional
whether to return the pos as (ra, dec, z), or the Cartesian (x, y, z)
Returns
-------
(pos1, w1), (pos2, w2) : array_like
the (decomposed) set of positions and weights to correlate
"""
from nbodykit.transform import StackColumns
comm = first.comm
# either (ra,dec) or (ra,dec,redshift)
poscols = [attrs['ra'], attrs['dec']]
if not angular: poscols += [attrs['redshift']]
# determine processor division for domain decomposition
np = split_size_3d(comm.size)
if comm.rank == 0:
logger.info("using cpu grid decomposition: %s" %str(np))
# stack position and compute
pos1 = StackColumns(*[first[col] for col in poscols])
pos1, w1 = first.compute(pos1, first[attrs['weight']])
N1 = comm.allreduce(len(pos1))
# only need cosmo if not angular
cosmo = attrs.get('cosmo', None) if not angular else None
if not angular and cosmo is None:
raise ValueError("need a cosmology to decompose non-angular survey data")
cpos1, cpos1_min, cpos1_max, rdist1 = get_cartesian(comm, pos1, cosmo=cosmo)
# pass in comoving dist to Corrfunc instead of redshift
if not angular:
pos1 = pos1.copy() # we need to overwrite it; dask doesn't always return a copy after 0.18.1
pos1[:,2] = rdist1
# set up position for second too
if second is not None:
# stack position and compute for "second"
pos2 = StackColumns(*[second[col] for col in poscols])
pos2, w2 = second.compute(pos2, second[attrs['weight']])
N2 = comm.allreduce(len(pos2))
# get comoving dist and boxsize
cpos2, cpos2_min, cpos2_max, rdist2 = get_cartesian(comm, pos2, cosmo=cosmo)
# pass in comoving distance instead of redshift
if not angular:
pos2 = pos2.copy() # we need to overwrite it; dask doesn't always return a copy after 0.18.1
pos2[:,2] = rdist2
else:
pos2 = pos1
w2 = w1
N2 = N1
cpos2_min = cpos1_min
cpos2_max = cpos1_max
cpos2 = cpos1
# determine global boxsize
if second is None:
cpos_min = cpos1_min
cpos_max = cpos1_max
else:
cpos_min = numpy.min(numpy.vstack([cpos1_min, cpos2_min]), axis=0)
cpos_max = numpy.max(numpy.vstack([cpos1_max, cpos2_max]), axis=0)
boxsize = cpos_max - cpos_min
if comm.rank == 0:
logger.info("position variable range on rank 0 (max, min) = %s, %s" % (cpos_max, cpos_min))
# initialize the domain
# NOTE: over-decompose by factor of 2 to trigger load balancing
grid = [
numpy.linspace(cpos_min[0], cpos_max[0], domain_factor*np[0] + 1, endpoint=True),
numpy.linspace(cpos_min[1], cpos_max[1], domain_factor*np[1] + 1, endpoint=True),
numpy.linspace(cpos_min[2], cpos_max[2], domain_factor*np[2] + 1, endpoint=True),
]
domain = GridND(grid, comm=comm, periodic=False)
# balance the load
domain.loadbalance(domain.load(cpos1))
if comm.rank == 0:
logger.info("Load balance done")
# if we want to return cartesian, redefine pos
if return_cartesian:
pos1 = cpos1
pos2 = cpos2
# decompose based on cartesian positions
layout = domain.decompose(cpos1, smoothing=0)
pos1 = layout.exchange(pos1)
w1 = layout.exchange(w1)
# get the position/weight of the secondaries
if smoothing > boxsize.max() * 0.25:
pos2 = numpy.concatenate(comm.allgather(pos2), axis=0)
w2 = numpy.concatenate(comm.allgather(w2), axis=0)
else:
layout = domain.decompose(cpos2, smoothing=smoothing)
pos2 = layout.exchange(pos2)
w2 = layout.exchange(w2)
# log the decomposition breakdown
log_decomposition(comm, logger, N1, N2, pos1, pos2)
return (pos1, w1), (pos2, w2)
[docs]def get_cartesian(comm, pos, cosmo=None):
"""
Utility function to convert sky coordinates to Cartesian coordinates and
return the implied box size from the position bounds.
If ``cosmo`` is not provided, return coordinates on the unit sphere.
"""
from nbodykit.utils import get_data_bounds
# get RA,DEC in degrees
ra, dec = numpy.deg2rad(pos[:,0]), numpy.deg2rad(pos[:,1])
# cartesian position
x = numpy.cos( dec ) * numpy.cos( ra )
y = numpy.cos( dec ) * numpy.sin( ra )
z = numpy.sin( dec )
cpos = numpy.vstack([x,y,z]).T
# multiply by comoving distance?
if cosmo is not None:
assert pos.shape[-1] == 3
rdist = cosmo.comoving_distance(pos[:,2]) # in Mpc/h
cpos = rdist[:,None] * cpos
else:
rdist = None
# min/max of position
cpos_min, cpos_max = get_data_bounds(cpos, comm)
boxsize = abs(cpos_max - cpos_min)
# some padding to avoid weird effects with domain decomposition
# like sitting on an edge and goes out of bound due to round off errors.
return cpos, cpos_min - 1e-3 * boxsize, cpos_max + 1e-3 * boxsize, rdist