On Demand IO via dask.array
¶
nbodykit uses the dask package to store the columns
in CatalogSource
objects. The dask
package implements
a dask.array.Array
object that mimics that interface of the more
familiar numpy array. In this section, we describe what exactly a dask array
is and how it is used in nbodykit.
What is a dask array?¶
In nbodykit, the dask array object is a data container that behaves nearly
identical to a numpy array, except for one key difference. When performing
manipulations on a numpy array, the operations are performed immediately.
This is not the case for dask arrays. Instead, dask arrays store these
operations in a task graph and only evaluate the operations when the user
specifies to via a call to a compute()
function. When using
nbodykit, often the first task in this graph is loading data from disk.
Thus, dask provides nbodykit with on-demand IO functionality, allowing the user
to control when data is read from disk.
It is useful to describe a bit more about the nuts and bolts of the dask array to illustrate its full power. The dask array object cuts up the full array into many smaller arrays and performs calculations on these smaller “chunks”. This allows array computations to be performed on large data that does not fit into memory (but can be stored on disk). Particularly useful on laptops and other systems with limited memory, it extends the maximum size of useable datasets from the size of memory to the size of the disk storage. For further details, please see the introduction to the dask array in the dask documentation.
By Example¶
The dask array functionality is best illustrated by example. Here, we
initialize a UniformCatalog
that generates objects with uniformly distributed position and velocity columns.
[1]:
from nbodykit.lab import UniformCatalog
cat = UniformCatalog(nbar=100, BoxSize=1.0, seed=42)
/home/yfeng1/anaconda3/install/lib/python3.6/site-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype from `float` to `np.floating` is deprecated. In future, it will be treated as `np.float64 == np.dtype(float).type`.
from ._conv import register_converters as _register_converters
[2]:
print("catalog = ", cat)
catalog = UniformCatalog(size=96, seed=42)
[3]:
print("Position = ", cat['Position'])
Position = dask.array<array, shape=(96, 3), dtype=float64, chunksize=(96, 3)> first: [0.45470105 0.83263203 0.06905134] last: [0.62474599 0.15388738 0.84302209]
We see that the Position
column can be accessed by indexing the catalog
with the column name and that the returned object is not a numpy array
but a dask array. The dask array has the same shape
(96,3) and
dtype
(‘f8’) as the underlying numpy array but also includes the
chunksize
attribute. This attribute specifies the size of the internal
chunks that dask uses to examine arrays in smaller pieces. In this case,
the data size is small enough that only a single chunk is needed.
The dask.array
module¶
The dask.array
module provides much of the same functionality as the
numpy
module, but with functions optimized to perform operations on
dask arrays.
For example, we can easily compute the minimum and maximum position coordinates
using the dask.array.min()
and dask.array.max()
functions.
[4]:
import dask.array as da
pos = cat['Position']
minpos = da.min(pos, axis=0)
maxpos = da.max(pos, axis=0)
print("minimum position coordinates = ", minpos)
print("maximum position coordinates = ", maxpos)
minimum position coordinates = dask.array<amin-aggregate, shape=(3,), dtype=float64, chunksize=(3,)>
maximum position coordinates = dask.array<amax-aggregate, shape=(3,), dtype=float64, chunksize=(3,)>
Here, we see that the result of our calls to dask.array.min()
and
dask.array.max()
are also stored as dask arrays. The task
has not yet been performed but instead added to the internal dask task graph.
For a full list of the available functionality, please see the
dask array documentation.
A large subset of the most commonly used functions in numpy have
implementations in the dask.array
module. In addition to these
functions, dask arrays support the usual array arithmetic operations. For
example, to rescale the position coordinate array, use
[5]:
BoxSize = 2500.0
pos *= BoxSize
rescaled_minpos = da.min(pos, axis=0)
rescaled_maxpos = da.max(pos, axis=0)
Evaluating a dask array¶
The CatalogSource.compute()
function computes a dask array and returns
the result of the internal series of tasks, either a numpy array or float.
For example, we can compute the minimum and maximum of the position coordinates
using:
[6]:
minpos, maxpos = cat.compute(minpos, maxpos)
print("minimum position coordinates = ", minpos)
print("maximum position coordinates = ", maxpos)
minimum position coordinates = [0.00402579 0.00015685 0.00271747]
maximum position coordinates = [0.9927406 0.99610592 0.99925086]
And similarly, we see the result of the rescaling operation earlier:
[7]:
minpos, maxpos = cat.compute(rescaled_minpos, rescaled_maxpos)
print("minimum re-scaled position coordinates = ", minpos)
print("maximum re-scaled position coordinates = ", maxpos)
minimum re-scaled position coordinates = [10.06446279 0.39212416 6.79367111]
maximum re-scaled position coordinates = [2481.85149744 2490.26480949 2498.12715085]
Caching with Dask¶
nbodykit automatically caches tasks computed when evaluating dask arrays. The
global cache is controlled via the nbodykit.GlobalCache
class.
Often the most expensive task when evaluating a dask array is loading the data
from disk. By using dask’s caching features, CatalogSource
objects are
able to cache intermediate results, such that repeated calls to
CatalogSource.compute()
do not repeat expensive IO operations.
The global cache has a fixed size. By default, we set the value to a reasonable
(not too large) value. The default value is controlled by the global_cache_size
keyword, which can be controlled via the set_options
function.
Users can control the size of the global cache using:
from nbodykit import GlobalCache
GlobalCache.resize(2e9) # set cache size to 2 GB
Note
When accessing columns of a CatalogSource
, the returned dask
array also has a compute()
function. When using this function to
evaluate dask arrays, internal caching will also be used. So users have the
option of using CatalogSource.compute()
or the compute()
attached to each dask array.
Examining Larger-than-Memory Data¶
CatalogSource
objects automatically take advantage of the chunking
features of the dask array, greatly reducing the difficulties of
analyzing larger-than-memory data. When combined with the ability of the
CatalogSource
object to provide a continuous view of multiple files
at once, we can analyze large amounts of data from a single catalog with ease.
A common use case is examining a directory of large binary outputs from a
N-body simulation on a laptop. Often the user wishes to select a smaller subsample
of the catalog or perform some trivial data inspection to verify the accuracy of
the data. These tasks become straightforward with nbodykit, using the functionality
provided by the CatalogSource
object and the dask
package.
Regardless of the size of the data that the user is loading, the nbodykit
CatalogSource
interface remains the same.