from __future__ import print_function
import numpy
import logging
from mpi4py import MPI
from nbodykit.source.catalog import ArrayCatalog
from nbodykit.utils import split_size_3d
from nbodykit.utils import DistributedArray
[docs]class FOF(object):
"""
A friends-of-friends halo finder that computes the label for
each particle, denoting which halo it belongs to.
Friends-of-friends was first used by Davis et al 1985 to define
halos in hierachical structure formation of cosmological simulations.
The algorithm is also known as DBSCAN in computer science.
The subroutine here implements a parallel version of the FOF.
The underlying local FOF algorithm is from :mod:`kdcount.cluster`,
which is an adaptation of the implementation in Volker Springel's
Gadget and Martin White's PM.
Results are computed when the object is inititalized. See the documenation
of :func:`~FOF.run` for the attributes storing the results.
For returning a CatalogSource of the FOF halos, see :func:`find_features`
and for computing a halo catalog with added analytic information for
a specific redshift and cosmology, see :func:`to_halos`.
Parameters
----------
source : CatalogSource
the source to run the FOF algorithm on; must support 'Position'
linking_length : float
the linking length, either in absolute units, or relative
to the mean particle separation
nmin : int
halo with fewer particles are ignored
absolute : bool, optional
If `True`, the linking length is in absolute units, otherwise it is
relative to the mean particle separation; default is `False`
"""
logger = logging.getLogger('FOF')
def __init__(self, source, linking_length, nmin, absolute=False, periodic=True, domain_factor=1):
self.comm = source.comm
self._source = source
if 'Position' not in source:
raise ValueError("cannot compute FOF without 'Position' column")
self.attrs = {}
self.attrs['linking_length'] = linking_length
self.attrs['nmin'] = nmin
self.attrs['absolute'] = absolute
self.attrs['periodic'] = periodic
self.attrs['domain_factor'] = domain_factor
if periodic and 'BoxSize' not in source.attrs:
raise ValueError("Periodic FOF requires BoxSize in .attrs['BoxSize']")
# linking length relative to mean separation
if not absolute:
if 'Nmesh' in source.attrs:
ndim = len(source.attrs['Nmesh'])
elif 'Position' in source:
ndim = source['Position'].shape[1]
else:
raise AttributeError('Missing attributes to infer the dimension')
mean_separation = pow(numpy.prod(source.attrs['BoxSize']) / source.csize, 1.0 / ndim)
linking_length *= mean_separation
self._linking_length = linking_length
# and run
self.run()
[docs] def run(self):
"""
Run the FOF algorithm. This function returns nothing, but does
attach several attributes to the class instance:
- attr:`labels`
- :attr:`max_labels`
.. note::
The :attr:`labels` array is scattered evenly across all ranks.
Attributes
----------
labels : array_like, length: :attr:`size`
an array the label that specifies which FOF halo each particle
belongs to
max_label : int
the maximum label across all ranks; this represents the total
number of FOF halos found
"""
# run the FOF
minid = fof(self._source, self._linking_length, self.comm, self.attrs['periodic'], self.attrs['domain_factor'], self.logger)
# the sorted labels
self.labels = _assign_labels(minid, comm=self.comm, thresh=self.attrs['nmin'])
self.max_label = self.comm.allgather(self.labels.max())
[docs] def find_features(self, peakcolumn=None):
"""
Based on the particle labels, identify the groups, and return
the center-of-mass ``CMPosition``, ``CMVelocity``, and Length of each
feature.
If a ``peakcolumn`` is given, the ``PeakPosition`` and ``PeakVelocity``
is also calculated for the particle at the peak value of the column.
Data is scattered evenly across all ranks.
Returns
-------
:class:`~nbodykit.source.catalog.array.ArrayCatalog` :
a source holding the ('CMPosition', 'CMVelocity', 'Length')
of each feature; optionaly, ``PeakPosition``, ``PeakVelocity`` are
also included if ``peakcolumn`` is not None
"""
# the center-of-mass (Position, Velocity, Length)
halos = fof_catalog(self._source, self.labels, self.comm, peakcolumn=peakcolumn, periodic=self.attrs['periodic'])
attrs = self._source.attrs.copy()
attrs.update(self.attrs)
return ArrayCatalog(halos, comm=self.comm, **attrs)
[docs] def to_halos(self, particle_mass, cosmo, redshift, mdef='vir',
posdef='cm', peakcolumn='Density'):
"""
Return a :class:`~nbodykit.source.catalog.halos.HaloCatalog`, holding
the center-of-mass position and velocity of each FOF halo, as well as
the properly scaled mass, for a given cosmology and redshift.
The returned catalog also has default analytic prescriptions for
halo radius and concentration.
The data is scattered evenly across all ranks.
Parameters
----------
particle_mass : float
the particle mass, which is used to convert the number of particles
in each halo to a total mass
cosmo : :class:`nbodykit.cosmology.core.Cosmology`
the cosmology of the catalog
redshift : float
the redshift of the catalog
mdef : str, optional
string specifying mass definition, used for computing default
halo radii and concentration; should be 'vir' or 'XXXc' or
'XXXm' where 'XXX' is an int specifying the overdensity
posdef : str, optional
position, can be cm (center of mass) or peak (particle with maximum value
on a column)
peakcolumn : str , optional
when posdef is 'peak', this is the column in source for identifying
particles at the peak for the position and velocity.
Returns
-------
:class:`~nbodykit.source.catalog.halos.HaloCatalog`
a HaloCatalog at the specified cosmology and redshift
"""
from nbodykit.source.catalog.halos import HaloCatalog
assert posdef in ['cm', 'peak'], "``posdef`` should be 'cm' or 'peak'"
# meta-data
attrs = self._source.attrs.copy()
attrs.update(self.attrs)
attrs['particle_mass'] = particle_mass
if posdef == 'cm':
# using the center-of-mass (Position, Velocity, Length) for each halo
# not needing a column for peaks.
peakcolumn = None
else:
pass
data = fof_catalog(self._source, self.labels, self.comm, peakcolumn=peakcolumn, periodic=self.attrs['periodic'])
data = data[data['Length'] > 0]
halos = ArrayCatalog(data, comm=self.comm, **attrs)
if posdef == 'cm':
halos['Position'] = halos['CMPosition']
halos['Velocity'] = halos['CMVelocity']
elif posdef == 'peak':
halos['Position'] = halos['PeakPosition']
halos['Velocity'] = halos['PeakVelocity']
# add the halo mass column
halos['Mass'] = particle_mass * halos['Length']
coldefs = {'mass':'Mass', 'velocity':'Velocity', 'position':'Position'}
return HaloCatalog(halos, cosmo, redshift, mdef=mdef, **coldefs)
def _assign_labels(minid, comm, thresh):
"""
Convert minid to sequential labels starting from 0.
This routine is used to assign halo label to particles with
the same minid.
Halos with less than thresh particles are reclassified to 0.
Parameters
----------
minid : array_like, ('i8')
The minimum particle id of the halo. All particles of a halo
have the same minid
comm : py:class:`MPI.Comm`
communicator. since this is a collective operation
thresh : int
halo with less than thresh particles are merged into halo 0
Returns
-------
labels : array_like ('i8')
The new labels of particles. Note that this is ordered
by the size of halo, with the exception 0 represents all
particles that are in halos that contain less than thresh particles.
"""
from mpi4py import MPI
dtype = numpy.dtype([
('origind', 'u8'),
('fofid', 'u8'),
])
data = numpy.empty(len(minid), dtype=dtype)
# assign origind for recovery of ordering, since
# we need to work in sorted fofid
data['fofid'] = minid
data['origind'] = numpy.arange(len(data), dtype='u4')
data['origind'] += numpy.sum(comm.allgather(len(data))[:comm.rank], dtype='intp') \
data = DistributedArray(data, comm)
# first attempt is to assign fofid for each group
data.sort('fofid')
label = data['fofid'].unique_labels()
N = label.bincount()
# now eliminate those with less than thresh particles
small = N.local <= thresh
Nlocal = label.bincount(local=True)
# mask == True for particles in small halos
mask = numpy.repeat(small, Nlocal)
# globally shift halo id by one
label.local += 1
label.local[mask] = 0
data['fofid'].local[:] = label.local[:]
del label
data.sort('fofid')
data['fofid'].local[:] = data['fofid'].unique_labels().local[:]
# unique_labels may miss the 0 index representing disconnected
# particles if there are no such particles.
# shift the fofoid by 1 in that case.
anysmall = comm.allreduce(small.sum()) != 0
if not anysmall: data['fofid'].local[:] += 1
data.sort('origind')
label = data['fofid'].local.view('i8').copy()
del data
Nhalo0 = max(comm.allgather(label.max())) + 1
Nlocal = numpy.bincount(label, minlength=Nhalo0)
comm.Allreduce(MPI.IN_PLACE, Nlocal, op=MPI.SUM)
# sort the labels by halo size
arg = Nlocal[1:].argsort()[::-1] + 1
if Nhalo0 > 2**31:
dtype = 'i8'
else:
dtype = 'i4'
P = numpy.arange(Nhalo0, dtype=dtype)
P[arg] = numpy.arange(len(arg), dtype=dtype) + 1
label = P[label]
return label
def _fof_local(layout, pos, boxsize, ll, comm):
from kdcount import cluster
N = len(pos)
pos = layout.exchange(pos)
if boxsize is not None:
pos %= boxsize
data = cluster.dataset(pos, boxsize=boxsize)
fof = cluster.fof(data, linking_length=ll, np=0)
labels = fof.labels
del fof
PID = numpy.arange(N, dtype='intp')
PID += numpy.sum(comm.allgather(N)[:comm.rank], dtype='intp')
PID = layout.exchange(PID)
# initialize global labels
minid = equiv_class(labels, PID, op=numpy.fmin)[labels]
return minid
def _fof_merge(layout, minid, comm):
# generate global halo id
while True:
# merge, if a particle belongs to several ranks
# use the global label of the minimal
minid_new = layout.gather(minid, mode=numpy.fmin)
minid_new = layout.exchange(minid_new)
# on my rank, these particles have been merged
merged = minid_new != minid
# if no rank has merged any, we are done
# gl is the global label (albeit with some holes)
total = comm.allreduce(merged.sum())
if total == 0:
del minid_new
break
old = minid[merged]
new = minid_new[merged]
arg = old.argsort()
new = new[arg]
old = old[arg]
replacesorted(minid, old, new, out=minid)
minid = layout.gather(minid, mode=numpy.fmin)
return minid
[docs]def fof(source, linking_length, comm, periodic, domain_factor, logger):
"""
Run Friends-of-friends halo finder.
Friends-of-friends was first used by Davis et al 1985 to define
halos in hierachical structure formation of cosmological simulations.
The algorithm is also known as DBSCAN in computer science.
The subroutine here implements a parallel version of the FOF.
The underlying local FOF algorithm is from `kdcount.cluster`,
which is an adaptation of the implementation in Volker Springel's
Gadget and Martin White's PM. It could have been done faster.
Parameters
----------
source: CatalogSource
the input source of particles; must support 'Position' column;
``source.attrs['BoxSize']`` is also used
linking_length: float
linking length in data units. (Usually Mpc/h).
comm: MPI.Comm
The mpi communicator.
Returns
-------
minid: array_like
A unique label of each position. The label is not ranged from 0.
"""
from pmesh.domain import GridND
np = split_size_3d(comm.size)
nd = np * domain_factor
if periodic:
BoxSize = source.attrs.get('BoxSize', None)
if BoxSize is None:
raise ValueError("cannot compute FOF clustering of source without 'BoxSize' in ``attrs`` dict")
if numpy.isscalar(BoxSize):
BoxSize = [BoxSize, BoxSize, BoxSize]
left = [0, 0, 0]
right = BoxSize
else:
BoxSize = None
left = numpy.min(comm.allgather(source['Position'].min(axis=0).compute()), axis=0)
right = numpy.max(comm.allgather(source['Position'].max(axis=0).compute()), axis=0)
grid = [
numpy.linspace(left[0], right[0], nd[0] + 1, endpoint=True),
numpy.linspace(left[1], right[1], nd[1] + 1, endpoint=True),
numpy.linspace(left[2], right[2], nd[2] + 1, endpoint=True),
]
domain = GridND(grid, comm=comm, periodic=periodic)
Position = source.compute(source['Position'])
np = comm.allgather(len(Position))
if comm.rank == 0:
logger.info("Number of particles max/min = %d / %d before spatial decomposition" % (max(np), min(np)))
# balance the load
domain.loadbalance(domain.load(Position))
layout = domain.decompose(Position, smoothing=linking_length * 1)
np = comm.allgather(layout.recvlength)
if comm.rank == 0:
logger.info("Number of particles max/min = %d / %d after spatial decomposition" % (max(np), min(np)))
comm.barrier()
minid = _fof_local(layout, Position, BoxSize, linking_length, comm)
comm.barrier()
minid = _fof_merge(layout, minid, comm)
return minid
[docs]def fof_find_peaks(source, label, comm,
position='Position', column='Density'):
"""
Find position of the peak (maximum) from a given column for a fof result.
"""
Nhalo0 = max(comm.allgather(label.max())) + 1
N = numpy.bincount(label, minlength=Nhalo0)
comm.Allreduce(MPI.IN_PLACE, N, op=MPI.SUM)
return hpos
[docs]def fof_catalog(source, label, comm,
position='Position', velocity='Velocity', initposition='InitialPosition',
peakcolumn=None, periodic=True):
"""
Catalog of FOF groups based on label from a parent source
This is a collective operation -- the returned halo catalog will be
equally distributed across all ranks
Notes
-----
This computes the center-of-mass position and velocity in the same
units as the corresponding columns ``source``
Parameters
----------
source: CatalogSource
the parent source of particles from which the center-of-mass
position and velocity are computed for each halo
label : array_like
the label for each particle that identifies which halo it
belongs to
comm: MPI.Comm
the mpi communicator. Must agree with the datasource
position : str, optional
the column name specifying the position
velocity : str, optional
the column name specifying the velocity
initposition : str, optional
the column name specifying the initial position; this is only
computed if available
peakcolumn : str, optional
if not None, find PeakPostion and PeakVelocity based on the
value of peakcolumn
Returns
-------
catalog: array_like
A 1-d array of type 'Position', 'Velocity', 'Length'.
The center mass position and velocity of the FOF halo, and
Length is the number of particles in a halo. The catalog is
sorted such that the most massive halo is first. ``catalog[0]``
does not correspond to any halo.
"""
from nbodykit.utils import ScatterArray
# make sure all of the columns are there
for col in [position, velocity]:
if col not in source:
raise ValueError("the column '%s' is missing from parent source; cannot compute halos" %col)
dtype=[('CMPosition', ('f4', 3)),('CMVelocity', ('f4', 3)),('Length', 'i4')]
N = count(label, comm=comm)
if periodic:
# make sure BoxSize is there
boxsize = source.attrs.get('BoxSize', None)
if boxsize is None:
raise ValueError("cannot compute halo catalog from source without 'BoxSize' in ``attrs`` dict")
else:
boxsize = None
# center of mass position
hpos = centerofmass(label, source.compute(source[position]), boxsize=boxsize, comm=comm)
# center of mass velocity
hvel = centerofmass(label, source.compute(source[velocity]), boxsize=None, comm=comm)
# center of mass initial position
if initposition in source:
dtype.append(('InitialPosition', ('f4', 3)))
hpos_init = centerofmass(label, source.compute(source[initposition]), boxsize=boxsize, comm=comm)
if peakcolumn is not None:
assert peakcolumn in source
dtype.append(('PeakPosition', ('f4', 3)))
dtype.append(('PeakVelocity', ('f4', 3)))
density = source[peakcolumn].compute()
dmax = equiv_class(label, density, op=numpy.fmax, dense_labels=True, minlength=len(N), identity=-numpy.inf)
comm.Allreduce(MPI.IN_PLACE, dmax, op=MPI.MAX)
# remove any non-peak particle from the labels
label1 = label * (density >= dmax[label])
# compute the center of mass on the new labels
ppos = centerofmass(label1, source.compute(source[position]), boxsize=boxsize, comm=comm)
pvel = centerofmass(label1, source.compute(source[velocity]), boxsize=None, comm=comm)
dtype = numpy.dtype(dtype)
if comm.rank == 0:
catalog = numpy.empty(shape=len(N), dtype=dtype)
catalog['CMPosition'] = hpos
catalog['CMVelocity'] = hvel
catalog['Length'] = N
catalog['Length'][0] = 0
if 'InitialPosition' in dtype.names:
catalog['InitialPosition'] = hpos_init
if peakcolumn is not None:
catalog['PeakPosition'] = ppos
catalog['PeakVelocity'] = pvel
else:
catalog = None
return ScatterArray(catalog, comm, root=0)
# -----------------------
# Helpers
# -----------------------
[docs]def equiv_class(labels, values, op, dense_labels=False, identity=None, minlength=None):
"""
apply operation to equivalent classes by label, on values
Parameters
----------
labels : array_like
the label of objects, starting from 0.
values : array_like
the values of objects (len(labels), ...)
op : :py:class:`numpy.ufunc`
the operation to apply
dense_labels : boolean
If the labels are already dense (from 0 to Nobjects - 1)
If False, :py:meth:`numpy.unique` is used to convert
the labels internally
Returns
-------
result :
the value of each equivalent class
Examples
--------
>>> x = numpy.arange(10)
>>> print equiv_class(x, x, numpy.fmin, dense_labels=True)
[0 1 2 3 4 5 6 7 8 9]
>>> x = numpy.arange(10)
>>> v = numpy.arange(20).reshape(10, 2)
>>> x[1] = 0
>>> print equiv_class(x, 1.0 * v, numpy.fmin, dense_labels=True, identity=numpy.inf)
[[ 0. 1.]
[ inf inf]
[ 4. 5.]
[ 6. 7.]
[ 8. 9.]
[ 10. 11.]
[ 12. 13.]
[ 14. 15.]
[ 16. 17.]
[ 18. 19.]]
"""
# dense labels
if not dense_labels:
junk, labels = numpy.unique(labels, return_inverse=True)
del junk
N = numpy.bincount(labels)
offsets = numpy.concatenate([[0], N.cumsum()], axis=0)[:-1]
arg = labels.argsort()
if identity is None: identity = op.identity
if minlength is None:
minlength = len(N)
# work around numpy dtype reference counting bugs
# be a simple man and never steal anything.
dtype = numpy.dtype((values.dtype, values.shape[1:]))
result = numpy.empty(minlength, dtype=dtype)
result[:len(N)] = op.reduceat(values[arg], offsets)
if (N == 0).any():
result[:len(N)][N == 0] = identity
if len(N) < minlength:
result[len(N):] = identity
return result
[docs]def replacesorted(arr, sorted, b, out=None):
"""
replace a with corresponding b in arr
Parameters
----------
arr : array_like
input array
sorted : array_like
sorted
b : array_like
out : array_like,
output array
Result
------
newarr : array_like
arr with a replaced by corresponding b
Examples
--------
>>> print replacesorted(numpy.arange(10), numpy.arange(5), numpy.ones(5))
[1 1 1 1 1 5 6 7 8 9]
"""
if out is None:
out = arr.copy()
if len(sorted) == 0:
return out
ind = sorted.searchsorted(arr)
ind.clip(0, len(sorted) - 1, out=ind)
arr = numpy.array(arr)
found = sorted[ind] == arr
out[found] = b[ind[found]]
return out
[docs]def centerofmass(label, pos, boxsize, comm=MPI.COMM_WORLD):
"""
Calulate the center of mass of particles of the same label.
The center of mass is defined as the mean of positions of particles,
but care has to be taken regarding to the periodic boundary.
This is a collective operation, and after the call, all ranks
will have the position of halos.
Parameters
----------
label : array_like (integers)
Halo label of particles, >=0
pos : array_like (float, 3)
position of particles.
boxsize : float or None
size of the periodic box, or None if no periodic boundary is assumed.
comm : :py:class:`MPI.Comm`
communicator for the collective operation.
Returns
-------
hpos : array_like (float, 3)
the center of mass position of the halos.
"""
Nhalo0 = max(comm.allgather(label.max())) + 1
N = numpy.bincount(label, minlength=Nhalo0)
comm.Allreduce(MPI.IN_PLACE, N, op=MPI.SUM)
if boxsize is not None:
posmin = equiv_class(label, pos, op=numpy.fmin, dense_labels=True, identity=numpy.inf,
minlength=len(N))
comm.Allreduce(MPI.IN_PLACE, posmin, op=MPI.MIN)
dpos = pos - posmin[label]
for i in range(dpos.shape[-1]):
bhalf = boxsize[i] * 0.5
dpos[..., i][dpos[..., i] < -bhalf] += boxsize[i]
dpos[..., i][dpos[..., i] >= bhalf] -= boxsize[i]
else:
dpos = pos
dpos = equiv_class(label, dpos, op=numpy.add, dense_labels=True, minlength=len(N))
comm.Allreduce(MPI.IN_PLACE, dpos, op=MPI.SUM)
dpos /= N[:, None]
if boxsize is not None:
hpos = posmin + dpos
hpos %= boxsize
else:
hpos = dpos
return hpos
[docs]def count(label, comm=MPI.COMM_WORLD):
"""
Count the number of particles of the same label.
This is a collective operation, and after the call, all ranks
will have the particle count.
Parameters
----------
label : array_like (integers)
Halo label of particles, >=0
comm : :py:class:`MPI.Comm`
communicator for the collective operation.
Returns
-------
count : array_like
the count of number of particles in each halo
"""
Nhalo0 = max(comm.allgather(label.max())) + 1
N = numpy.bincount(label, minlength=Nhalo0)
comm.Allreduce(MPI.IN_PLACE, N, op=MPI.SUM)
return N