Source code for meld.vault

import contextlib
import os
import time
import cPickle as pickle
import netCDF4 as cdf
import numpy as np
import shutil
from meld.system import state

[docs]class DataStore(object): """ Class to handle storing data from MELD runs. :param n_atoms: number of atoms :param n_replicas: number of replicas :param block_size: size of netcdf blocks and frequency to do backups Data will be stored in the 'Data' subdirectory. Backups will be stored in 'Data/Backup'. Some information is stored as python pickled files: - data_store.dat -- the DataStore object - communicator.dat -- the MPICommunicator object - remd_runner.dat -- the MasterReplicaExchangeRunner object Other data (positions, velocities, etc) is stored in the file. """ # # data paths # data_dir = 'Data' backup_dir = os.path.join(data_dir, 'Backup') blocks_dir = os.path.join(data_dir, 'Blocks') data_store_filename = 'data_store.dat' data_store_path = os.path.join(data_dir, data_store_filename) data_store_backup_path = os.path.join(backup_dir, data_store_filename) communicator_filename = 'communicator.dat' communicator_path = os.path.join(data_dir, communicator_filename) communicator_backup_path = os.path.join(backup_dir, communicator_filename) remd_runner_filename = 'remd_runner.dat' remd_runner_path = os.path.join(data_dir, remd_runner_filename) remd_runner_backup_path = os.path.join(backup_dir, remd_runner_filename) system_filename = 'system.dat' system_path = os.path.join(data_dir, system_filename) system_backup_path = os.path.join(backup_dir, system_filename) run_options_filename = 'run_options.dat' run_options_path = os.path.join(data_dir, run_options_filename) run_options_backup_path = os.path.join(backup_dir, run_options_filename) net_cdf_filename_template = 'block_{:06d}.nc' net_cdf_path_template = os.path.join(blocks_dir, net_cdf_filename_template) traj_filename = 'trajectory.pdb' traj_path = os.path.join(data_dir, traj_filename) traj_backup_path = os.path.join(backup_dir, traj_filename) def __init__(self, n_atoms, n_replicas, pdb_writer, block_size=100): self._n_atoms = n_atoms self._n_replicas = n_replicas self._block_size = block_size self._cdf_data_set = None self._readonly_mode = False self._pdb_writer = pdb_writer self._current_stage = None self._current_block = None self._max_safe_block = -1 self._readonly_mode = False def __getstate__(self): # don't save some fields to disk excluded = ['_cdf_data_set'] return dict((k, v) for (k, v) in self.__dict__.iteritems() if not k in excluded) def __setstate__(self, state): # set _cdf_data_set to None self.__dict__ = state self._cdf_data_set = None def __del__(self): # close the _cdf_data_set when we go out of scope if self._cdf_data_set: self._cdf_data_set.close() # # properties # @property def n_replicas(self): return self._n_replicas @property def n_atoms(self): return self._n_atoms # # public methods #
[docs] def initialize(self, mode): """ Prepare to use the DataStore object. :param mode: mode to open in. Available modes are: - 'w' -- create a new directory structure and initialize the hd5 file - 'a' -- append to the existing files - 'r' -- open the file in read-only mode """ if mode == 'w': if os.path.exists(self.data_dir): raise RuntimeError('Data directory already exists') os.mkdir(self.data_dir) os.mkdir(self.blocks_dir) os.mkdir(self.backup_dir) self._current_block = 0 self._current_stage = 0 self._create_cdf_file() elif mode == 'a': block_path = self.net_cdf_path_template.format(self._current_block) if os.path.exists(block_path): self._cdf_data_set = cdf.Dataset(block_path, 'a') else: self._create_cdf_file() elif mode == 'r': self._current_block = 0 self._readonly_mode = True self._load_cdf_file_readonly() else: raise RuntimeError('Unknown value for mode={}'.format(mode))
[docs] def close(self): """Close the DataStore""" if self._cdf_data_set: self._cdf_data_set.close() self._cdf_data_set = None
[docs] def save_data_store(self): """Save this object to disk.""" with open(self.data_store_path, 'w') as store_file: pickle.dump(self, store_file)
[docs] def load_data_store(cls, load_backup=False): """Load the DataStore object from disk.""" path = cls.data_store_backup_path if load_backup else cls.data_store_path with open(path) as store_file: return pickle.load(store_file)
[docs] def save_communicator(self, comm): """Save the communicator to disk""" self._can_save() with open(self.communicator_path, 'w') as comm_file: pickle.dump(comm, comm_file)
[docs] def load_communicator(self): """Load the communicator from disk""" if self._readonly_mode: path = self.communicator_backup_path else: path = self.communicator_path with open(path) as comm_file: return pickle.load(comm_file)
[docs] def save_positions(self, positions, stage): """ Save the positions to disk. :param positions: n_replicas x n_atoms x 3 array :param stage: int stage to store """ self._can_save() self._handle_save_stage(stage) self._cdf_data_set.variables['positions'][..., stage] = positions
[docs] def load_positions(self, stage): """ Load positions from disk. :param stage: int stage to load """ self._handle_load_stage(stage) return self._cdf_data_set.variables['positions'][..., stage]
[docs] def load_positions_random_access(self, stage): """ Load positions from disk. :param stage: int stage to load This differs from :meth:`load_positions` in that you can positions from any stage, while :meth:`load_positions` can only move forward in time. However, this comes at a performance penalty. """ # get the block for this stage block = self._block_for_stage(stage) # if it's the current block, then just return the positions if block == self._current_block: return self._cdf_data_set.variables['positions'][..., stage] # otherwise open the file, grab the positions, and then close it else: path = self.net_cdf_path_template.format(block) with contextlib.closing(cdf.Dataset(path, 'r')) as dataset: return dataset.variables['positions'][..., stage]
[docs] def load_all_positions(self): """ Load all positions from disk. Warning, this could use a lot of memory. """ return np.concatenate([np.array(self.load_positions(i))[..., np.newaxis] for i in range(self.max_safe_frame())], axis=-1)
[docs] def iterate_positions(self, start=None, end=None): """ Iterate over the positions from disk. """ if start is None: start = 0 if end is None: end = self.max_safe_frame() for i in range(start, end): yield self.load_positions(i)
[docs] def save_velocities(self, velocities, stage): """ Save velocities to disk. :param velocities: n_replicas x n_atoms x 3 array :param stage: int stage to store """ self._can_save() self._handle_save_stage(stage) self._cdf_data_set.variables['velocities'][..., stage] = velocities
[docs] def load_velocities(self, stage): """ Load velocities from disk. :param stage: int stage to load """ self._handle_load_stage(stage) return self._cdf_data_set.variables['velocities'][..., stage]
[docs] def load_all_velocities(self): """ Load all velocities from disk. Warning, this could use a lot of memory. """ return np.concatenate([np.array(self.load_velocities(i))[..., np.newaxis] for i in range(self.max_safe_frame())], axis=-1)
[docs] def save_states(self, states, stage): """ Save states to disk. :param states: list of SystemStage objects to store :param stage: int stage to store """ self._can_save() self._handle_save_stage(stage) positions = np.array([s.positions for s in states]) velocities = np.array([s.velocities for s in states]) alphas = np.array([s.alpha for s in states]) energies = np.array([ for s in states]) self.save_positions(positions, stage) self.save_velocities(velocities, stage) self.save_alphas(alphas, stage) self.save_energies(energies, stage)
[docs] def load_states(self, stage): """ Load states from disk :param stage: integer stage to load :return: list of SystemState objects """ self._handle_load_stage(stage) positions = self.load_positions(stage) velocities = self.load_velocities(stage) alphas = self.load_alphas(stage) energies = self.load_energies(stage) states = [] for i in range(self._n_replicas): s = state.SystemState(positions[i], velocities[i], alphas[i], energies[i]) states.append(s) return states
def append_traj(self, state, stage): pdb_string = self._pdb_writer.get_pdb_string(state.positions, stage) with open(self.traj_path, 'a') as traj_file: traj_file.write(pdb_string)
[docs] def save_alphas(self, alphas, stage): """ Save alphas to disk. :param alphas: n_replicas array :param stage: int stage to store """ self._can_save() self._handle_save_stage(stage) self._cdf_data_set.variables['alphas'][..., stage] = alphas
[docs] def load_alphas(self, stage): """ Load alphas from disk. :param stage: int stage to load from disk :return: n_replicas array """ self._handle_load_stage(stage) return self._cdf_data_set.variables['alphas'][..., stage]
[docs] def load_all_alphas(self): """ Load all alphas from disk. Warning, this could use a lot of memory. """ return np.concatenate([np.array(self.load_alphas(i))[..., np.newaxis] for i in range(self.max_safe_frame())], axis=-1)
[docs] def save_energies(self, energies, stage): """ Save energies to disk. :param energies: n_replicas array :param stage: int stage to save """ self._can_save() self._handle_save_stage(stage) self._cdf_data_set.variables['energies'][..., stage] = energies
[docs] def load_energies(self, stage): """ Load energies from disk. :param stage: int stage to load :return: n_replicas array """ self._handle_load_stage(stage) return self._cdf_data_set.variables['energies'][..., stage]
[docs] def load_all_energies(self): """ Load all energies from disk. Warning, this could use a lot of memory """ return np.concatenate([np.array(self.load_energies(i))[..., np.newaxis] for i in range(self.max_safe_frame())], axis=-1)
def save_energy_matrix(self, energy_matrix, stage): self._can_save() self._handle_save_stage(stage) self._cdf_data_set.variables['energy_matrix'][..., stage] = energy_matrix def load_energy_matrix(self, stage): self._handle_load_stage(stage) return self._cdf_data_set.variables['energy_matrix'][..., stage] def load_all_energy_matrices(self): return np.concatenate([np.array(self.load_energy_matrix(i))[..., np.newaxis] for i in range(self.max_safe_frame())], axis=-1)
[docs] def save_permutation_vector(self, perm_vec, stage): """ Save permutation vector to disk. :param perm_vec: n_replicas array of int :param stage: int stage to store """ self._can_save() self._handle_save_stage(stage) self._cdf_data_set.variables['permutation_vectors'][..., stage] = perm_vec
[docs] def load_permutation_vector(self, stage): """ Load permutation vector from disk. :param stage: int stage to load :return: n_replicas array of int """ self._handle_load_stage(stage) return self._cdf_data_set.variables['permutation_vectors'][..., stage]
[docs] def load_all_permutation_vectors(self): """ Load all permutation vector from disk. Warning, this might take a lot of memory """ return np.concatenate([np.array(self.load_permutation_vector(i))[..., np.newaxis] for i in range(self.max_safe_frame())], axis=-1)
[docs] def iterate_permutation_vectors(self, start=None, end=None): """ Iterate over the permutation vectors from disk. """ if start is None: start = 0 if end is None: end = self.max_safe_frame() for i in range(start, end): yield self.load_permutation_vector(i)
[docs] def save_acceptance_probabilities(self, accept_probs, stage): """ Save acceptance probabilities vector to disk. :param accept_probs: n_replicas array of int :param stage: int stage to store """ self._can_save() self._handle_save_stage(stage) self._cdf_data_set.variables['acceptance_probabilities'][..., stage] = accept_probs
[docs] def load_acceptance_probabilities(self, stage): """ Load acceptance probability vector from disk. :param stage: int stage to load :return: n_replica_pairs array of int """ self._handle_load_stage(stage) return self._cdf_data_set.variables['acceptance_probabilities'][..., stage]
[docs] def load_all_acceptance_probabilities(self): """ Load all acceptance probabilities from disk Warning, this might take a lot of memory """ return np.concatenate([np.array(self.load_acceptance_probabilities(i))[..., np.newaxis] for i in range(self.max_safe_frame())], axis=-1)
[docs] def save_remd_runner(self, runner): """Save replica runner to disk""" self._can_save() with open(self.remd_runner_path, 'w') as runner_file: pickle.dump(runner, runner_file)
[docs] def load_remd_runner(self): """Load replica runner from disk""" path = self.remd_runner_backup_path if self._readonly_mode else self.remd_runner_path with open(path) as runner_file: return pickle.load(runner_file)
def save_system(self, system): self._can_save() with open(self.system_path, 'w') as system_file: pickle.dump(system, system_file) def load_system(self): path = self.system_backup_path if self._readonly_mode else self.system_path with open(path) as system_file: return pickle.load(system_file) def save_run_options(self, run_options): self._can_save() with open(self.run_options_path, 'w') as options_file: pickle.dump(run_options, options_file) def load_run_options(self): path = self.run_options_backup_path if self._readonly_mode else self.run_options_path with open(path) as options_file: return pickle.load(options_file)
[docs] def backup(self, stage): """ Backup all files to Data/Backup. :param stage: int stage Backup will occur if `stage % backup_freq == 0` """ self._can_save() if not stage % self._block_size: self._backup(self.communicator_path, self.communicator_backup_path) self._backup(self.data_store_path, self.data_store_backup_path) self._backup(self.remd_runner_path, self.remd_runner_backup_path) self._backup(self.system_path, self.system_backup_path) self._backup(self.run_options_path, self.run_options_backup_path) # # private methods #
def _create_cdf_file(self): # create the file path = self.net_cdf_path_template.format(self._current_block) self._cdf_data_set = cdf.Dataset(path, 'w', format='NETCDF4') # setup dimensions self._cdf_data_set.createDimension('n_replicas', self._n_replicas) self._cdf_data_set.createDimension('n_replica_pairs', self._n_replicas - 1) self._cdf_data_set.createDimension('n_atoms', self._n_atoms) self._cdf_data_set.createDimension('cartesian', 3) self._cdf_data_set.createDimension('timesteps', None) # setup variables self._cdf_data_set.createVariable('positions', float, ['n_replicas', 'n_atoms', 'cartesian', 'timesteps'], zlib=True, fletcher32=True, shuffle=True, complevel=9) self._cdf_data_set.createVariable('velocities', float, ['n_replicas', 'n_atoms', 'cartesian', 'timesteps'], zlib=True, fletcher32=True, shuffle=True, complevel=9) self._cdf_data_set.createVariable('alphas', float, ['n_replicas', 'timesteps'], zlib=True, fletcher32=True, shuffle=True, complevel=9) self._cdf_data_set.createVariable('energies', float, ['n_replicas', 'timesteps'], zlib=True, fletcher32=True, shuffle=True, complevel=9) self._cdf_data_set.createVariable('permutation_vectors', int, ['n_replicas', 'timesteps'], zlib=True, fletcher32=True, shuffle=True, complevel=9) self._cdf_data_set.createVariable('energy_matrix', float, ['n_replicas', 'n_replicas', 'timesteps'], zlib=True, fletcher32=True, shuffle=True, complevel=9) self._cdf_data_set.createVariable('acceptance_probabilities', float, ['n_replica_pairs', 'timesteps'], zlib=True, fletcher32=True, shuffle=True, complevel=9) def _backup(self, src, dest): if os.path.exists(src): try: shutil.copy(src, dest) except IOError: # if we encounter an error, wait five seconds and try again time.sleep(5) shutil.copy(src, dest) def _can_save(self): if self._readonly_mode: raise RuntimeError('Cannot save in safe mode.') def _handle_save_stage(self, stage): if stage < self._current_stage: raise RuntimeError('Cannot go back in time') self._current_stage = stage block_index = self._block_for_stage(stage) if block_index > self._current_block: self.close() self._max_safe_block = self._current_block self._current_block = block_index self._create_cdf_file() def _handle_load_stage(self, stage): block_index = self._block_for_stage(stage) if self._readonly_mode: if block_index > self._max_safe_block: raise RuntimeError('Tried to read an unsafe block') else: if block_index < self._current_block: raise RuntimeError('Cannot go back in time') if block_index != self._current_block: self.close() self._current_block = block_index self._load_cdf_file_readonly() def _block_for_stage(self, stage): return stage / self._block_size def _load_cdf_file_readonly(self): path = self.net_cdf_path_template.format(self._current_block) self._cdf_data_set = cdf.Dataset(path, 'r') def max_safe_frame(self): return (self._max_safe_block + 1) * self._block_size