import os
import traceback
import logging
import numpy
from mpi4py import MPI
from nbodykit import CurrentMPIComm
def split_ranks(N_ranks, N, include_all=False):
"""
    Divide the ranks into chunks, attempting to have `N` ranks
    in each chunk. The master (rank 0) is excluded, so that
    `N_ranks - 1` ranks are available to be grouped.
Parameters
----------
N_ranks : int
the total number of ranks available
N : int
the desired number of ranks per worker
include_all : bool, optional
if `True`, then do not force each group to have
exactly `N` ranks, instead including the remainder as well;
default is `False`
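
    Examples
    --------
    A minimal illustration of the grouping logic; this runs without MPI,
    since the function only partitions integer rank labels:

    >>> list(split_ranks(7, 2))
    [(0, [1, 2]), (1, [3, 4]), (2, [5, 6])]

    With ``include_all=True``, the remainder ranks are folded into the
    groups via :func:`numpy.array_split` instead of being dropped.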
"""
available = list(range(1, N_ranks)) # available ranks to do work
total = len(available)
extra_ranks = total % N
if include_all:
for i, chunk in enumerate(numpy.array_split(available, max(total//N, 1))):
yield i, list(chunk)
else:
for i in range(total//N):
yield i, available[i*N:(i+1)*N]
i = total // N
if extra_ranks and extra_ranks >= N//2:
remove = extra_ranks % 2 # make it an even number
ranks = available[-extra_ranks:]
if remove: ranks = ranks[:-remove]
if len(ranks):
yield i+1, ranks
def enum(*sequential, **named):
"""
Enumeration values to serve as status tags passed
    between processes
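
    Examples
    --------
    The values are assigned in the order given, starting from zero:

    >>> tags = enum('READY', 'DONE', 'EXIT', 'START')
    >>> tags.READY, tags.START
    (0, 3)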
"""
enums = dict(zip(sequential, range(len(sequential))), **named)
return type('Enum', (), enums)
class TaskManager(object):
"""
An MPI task manager that distributes tasks over a set of MPI processes,
using a specified number of independent workers to compute each task.
Given the specified number of independent workers (which compute
tasks in parallel), the total number of available CPUs will be
divided evenly.
The main function is ``iterate`` which iterates through a set of tasks,
distributing the tasks in parallel over the available ranks.
Parameters
----------
cpus_per_task : int
the desired number of ranks assigned to compute
each task
comm : MPI communicator, optional
the global communicator that will be split so each worker
has a subset of CPUs available; default is COMM_WORLD
debug : bool, optional
if `True`, set the logging level to `DEBUG`, which prints
out much more information; default is `False`
use_all_cpus : bool, optional
if `True`, use all available CPUs, including the remainder
        if `cpus_per_task` does not divide the total number of CPUs
evenly; default is `False`
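
    Examples
    --------
    A typical usage sketch; the squaring function is purely illustrative,
    and the skip directive reflects that this requires an MPI launch:

    >>> with TaskManager(cpus_per_task=2, use_all_cpus=True) as tm:  # doctest: +SKIP
    ...     results = tm.map(lambda x: x**2, range(10))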
"""
logger = logging.getLogger('TaskManager')
@CurrentMPIComm.enable
def __init__(self, cpus_per_task, comm=None, debug=False, use_all_cpus=False):
if debug:
self.logger.setLevel(logging.DEBUG)
self.cpus_per_task = cpus_per_task
self.use_all_cpus = use_all_cpus
# the base communicator
self.basecomm = MPI.COMM_WORLD if comm is None else comm
self.rank = self.basecomm.rank
self.size = self.basecomm.size
        # need at least two processes (the master plus at least one worker)
if self.size == 1:
raise ValueError("need at least two processes to use a TaskManager")
# communication tags
self.tags = enum('READY', 'DONE', 'EXIT', 'START')
# the task communicator
self.comm = None
# store a MPI status
self.status = MPI.Status()
    def __enter__(self):
"""
        Split the base communicator so that each worker is allocated
        the specified number of CPUs with which to perform its tasks
"""
chain_ranks = []
color = 0
total_ranks = 0
nworkers = 0
# split the ranks
for i, ranks in split_ranks(self.size, self.cpus_per_task, include_all=self.use_all_cpus):
chain_ranks.append(ranks[0])
if self.rank in ranks: color = i+1
total_ranks += len(ranks)
nworkers = nworkers + 1
self.workers = nworkers # store the total number of workers
# check for no workers!
if self.workers == 0:
raise ValueError("no pool workers available; try setting `use_all_cpus` = True")
leftover = (self.size - 1) - total_ranks
if leftover and self.rank == 0:
args = (self.cpus_per_task, self.size-1, leftover)
self.logger.warning("with `cpus_per_task` = %d and %d available rank(s), %d rank(s) will do no work" %args)
self.logger.warning("set `use_all_cpus=True` to use all available cpus")
        # raise an error if there aren't enough ranks for the master plus the desired workers
if self.size <= self.workers:
args = (self.size, self.workers+1, self.workers)
raise ValueError("only have %d ranks; need at least %d to use the desired %d workers" %args)
# ranks that will do work have a nonzero color now
self._valid_worker = color > 0
# split the comm between the workers
self.comm = self.basecomm.Split(color, 0)
CurrentMPIComm.push(self.comm)
return self
    def is_root(self):
"""
Is the current process the root process?
Root is responsible for distributing the tasks to the other available ranks
"""
return self.rank == 0
    def is_worker(self):
"""
Is the current process a valid worker?
Workers wait for instructions from the master
"""
        try:
            return self._valid_worker
        except AttributeError:
            raise ValueError("workers are only defined when inside the ``with TaskManager()`` context")
def _get_tasks(self):
"""
Internal generator that yields the next available task from a worker
"""
if self.is_root():
raise RuntimeError("Root rank mistakenly told to await tasks")
# logging info
if self.comm.rank == 0:
args = (self.rank, MPI.Get_processor_name(), self.comm.size)
self.logger.debug("worker master rank is %d on %s with %d processes available" %args)
        # continuously loop and wait for instructions
while True:
args = None
tag = -1
            # have the master rank of the subcomm ask for a task and then broadcast it
if self.comm.rank == 0:
self.basecomm.send(None, dest=0, tag=self.tags.READY)
args = self.basecomm.recv(source=0, tag=MPI.ANY_TAG, status=self.status)
tag = self.status.Get_tag()
# bcast to everyone in the worker subcomm
args = self.comm.bcast(args) # args is [task_number, task_value]
tag = self.comm.bcast(tag)
# yield the task
if tag == self.tags.START:
# yield the task value
yield args
# wait for everyone in task group before telling master this task is done
self.comm.Barrier()
if self.comm.rank == 0:
self.basecomm.send([args[0], None], dest=0, tag=self.tags.DONE)
# see ya later
elif tag == self.tags.EXIT:
break
# wait for everyone in task group and exit
self.comm.Barrier()
if self.comm.rank == 0:
self.basecomm.send(None, dest=0, tag=self.tags.EXIT)
# debug logging
self.logger.debug("rank %d process is done waiting" %self.rank)
def _distribute_tasks(self, tasks):
"""
Internal function that distributes the tasks from the root to the workers
"""
if not self.is_root():
raise ValueError("only the root rank should distribute the tasks")
ntasks = len(tasks)
task_index = 0
closed_workers = 0
# logging info
args = (self.workers, ntasks)
self.logger.debug("master starting with %d worker(s) with %d total tasks" %args)
# loop until all workers have finished with no more tasks
while closed_workers < self.workers:
# look for tags from the workers
data = self.basecomm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=self.status)
source = self.status.Get_source()
tag = self.status.Get_tag()
# worker is ready, so send it a task
if tag == self.tags.READY:
# still more tasks to compute
if task_index < ntasks:
this_task = [task_index, tasks[task_index]]
self.basecomm.send(this_task, dest=source, tag=self.tags.START)
self.logger.debug("sending task `%s` to worker %d" %(str(tasks[task_index]), source))
task_index += 1
# all tasks sent -- tell worker to exit
else:
self.basecomm.send(None, dest=source, tag=self.tags.EXIT)
# store the results from finished tasks
elif tag == self.tags.DONE:
self.logger.debug("received result from worker %d" %source)
# track workers that exited
elif tag == self.tags.EXIT:
closed_workers += 1
self.logger.debug("worker %d has exited, closed workers = %d" %(source, closed_workers))
    def iterate(self, tasks):
"""
A generator that iterates through a series of tasks in parallel.
Notes
-----
This is a collective operation and should be called by
all ranks
Parameters
----------
tasks : iterable
an iterable of `task` items that will be yielded in parallel
across all ranks
Yields
-------
task :
the individual items of `tasks`, iterated through in parallel
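
        Examples
        --------
        A sketch of the intended loop; ``do_work`` is a hypothetical
        user function, and the skip directive reflects that this
        requires an MPI launch:

        >>> with TaskManager(cpus_per_task=2) as tm:  # doctest: +SKIP
        ...     for task in tm.iterate(range(100)):
        ...         do_work(task)  # ``do_work`` is hypothetical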
"""
# master distributes the tasks and tracks closed workers
if self.is_root():
self._distribute_tasks(tasks)
# workers will wait for instructions
elif self.is_worker():
for tasknum, args in self._get_tasks():
yield args
    def map(self, function, tasks):
"""
Like the built-in :func:`map` function, apply a function to all
of the values in a list and return the list of results.
If ``tasks`` contains tuples, the arguments are passed to
``function`` using the ``*args`` syntax
Notes
-----
This is a collective operation and should be called by
all ranks
Parameters
----------
function : callable
The function to apply to the list.
tasks : list
The list of tasks
Returns
-------
results : list
the list of the return values of :func:`function`
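
        Examples
        --------
        A sketch showing how tuple tasks are unpacked into positional
        arguments; the lambda is purely illustrative:

        >>> with TaskManager(cpus_per_task=2) as tm:  # doctest: +SKIP
        ...     results = tm.map(lambda x, p: x**p, [(x, 2) for x in range(8)])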
"""
results = []
# master distributes the tasks and tracks closed workers
if self.is_root():
self._distribute_tasks(tasks)
# workers will wait for instructions
elif self.is_worker():
# iterate through tasks in parallel
for tasknum, args in self._get_tasks():
# make function arguments consistent with *args
if not isinstance(args, tuple):
args = (args,)
# compute the result (only worker root needs to save)
result = function(*args)
if self.comm.rank == 0:
results.append((tasknum, result))
# put the results in the correct order
results = self.basecomm.allgather(results)
results = [item for sublist in results for item in sublist]
return [r[1] for r in sorted(results, key=lambda x: x[0])]
    def __exit__(self, exc_type, exc_value, exc_traceback):
"""
Exit gracefully by closing and freeing the MPI-related variables
"""
if exc_value is not None:
trace = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback, limit=5))
self.logger.error("an exception has occurred on rank %d:\n%s" %(self.rank, trace))
# bit of hack that forces mpi4py to exit all ranks
# see https://groups.google.com/forum/embed/#!topic/mpi4py/RovYzJ8qkbc
os._exit(1)
# wait and exit
self.logger.debug("rank %d process finished" %self.rank)
self.basecomm.Barrier()
if self.is_root():
self.logger.debug("master is finished; terminating")
CurrentMPIComm.pop()
if self.comm is not None:
self.comm.Free()