import numpy as np
from scipy import interpolate
import math
from collections import namedtuple
[docs]class AcceptanceCounter(object):
'''
Class to keep track of acceptance rates.
'''
def __init__(self, n_replicas):
self.n_replicas = n_replicas
self.successes = None
self.attempts = None
self.reset()
def reset(self):
self.successes = np.zeros(self.n_replicas - 1)
self.attempts = np.zeros(self.n_replicas - 1)
def update(self, i, accepted):
assert i in range(self.n_replicas - 1)
self.attempts[i] += 1
if accepted:
self.successes[i] += 1
def get_acceptance_probabilities(self):
return self.successes / (self.attempts + 1e-9)
class NullAdaptor(AcceptanceCounter):
def __init__(self, n_replicas):
AcceptanceCounter.__init__(self, n_replicas)
self.reset()
def update(self, i, accepted):
AcceptanceCounter.update(self, i, accepted)
def adapt(self, previous_lambdas, step):
return previous_lambdas
def reset(self):
AcceptanceCounter.reset(self)
[docs]class EqualAcceptanceAdaptor(AcceptanceCounter):
'''
Adaptor based on making acceptance rates uniform.
:param n_replicas: number of replicas
:param min_acc_prob: all acceptence probabilities below this value will be
raised to this value
'''
def __init__(self, n_replicas, adaptation_policy, min_acc_prob=0.1):
AcceptanceCounter.__init__(self, n_replicas)
self.adaptation_policy = adaptation_policy
self.min_acc_prob = min_acc_prob
self.accept_probs = None
self.t_lens = None
self.reset()
[docs] def update(self, i, accepted):
'''
Update adaptor with exchange.
:param i: index of first replica; second replica is i+1
:param accepted: True if the exchange was accepted
'''
AcceptanceCounter.update(self, i, accepted)
[docs] def adapt(self, previous_lambdas, step):
'''
Compute new optimal values of lambda.
:param previous_lambdas: a list of the previous lambda values
:param step: the current simulation step
:return: a list of the new, optimized lambda values
'''
should_adapt = self.adaptation_policy.should_adapt(step)
if should_adapt.adapt_now:
self._compute_accept_probs()
self._compute_t_len()
# put the t_lens on a grid
lambda_grid = np.linspace(0., 1., 5000)
t_lens = np.interp(lambda_grid, previous_lambdas, self.t_lens)
# compute the desired t_lens based on equal spacing
even_spacing = np.linspace(0, t_lens[-1], self.n_replicas)
# compute the values of lambda that will give the desired evenly spaced
# t_lens
new_lambdas = np.interp(even_spacing[1:-1], t_lens, lambda_grid)
new_lambdas = [x for x in new_lambdas]
new_lambdas = [0.] + new_lambdas + [1.]
else:
new_lambdas = previous_lambdas
if should_adapt.reset_now:
self.reset()
return new_lambdas
[docs] def reset(self):
'''
Forget about any previous updates.
Resets all internal counters and statistics to zero.
'''
AcceptanceCounter.reset(self)
self.accept_probs = None
self.t_lens = None
def _compute_accept_probs(self):
# default to 50 percent if there hasn't been a trial
self.successes[self.attempts == 0] = 1.
self.attempts[self.attempts == 0] = 2.
self.accept_probs = self.successes / self.attempts
# set minimum percentage
self.accept_probs[self.accept_probs < self.min_acc_prob] = self.min_acc_prob
def _compute_t_len(self):
# compute the t_len between adjacent pairs
delta_ts = [math.sqrt(-2.0 * math.log(acc)) for acc in self.accept_probs]
# compute a running total
t_lens = [0.]
total = 0.
for dt in delta_ts:
total += dt
t_lens.append(total)
self.t_lens = t_lens
class FluxAdaptor(AcceptanceCounter):
def __init__(self, n_replicas, adaptation_policy, smooth_factor=0.5):
AcceptanceCounter.__init__(self, n_replicas)
self.adaptation_policy = adaptation_policy
assert smooth_factor > 0, 'A small, positive smoothing factor is required.'
self.smooth_factor = smooth_factor
self.up_state = None
self.down_state = None
self.n_up = None
self.n_down = None
self.reset()
def update(self, i, accepted):
# update the acceptance probabilities
AcceptanceCounter.update(self, i, accepted)
if accepted:
# swap the states
self.up_state[[i, i + 1]] = self.up_state[[i + 1, i]]
self.down_state[[i, i + 1]] = self.down_state[[i + 1, i]]
# set the values at the end
self.up_state[0] = 1
self.down_state[0] = 0
self.up_state[-1] = 0
self.down_state[-1] = 1
# increment the counts
self.n_up += self.up_state
self.n_down += self.down_state
def adapt(self, previous_lambdas, step):
should_adapt = self.adaptation_policy.should_adapt(step)
if should_adapt.adapt_now:
f = self._compute_f()
f = self._make_f_monotonic(f)
f = self._apply_smoothing(f)
gx, gy = self._compute_g(f, previous_lambdas)
new_lambdas = self._compute_new_lambdas(gx, gy).tolist()
else:
new_lambdas = previous_lambdas
if should_adapt.reset_now:
self.reset()
return new_lambdas
def reset(self):
AcceptanceCounter.reset(self)
self.up_state = np.zeros(self.n_replicas)
self.up_state[0] = 1
self.down_state = np.zeros(self.n_replicas)
self.down_state[-1] = 1
self.n_up = np.zeros(self.n_replicas)
self.n_down = np.zeros(self.n_replicas)
def _compute_f(self):
total = self.n_up + self.n_down
total[total == 0] = 1 # prevent division by zero if we have no samples
return self.n_down / total
def _make_f_monotonic(self, f):
# compute the derivative and force it to be positive
diff = np.zeros_like(f)
diff[1:] = f[1:] - f[:-1]
diff[diff < 0] = 0
# integrate and rescale into the correct range
f = np.cumsum(diff)
f = f / f[-1]
return f
def _apply_smoothing(self, f):
smooth = np.linspace(0, 1, self.n_replicas)
return (1.0 - self.smooth_factor) * f + self.smooth_factor * smooth
def _compute_g(self, f, previous_lambdas):
gy = np.array(previous_lambdas)
gx = f
return gx, gy
def _compute_new_lambdas(self, gx, gy):
f = interpolate.interp1d(gx, gy)
samples = np.linspace(0, 1, self.n_replicas)
return f(samples)
class SwitchingCompositeAdaptor(object):
def __init__(self, switching_time, first_adaptor, second_adaptor):
self.switching_time = switching_time
self.first_adaptor = first_adaptor
self.second_adaptor = second_adaptor
def update(self, i, accepted):
self.first_adaptor.update(i, accepted)
self.second_adaptor.update(i, accepted)
def adapt(self, previous_lambdas, step):
lambdas_from_first = self.first_adaptor.adapt(previous_lambdas, step)
lambdas_from_second = self.second_adaptor.adapt(previous_lambdas, step)
if step <= self.switching_time:
return lambdas_from_first
else:
return lambdas_from_second
def reset(self):
self.first_adaptor.reset()
self.second_adaptor.reset()
def get_acceptance_probabilities(self):
return self.first_adaptor.get_acceptance_probabilities()
[docs]class AdaptationPolicy(object):
'''
Repeat adaptation on a regular schedule with an optional burn-in and
increasing adaptation times.
:param growth_factor: increase adapt_every by a factor of growth_factor
every adaptation
:param burn_in: number of steps to ignore at the beginning
:param adapt_every: how frequently to adapt (in picoseconds)
'''
# named tuple to hold the results
AdaptationRequired = namedtuple('AdaptationRequired', 'adapt_now reset_now')
def __init__(self, growth_factor, burn_in, adapt_every):
self.growth_factor = growth_factor
self.burn_in = burn_in
self.adapt_every = adapt_every
self.next_adapt = adapt_every + burn_in
[docs] def should_adapt(self, step):
'''
Is adaptation required?
:param step: the current simulation step
:return: an :class:`AdaptationPolicy.AdaptationRequired` object
indicating if adaptation or resetting is necessary
'''
if self.burn_in:
if step >= self.burn_in:
self.burn_in = None
result = self.AdaptationRequired(False, True)
else:
result = self.AdaptationRequired(False, False)
else:
if step >= self.next_adapt:
result = self.AdaptationRequired(True, True)
self.adapt_every *= self.growth_factor
self.next_adapt += self.adapt_every
else:
result = self.AdaptationRequired(False, False)
return result