On Demand IO via dask.array

nbodykit uses the dask package to store the columns in CatalogSource objects. The dask package implements a dask.array.Array object that mimics the interface of the more familiar numpy array. In this section, we describe what exactly a dask array is and how it is used in nbodykit.

What is a dask array?

In nbodykit, the dask array object is a data container that behaves nearly identically to a numpy array, except for one key difference. Operations on a numpy array are performed immediately. This is not the case for dask arrays. Instead, a dask array records the operations in a task graph and evaluates them only when the user requests it with a call to a compute() function. When using nbodykit, the first task in this graph is often loading data from disk. Thus, dask provides nbodykit with on-demand IO functionality, allowing the user to control when data is read from disk.
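As a minimal sketch of this deferred evaluation, the following uses only dask and numpy (not nbodykit). A small in-memory array stands in for data that would normally be read from disk, and the variable names are illustrative.

```python
import numpy as np
import dask.array as da

# A small in-memory array stands in for data that would normally live on disk.
data = np.arange(12, dtype="f8").reshape(4, 3)
x = da.from_array(data, chunks=(2, 3))

# Nothing is evaluated here: each operation only adds tasks to the graph.
shifted = (x + 1.0) * 2.0
total = shifted.sum()

print(type(total).__name__)  # still a dask Array, not a number

# The graph runs only when compute() is called.
result = total.compute()
print(result)
```

Until compute() is called, total is only a description of the work to be done, which is why its construction is cheap regardless of the data size.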

A closer look at how the dask array works helps illustrate its full power. The dask array object cuts up the full array into many smaller arrays and performs calculations on these smaller “chunks”. This allows array computations to be performed on large data that does not fit into memory (but can be stored on disk). This is particularly useful on laptops and other systems with limited memory, since it extends the maximum size of usable datasets from the size of memory to the size of the disk storage. For further details, please see the introduction to the dask array in the dask documentation.
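The chunking can be inspected directly. The sketch below, which assumes an illustrative array size and chunk size rather than anything nbodykit chooses, builds a chunked array and reduces it chunk by chunk.

```python
import dask.array as da

# 100,000 rows of 3 coordinates, split into chunks of 25,000 rows each.
x = da.ones((100_000, 3), chunks=(25_000, 3), dtype="f8")

print(x.chunks)     # chunk sizes along each axis
print(x.numblocks)  # number of chunks along each axis

# The reduction is applied to each chunk, then the partial results are combined.
col_sums = x.sum(axis=0).compute()
print(col_sums)
```

Because only a few chunks need to be held in memory at any one time, the same pattern applies when the full array is larger than the available memory.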

By Example

The dask array functionality is best illustrated by example. Here, we initialize a UniformCatalog that generates objects with uniformly distributed position and velocity columns.

In [1]: from nbodykit.lab import UniformCatalog

In [2]: cat = UniformCatalog(nbar=100, BoxSize=1.0, seed=42)

In [3]: print(cat)
UniformCatalog(size=96, seed=42)

In [4]: print(cat['Position'])
dask.array<array, shape=(96, 3), dtype=float64, chunksize=(96, 3)> first : [ 0.45470105  0.83263203  0.06905134] last: [ 0.62474599  0.15388738  0.84302209]

We see that the Position column can be accessed by indexing the catalog with the column name and that the returned object is not a numpy array but a dask array. The dask array has the same shape (96, 3) and dtype (‘f8’) as the underlying numpy array but also includes the chunksize attribute. This attribute specifies the size of the internal chunks that dask uses to process arrays in smaller pieces. In this case, the data size is small enough that only a single chunk is needed.

The dask.array module

The dask.array module provides much of the same functionality as the numpy module, but with functions optimized to perform operations on dask arrays.

For example, we can easily compute the minimum and maximum position coordinates using the dask.array.min() and dask.array.max() functions.

In [5]: import dask.array as da

In [6]: pos = cat['Position']

In [7]: minpos = da.min(pos, axis=0)

In [8]: maxpos = da.max(pos, axis=0)

In [9]: print("minimum position coordinates = ", minpos)
minimum position coordinates =  dask.array<amin-aggregate, shape=(3,), dtype=float64, chunksize=(3,)>

In [10]: print("maximum position coordinates = ", maxpos)
maximum position coordinates =  dask.array<amax-aggregate, shape=(3,), dtype=float64, chunksize=(3,)>

Here, we see that the results of our calls to dask.array.min() and dask.array.max() are also stored as dask arrays. The tasks have not yet been performed but have instead been added to the internal dask task graph.
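To make the correspondence with numpy concrete, the sketch below uses a randomly generated array as a stand-in for a Position column (an assumption for illustration, not the UniformCatalog data) and checks that the lazy dask results, once evaluated, match the numpy answers.

```python
import numpy as np
import dask.array as da

rng = np.random.default_rng(42)
points = rng.uniform(size=(96, 3))  # stand-in for a Position column
pos = da.from_array(points, chunks=(32, 3))

lazy_min = da.min(pos, axis=0)  # dask array; nothing computed yet
lazy_max = da.max(pos, axis=0)

# Evaluating the lazy results reproduces the numpy answers.
same_min = np.array_equal(lazy_min.compute(), points.min(axis=0))
same_max = np.array_equal(lazy_max.compute(), points.max(axis=0))
print(same_min, same_max)
print(lazy_min.shape, lazy_min.dtype)
```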

For a full list of the available functionality, please see the dask array documentation. A large subset of the most commonly used functions in numpy have implementations in the dask.array module. In addition to these functions, dask arrays support the usual array arithmetic operations. For example, to rescale the position coordinate array, use

In [11]: BoxSize = 2500.0

In [12]: pos *= BoxSize

In [13]: rescaled_minpos = da.min(pos, axis=0)

In [14]: rescaled_maxpos = da.max(pos, axis=0)

Evaluating a dask array

The CatalogSource.compute() function evaluates one or more dask arrays and returns the result of the internal series of tasks for each, either a numpy array or a float. For example, we can compute the minimum and maximum of the position coordinates using:

In [15]: minpos, maxpos = cat.compute(minpos, maxpos)

In [16]: print("minimum position coordinates = ", minpos)
minimum position coordinates =  [ 0.00402579  0.00015685  0.00271747]

In [17]: print("maximum position coordinates = ", maxpos)
maximum position coordinates =  [ 0.9927406   0.99610592  0.99925086]

Similarly, we can evaluate the results of the earlier rescaling operation:

In [18]: minpos, maxpos = cat.compute(rescaled_minpos, rescaled_maxpos)

In [19]: print("minimum re-scaled position coordinates = ", minpos)
minimum re-scaled position coordinates =  [ 10.06446279   0.39212416   6.79367111]

In [20]: print("maximum re-scaled position coordinates = ", maxpos)
maximum re-scaled position coordinates =  [ 2481.85149744  2490.26480949  2498.12715085]
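Outside of nbodykit, plain dask offers a comparable way to evaluate several arrays in one call through dask.compute(). The sketch below is an analogue under that assumption and is not the implementation of CatalogSource.compute(). The input array is randomly generated for illustration.

```python
import numpy as np
import dask
import dask.array as da

points = np.random.default_rng(42).uniform(size=(96, 3))
pos = da.from_array(points, chunks=(48, 3))
scaled = pos * 2500.0  # lazy rescaling, as in the session above

# One call evaluates both results and shares the common upstream tasks.
minpos, maxpos = dask.compute(da.min(scaled, axis=0), da.max(scaled, axis=0))

print(type(minpos).__name__, minpos.shape)
print(type(maxpos).__name__, maxpos.shape)
```

Passing both arrays to a single call lets dask evaluate the shared rescaling step once rather than once per result.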

Caching with Dask

Subclasses of CatalogSource accept the use_cache keyword, which can turn on an internal cache to use when evaluating dask arrays. Often the most expensive task when evaluating a dask array is loading the data from disk. With this feature turned on, the CatalogSource object will cache intermediate results, such that repeated calls to CatalogSource.compute() do not repeat expensive IO operations.
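The cost that caching avoids can be seen with plain dask. In the sketch below, load_block is a hypothetical stand-in for an expensive read from disk, and persist() is used to keep the loaded result in memory. This illustrates the general idea only. It is not the mechanism behind use_cache.

```python
import numpy as np
import dask
import dask.array as da

load_calls = []  # records every time the "expensive" load runs

def load_block():
    # Hypothetical stand-in for reading a column from disk.
    load_calls.append(1)
    return np.arange(30, dtype="f8").reshape(10, 3)

block = dask.delayed(load_block, pure=False)()
x = da.from_delayed(block, shape=(10, 3), dtype="f8")

# Two separate evaluations each repeat the load.
x.min().compute()
x.max().compute()
calls_without_cache = len(load_calls)

# Persisting evaluates the graph once and keeps the result in memory,
# so later evaluations reuse it instead of loading again.
x_cached = x.persist()
x_cached.min().compute()
x_cached.max().compute()
calls_with_cache = len(load_calls) - calls_without_cache

print(calls_without_cache, calls_with_cache)
```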

Note

All dask arrays also have a built-in compute() function that can be called to evaluate the array. However, this function does not take advantage of any cache features. The built-in compute() function is useful for quick data inspection, but we recommend using CatalogSource.compute() when performing most calculations with nbodykit.

Examining Larger-than-Memory Data

CatalogSource objects automatically take advantage of the chunking features of the dask array, greatly reducing the difficulties of analyzing larger-than-memory data. When combined with the ability of the CatalogSource object to provide a continuous view of multiple files at once, we can analyze large amounts of data from a single catalog with ease.

A common use case is examining a directory of large binary outputs from an N-body simulation on a laptop. Often the user wishes to select a smaller subsample of the catalog or perform some trivial data inspection to verify the accuracy of the data. These tasks become straightforward with nbodykit, using the functionality provided by the CatalogSource object and the dask package. Regardless of the size of the data that the user is loading, the nbodykit CatalogSource interface remains the same.
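The same idea can be sketched with plain dask and numpy, without any nbodykit file readers. The example below writes a small binary file (the file name and sizes are illustrative assumptions), memory-maps it, and inspects a subsample chunk by chunk. A real simulation output would be far larger, but the code would look the same.

```python
import os
import tempfile
import numpy as np
import dask.array as da

# Write a small binary file; a real simulation output would be far larger.
path = os.path.join(tempfile.mkdtemp(), "positions.bin")
np.random.default_rng(0).uniform(size=(10_000, 3)).tofile(path)

# Memory-map the file so that data is read only when a chunk is requested.
on_disk = np.memmap(path, dtype="f8", mode="r", shape=(10_000, 3))
pos = da.from_array(on_disk, chunks=(1_000, 3))

# Keep every 100th object and inspect the subsample.
subsample = pos[::100]
print(subsample.shape)
print(subsample.mean(axis=0).compute())
```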