On Demand IO via dask.array

nbodykit uses the dask package to store the columns in CatalogSource objects. The dask package implements a dask.array.Array object that mimics that interface of the more familiar numpy array. In this section, we describe what exactly a dask array is and how it is used in nbodykit.

What is a dask array?

In nbodykit, the dask array object is a data container that behaves nearly identical to a numpy array, except for one key difference. When performing manipulations on a numpy array, the operations are performed immediately. This is not the case for dask arrays. Instead, dask arrays store these operations in a task graph and only evaluate the operations when the user specifies to via a call to a compute() function. When using nbodykit, often the first task in this graph is loading data from disk. Thus, dask provides nbodykit with on-demand IO functionality, allowing the user to control when data is read from disk.

It is useful to describe a bit more about the nuts and bolts of the dask array to illustrate its full power. The dask array object cuts up the full array into many smaller arrays and performs calculations on these smaller “chunks”. This allows array computations to be performed on large data that does not fit into memory (but can be stored on disk). Particularly useful on laptops and other systems with limited memory, it extends the maximum size of useable datasets from the size of memory to the size of the disk storage. For further details, please see the introduction to the dask array in the dask documentation.

By Example

The dask array functionality is best illustrated by example. Here, we initialize a UniformCatalog that generates objects with uniformly distributed position and velocity columns.

from nbodykit.lab import UniformCatalog

cat = UniformCatalog(nbar=100, BoxSize=1.0, seed=42)
/home/yfeng1/anaconda3/install/lib/python3.6/site-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype from `float` to `np.floating` is deprecated. In future, it will be treated as `np.float64 == np.dtype(float).type`.
  from ._conv import register_converters as _register_converters
print("catalog = ", cat)
catalog =  UniformCatalog(size=96, seed=42)
print("Position = ", cat['Position'])
Position =  dask.array<array, shape=(96, 3), dtype=float64, chunksize=(96, 3)> first: [0.45470105 0.83263203 0.06905134] last: [0.62474599 0.15388738 0.84302209]

We see that the Position column can be accessed by indexing the catalog with the column name and that the returned object is not a numpy array but a dask array. The dask array has the same shape (96,3) and dtype (‘f8’) as the underlying numpy array but also includes the chunksize attribute. This attribute specifies the size of the internal chunks that dask uses to examine arrays in smaller pieces. In this case, the data size is small enough that only a single chunk is needed.

The dask.array module

The dask.array module provides much of the same functionality as the numpy module, but with functions optimized to perform operations on dask arrays.

For example, we can easily compute the minimum and maximum position coordinates using the dask.array.min() and dask.array.max() functions.

import dask.array as da

pos = cat['Position']
minpos = da.min(pos, axis=0)
maxpos = da.max(pos, axis=0)

print("minimum position coordinates = ", minpos)
print("maximum position coordinates = ", maxpos)
minimum position coordinates =  dask.array<amin-aggregate, shape=(3,), dtype=float64, chunksize=(3,)>
maximum position coordinates =  dask.array<amax-aggregate, shape=(3,), dtype=float64, chunksize=(3,)>

Here, we see that the result of our calls to dask.array.min() and dask.array.max() are also stored as dask arrays. The task has not yet been performed but instead added to the internal dask task graph.

For a full list of the available functionality, please see the dask array documentation. A large subset of the most commonly used functions in numpy have implementations in the dask.array module. In addition to these functions, dask arrays support the usual array arithmetic operations. For example, to rescale the position coordinate array, use

BoxSize = 2500.0
pos *= BoxSize

rescaled_minpos = da.min(pos, axis=0)
rescaled_maxpos = da.max(pos, axis=0)

Evaluating a dask array

The CatalogSource.compute() function computes a dask array and returns the result of the internal series of tasks, either a numpy array or float. For example, we can compute the minimum and maximum of the position coordinates using:

minpos, maxpos = cat.compute(minpos, maxpos)
print("minimum position coordinates = ", minpos)
print("maximum position coordinates = ", maxpos)
minimum position coordinates =  [0.00402579 0.00015685 0.00271747]
maximum position coordinates =  [0.9927406  0.99610592 0.99925086]

And similarly, we see the result of the rescaling operation earlier:

minpos, maxpos = cat.compute(rescaled_minpos, rescaled_maxpos)
print("minimum re-scaled position coordinates = ", minpos)
print("maximum re-scaled position coordinates = ", maxpos)
minimum re-scaled position coordinates =  [10.06446279  0.39212416  6.79367111]
maximum re-scaled position coordinates =  [2481.85149744 2490.26480949 2498.12715085]

Caching with Dask

nbodykit automatically caches tasks computed when evaluating dask arrays. The global cache is controlled via the nbodykit.GlobalCache class. Often the most expensive task when evaluating a dask array is loading the data from disk. By using dask’s caching features, CatalogSource objects are able to cache intermediate results, such that repeated calls to CatalogSource.compute() do not repeat expensive IO operations.

The global cache has a fixed size. By default, we set the value to a reasonable (not too large) value. The default value is controlled by the global_cache_size keyword, which can be controlled via the set_options function. Users can control the size of the global cache using:

from nbodykit import GlobalCache
GlobalCache.resize(2e9) # set cache size to 2 GB


When accessing columns of a CatalogSource, the returned dask array also has a compute() function. When using this function to evaluate dask arrays, internal caching will also be used. So users have the option of using CatalogSource.compute() or the compute() attached to each dask array.

Examining Larger-than-Memory Data

CatalogSource objects automatically take advantage of the chunking features of the dask array, greatly reducing the difficulties of analyzing larger-than-memory data. When combined with the ability of the CatalogSource object to provide a continuous view of multiple files at once, we can analyze large amounts of data from a single catalog with ease.

A common use case is examining a directory of large binary outputs from a N-body simulation on a laptop. Often the user wishes to select a smaller subsample of the catalog or perform some trivial data inspection to verify the accuracy of the data. These tasks become straightforward with nbodykit, using the functionality provided by the CatalogSource object and the dask package. Regardless of the size of the data that the user is loading, the nbodykit CatalogSource interface remains the same.