import os
import numpy as np
import platform
from collections import defaultdict, namedtuple
import logging
from meld.util import log_timing
import sys
logger = logging.getLogger(__name__)
# setup exception handling to abort when there is unhandled exception
# Install an excepthook that aborts the whole MPI job on any unhandled
# exception: without this, one crashed rank would leave the others
# deadlocked waiting in collective calls.
sys_excepthook = sys.excepthook  # keep a reference to the default hook


def mpi_excepthook(type, value, traceback):
    # First report the error through the original hook, then tear down
    # every rank via MPI_Abort so the job does not hang.
    sys_excepthook(type, value, traceback)
    get_mpi_comm_world().Abort(1)


sys.excepthook = mpi_excepthook
class MPICommunicator(object):
    """
    Class to handle communications between master and slaves using MPI.

    :param n_atoms: number of atoms
    :param n_replicas: number of replicas

    .. note::
        creating an MPI communicator will not actually initialize MPI. To do that,
        call :meth:`initialize`.
    """

    def __init__(self, n_atoms, n_replicas):
        # We're not using n_atoms and n_replicas, but if we switch
        # to more efficient buffer-based MPI routines, we'll need them.
        self._n_atoms = n_atoms
        self._n_replicas = n_replicas
        self._mpi_comm = None

    def __getstate__(self):
        # Don't pickle _mpi_comm: MPI communicator handles cannot be
        # serialized. NOTE: fixed Python 2-only ``iteritems()`` call,
        # which raises AttributeError on Python 3.
        return {k: v for (k, v) in self.__dict__.items() if k != '_mpi_comm'}

    def __setstate__(self, state):
        # _mpi_comm was excluded from the pickle, so restore it as None;
        # the owner must call initialize() again after unpickling.
        self.__dict__ = state
        self._mpi_comm = None

    def initialize(self):
        """
        Initialize and start MPI
        """
        self._mpi_comm = get_mpi_comm_world()
        self._my_rank = self._mpi_comm.Get_rank()

    def is_master(self):
        """
        Is this the master node?

        :returns: :const:`True` if we are the master, otherwise :const:`False`
        """
        # rank 0 is the master by convention
        return self._my_rank == 0

    @log_timing(logger)
    def barrier(self):
        # Block until every rank has reached this point.
        self._mpi_comm.barrier()

    @log_timing(logger)
    def broadcast_alphas_to_slaves(self, alphas):
        """
        broadcast_alphas_to_slaves(alphas)
        Send the alpha values to the slaves.

        :param alphas: a list of alpha values, one for each replica.
            The master node's alpha value should be included in this list.
            The master node will always be at alpha=0.0
        :returns: :const:`None`
        """
        self._mpi_comm.scatter(alphas, root=0)

    @log_timing(logger)
    def receive_alpha_from_master(self):
        """
        receive_alpha_from_master()
        Receive alpha value from master node.

        :returns: a floating point value for alpha in ``[0,1]``
        """
        return self._mpi_comm.scatter(None, root=0)

    @log_timing(logger)
    def broadcast_states_to_slaves(self, states):
        """
        broadcast_states_to_slaves(states)
        Send a state to each slave.

        :param states: a list of states. The list of states should include
            the state for the master node. These are the states that will
            be simulated on each replica for each step.
        :returns: the state to run on the master node
        """
        # scatter returns this rank's own element, i.e. the master's state
        return self._mpi_comm.scatter(states, root=0)

    @log_timing(logger)
    def receive_state_from_master(self):
        """
        receive_state_from_master()
        Get state to run for this step

        :returns: the state to run for this step
        """
        return self._mpi_comm.scatter(None, root=0)

    @log_timing(logger)
    def gather_states_from_slaves(self, state_on_master):
        """
        gather_states_from_slaves(state_on_master)
        Receive states from all slaves

        :param state_on_master: the state on the master after simulating
        :returns: A list of states, one from each replica.
            The returned states are the states after simulating.
        """
        return self._mpi_comm.gather(state_on_master, root=0)

    @log_timing(logger)
    def send_state_to_master(self, state):
        """
        send_state_to_master(state)
        Send state to master

        :param state: State to send to master. This is the state after
            simulating this step.
        :returns: :const:`None`
        """
        self._mpi_comm.gather(state, root=0)

    @log_timing(logger)
    def broadcast_states_for_energy_calc_to_slaves(self, states):
        """
        broadcast_states_for_energy_calc_to_slaves(states)
        Broadcast states to all slaves. Send all results from this step to every
        slave so that we can calculate the energies and do replica exchange.

        :param states: a list of states
        :returns: :const:`None`
        """
        self._mpi_comm.bcast(states, root=0)

    @log_timing(logger)
    def exchange_states_for_energy_calc(self, state):
        """
        exchange_states_for_energy_calc(state)
        Exchange states between all processes.

        :param state: the state for this node
        :returns: a list of states from all nodes
        """
        return self._mpi_comm.allgather(state)

    @log_timing(logger)
    def receive_states_for_energy_calc_from_master(self):
        """
        receive_states_for_energy_calc_from_master()
        Receive all states from master.

        :returns: a list of states to calculate the energy of
        """
        return self._mpi_comm.bcast(None, root=0)

    @log_timing(logger)
    def gather_energies_from_slaves(self, energies_on_master):
        """
        gather_energies_from_slaves(energies_on_master)
        Receive a list of energies from each slave.

        :param energies_on_master: a list of energies from the master
        :returns: a square matrix of every state on every replica to be used
            for replica exchange
        """
        energies = self._mpi_comm.gather(energies_on_master, root=0)
        return np.array(energies)

    @log_timing(logger)
    def send_energies_to_master(self, energies):
        """
        send_energies_to_master(energies)
        Send a list of energies to the master.

        :param energies: a list of energies to send to the master
        :returns: :const:`None`
        """
        return self._mpi_comm.gather(energies, root=0)

    @log_timing(logger)
    def negotiate_device_id(self):
        """
        Decide which CUDA device this rank should use.

        Each rank reports its hostname and the contents of
        CUDA_VISIBLE_DEVICES to the master, which assigns one device id
        per rank on each host and scatters the assignments back.

        :returns: integer device id for this rank
        """
        hostname = platform.node()
        try:
            visible_devices = os.environ['CUDA_VISIBLE_DEVICES']
            logger.debug('%s found cuda devices: %s', hostname, visible_devices)
            visible_devices = visible_devices.split(',')
            if visible_devices:
                visible_devices = [int(dev) for dev in visible_devices]
            else:
                raise RuntimeError('No cuda devices available')
        except KeyError:
            # env var unset: signal "unknown" with None and let the
            # master fall back to counting ranks per host
            logger.debug('%s CUDA_VISIBLE_DEVICES is not set.', hostname)
            visible_devices = None

        hosts = self._mpi_comm.gather(HostInfo(hostname, visible_devices), root=0)

        # the master computes the device ids
        if self._my_rank == 0:
            if hosts[0].devices is None:
                # if CUDA_VISIBLE_DEVICES isn't set on the master, we assume it
                # isn't set for any node
                # create an empty default dict to count hosts
                host_counts = defaultdict(int)
                # list of device ids
                # this assumes that available devices for each node
                # are numbered starting from 0
                device_ids = []
                for host in hosts:
                    assert host.devices is None
                    device_ids.append(host_counts[host.host_name])
                    host_counts[host.host_name] += 1
            else:
                # CUDA_VISIBLE_DEVICES is set on the master, so we
                # assume it is set for all nodes
                # create a dict to hold the device ids available on each host
                available_devices = {}
                # store the available devices on each node
                for host in hosts:
                    if host.host_name in available_devices:
                        # every rank on a host must agree on what is visible
                        assert host.devices == available_devices[host.host_name]
                    else:
                        available_devices[host.host_name] = host.devices
                # CUDA numbers the devices from 0 always,
                # e.g. if CUDA_VISIBLE_DEVICES=2,3 we still need to ask for
                # devices 0 and 1 to get physical devices 2 and 3.
                # So, we subtract the minimum value from each list to make it zero
                for host in hosts:
                    min_device_id = min(available_devices[host.host_name])
                    available_devices[host.host_name] = [
                        device_id - min_device_id
                        for device_id in available_devices[host.host_name]]
                # device ids for each node
                device_ids = []
                for host in hosts:
                    # pop off the first device_id for this host name
                    device_ids.append(available_devices[host.host_name].pop(0))
        # slaves just receive their device id from the master
        else:
            device_ids = None

        # do the communication
        device_id = self._mpi_comm.scatter(device_ids, root=0)
        logger.debug('hostname: %s, device_id: %d', hostname, device_id)
        return device_id

    @property
    def n_replicas(self):
        # number of replicas this communicator was created for
        return self._n_replicas

    @property
    def n_atoms(self):
        # number of atoms this communicator was created for
        return self._n_atoms

    @property
    def rank(self):
        # MPI rank of this process; only valid after initialize()
        return self._my_rank
def get_mpi_comm_world():
    """
    Helper function to import mpi4py and return the comm_world.
    """
    # Import lazily so that merely importing this module does not
    # initialize MPI; see the class docstring.
    from mpi4py import MPI

    return MPI.COMM_WORLD
# Record gathered from each rank during negotiate_device_id:
# the host's name and its visible CUDA devices (or None when unknown).
HostInfo = namedtuple('HostInfo', ['host_name', 'devices'])