Created
November 3, 2022 22:16
-
-
Save jmbr/47c9ac5b20adbd3c1777fbaf844e2302 to your computer and use it in GitHub Desktop.
Code for experiments on Bayesian optimization in dimensionally-reduced models
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import ABC, abstractmethod | |
from typing import Optional | |
import numpy as np | |
from diffusion_maps import diffusion_maps | |
from mapping import Mapping | |
class Embedding(ABC): | |
"""Embedding of a point cloud. | |
""" | |
points: Optional[np.ndarray] = None | |
images: Optional[np.ndarray] = None | |
@abstractmethod | |
def embed(self, points: np.ndarray) -> None: | |
pass | |
@abstractmethod | |
def __call__(self, X: np.ndarray) -> np.ndarray: | |
pass | |
@abstractmethod | |
def preimage(self, Y: np.ndarray) -> np.ndarray: | |
pass | |
class IdentityEmbedding(Embedding): | |
"""Trivial embedding. | |
""" | |
def __init__(self) -> None: | |
self.points: Optional[np.ndarray] = None | |
self.images: Optional[np.ndarray] = None | |
def embed(self, points: np.ndarray) -> None: | |
if self.points is None: | |
self.points = points.copy() | |
else: | |
self.points = np.concatenate((self.points, points)) | |
self.images = self.points | |
def __call__(self, X: np.ndarray) -> np.ndarray: | |
return np.atleast_2d(X) | |
def preimage(self, Y: np.ndarray) -> np.ndarray: | |
return np.atleast_2d(Y) | |
class DiffusionMapsEmbedding(Embedding): | |
"""Embedding using diffusion maps. | |
""" | |
def __init__(self, epsilon: float, | |
codomain_dim: int) -> None: | |
self.epsilon = epsilon | |
self.codomain_dim = codomain_dim | |
self.points: Optional[np.ndarray] = None | |
self.images: Optional[np.ndarray] = None | |
self.mapping: Optional[Mapping] = None | |
self.inverse_mapping: Optional[Mapping] = None | |
def embed(self, points: np.ndarray) -> None: | |
if self.points is None: | |
self.points = points.copy() | |
else: | |
self.points = np.concatenate((self.points, points)) | |
ew, ev = diffusion_maps(self.points, self.epsilon**2, | |
num_eigenpairs=1 + self.codomain_dim) | |
self.ew = ew.copy() | |
self.ev = ev.copy() | |
self.images = ev * ew[np.newaxis, :]**self.epsilon | |
self.mapping = Mapping(self.points, self.images) | |
self.inverse_mapping = Mapping(self.images, self.points) | |
def __call__(self, X: np.ndarray) -> np.ndarray: | |
"""Return diffusion map coordinates. | |
""" | |
assert self.mapping is not None | |
return self.mapping(X) | |
def preimage(self, Y: np.ndarray) -> np.ndarray: | |
"""Return preimage of diffusion map coordinates. | |
""" | |
assert self.inverse_mapping is not None | |
return self.inverse_mapping(Y) | |
def copy(self): | |
dme = DiffusionMapsEmbedding(self.epsilon, self.codomain_dim) | |
dme.embed(self.points.copy()) | |
return dme |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List, Optional | |
import numpy as np | |
import scipy | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
from embedding import Embedding | |
from point_sampler import PointSampler | |
from optimization_problem import (AcquisitionFunctionFactory, | |
SurrogateFunction, | |
ObjectiveFunction) | |
DEFAULT_NUM_STEPS: int = 0 | |
DEFAULT_TIME_STEP_LENGTH: float = 1e-8 | |
def relax_points(objective_function: ObjectiveFunction, points: np.ndarray, | |
num_steps: int = DEFAULT_NUM_STEPS, | |
time_step_length: float = DEFAULT_TIME_STEP_LENGTH) \ | |
-> np.ndarray: | |
"""Relax points using gradient descent. | |
""" | |
relaxed_points = np.empty_like(points) | |
for i, point in enumerate(points): | |
for n in range(num_steps): | |
grad = objective_function.gradient(point).squeeze() | |
point -= time_step_length * grad | |
relaxed_points[i, :] = point | |
return relaxed_points | |
class ExperimentError(Exception): | |
pass | |
class BaseExperiment: | |
def __init__(self, embedding: Embedding, | |
point_sampler: PointSampler, | |
objective_function: ObjectiveFunction, | |
acquisition_function_factory: AcquisitionFunctionFactory, | |
points_per_iteration: int, | |
radius: float, | |
initial_point: np.ndarray) -> None: | |
self.embedding: Embedding = embedding | |
self.point_sampler: PointSampler = point_sampler | |
self.points_per_iteration: int = points_per_iteration | |
self.radius: float = radius # XXX radius belongs to PointSampler | |
self.initial_point: np.ndarray = initial_point.copy() | |
self.initial_points = [] | |
self.optima: List[np.ndarray] = [] | |
self.acquisition_function_factory: AcquisitionFunctionFactory \ | |
= acquisition_function_factory | |
self.objective_function: ObjectiveFunction = objective_function | |
def __iter__(self): | |
return self | |
def __next__(self): | |
new_points = self.sample_new_points() | |
self.embedding.embed(new_points) | |
points, images = self.embedding.points, self.embedding.images | |
self.acquisition_function = self.acquisition_function_factory.get( | |
self.objective_function, points, images) | |
bounds = list(zip(images.min(axis=0) - 0.0025, # XXX hardcoded | |
images.max(axis=0) + 0.0025)) | |
results = scipy.optimize.minimize( | |
self.acquisition_function, x0=images[-1, :], | |
jac=self.acquisition_function.gradient, | |
bounds=bounds) | |
print(results) | |
print() | |
if results.success is False: | |
raise ExperimentError(results) | |
optimum = self.embedding.preimage(results.x) | |
self.optima.append(relax_points( | |
self.objective_function, np.atleast_2d(optimum), 2, 1e-3)) | |
return self | |
class ExperimentMetropolis(BaseExperiment): | |
def sample_new_points(self): | |
initial_point = self.optima[-1] if self.optima else self.initial_point | |
self.latest_raw_points = self.point_sampler.sample_neighbors( | |
initial_point, self.radius, self.points_per_iteration) | |
return self.latest_raw_points.copy() | |
class ExperimentMetropolisPlusRelaxation(ExperimentMetropolis): | |
def sample_new_points(self): | |
self.latest_relaxed_points = relax_points( | |
self.objective_function, super().sample_new_points()) | |
return self.latest_relaxed_points.copy() | |
class ExperimentMetropolisPlusRelaxationOnlyOnce(ExperimentMetropolis): | |
def sample_new_points(self): | |
if self.optima: | |
# Just add the latest optimum into the available samples. | |
self.initial_points.append(self.optima[-1].squeeze()) | |
new_point = relax_points(self.objective_function, | |
self.optima[-1], | |
num_steps=1) | |
# return np.atleast_2d(self.optima[-1]) | |
return new_point.copy() | |
else: | |
# First iteration. | |
self.initial_points.append(self.initial_point.squeeze()) | |
new_points = self.point_sampler.sample_neighbors( | |
self.initial_point, self.radius, self.points_per_iteration) | |
self.latest_raw_points = new_points.copy() | |
new_points = relax_points(self.objective_function, new_points) | |
self.latest_relaxed_points = new_points.copy() | |
return new_points | |
class ExperimentPlainBayesianOptimization(BaseExperiment): | |
def sample_new_points(self): | |
print('points\n', self.embedding.points) | |
if self.optima: | |
# # Just add the latest optimum into the available samples. | |
# new_point = relax_points(self.objective_function, | |
# self.optima[-1], | |
# num_steps=1) | |
# # return np.atleast_2d(self.optima[-1]) | |
# return new_point.copy() | |
self.initial_points.append(self.optima[-1].squeeze()) | |
print('.') | |
return self.optima[-1].copy() | |
else: | |
# First iteration. | |
self.initial_points.append(self.initial_point.squeeze()) | |
new_points = self.point_sampler.sample_neighbors( | |
self.initial_point, self.radius, self.points_per_iteration) | |
self.latest_raw_points = new_points.copy() | |
new_points = relax_points(self.objective_function, new_points) | |
self.latest_relaxed_points = new_points.copy() | |
return new_points |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List | |
import numpy as np | |
from sklearn.gaussian_process import GaussianProcessRegressor | |
class Mapping: | |
"""Mapping between two sets of points. | |
""" | |
def __init__(self, X: np.ndarray, Y: np.ndarray) -> None: | |
self.interpolators: List[GaussianProcessRegressor] = [] | |
for y in Y.T: | |
interpolator = GaussianProcessRegressor(copy_X_train=False) | |
interpolator.fit(X, y) | |
self.interpolators.append(interpolator) | |
def __call__(self, X: np.ndarray) -> np.ndarray: | |
X = np.atleast_2d(X) | |
values = np.zeros((X.shape[0], len(self.interpolators))) | |
for i, interpolator in enumerate(self.interpolators): | |
values[:, i] = interpolator.predict(X) | |
return values | |
def sample(self, X: np.ndarray, num_samples: int) -> np.ndarray: | |
Z = np.zeros((num_samples, len(self.interpolators))) | |
for i, interpolator in enumerate(self.interpolators): | |
Z[:, i] = interpolator.sample_y(np.atleast_2d(X), num_samples) | |
return Z |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Metropolis sampler module. | |
""" | |
import itertools | |
import math | |
from typing import Callable, Any | |
import numpy as np | |
class Metropolis: | |
"""Implementation of the Metropolis-Hastings algorithm. | |
""" | |
def __init__(self, potential: Callable[[np.ndarray, Any], float], | |
temperature: float, | |
delta: float, | |
initial_point: np.ndarray) -> None: | |
self.potential = potential | |
self.temperature = temperature | |
self.delta = delta | |
self.initial_point = initial_point.copy() | |
self.current_point = initial_point.copy() | |
self.dim = initial_point.shape[1] | |
self.probability = self._compute_probability(initial_point) | |
self.accepted = 1 | |
self.total = 1 | |
def __repr__(self) -> str: | |
return (f'{self.__class__.__name__}(potential={self.potential}, ' | |
+ f'temperature={self.temperature}, delta={self.delta}, ' | |
+ f'initial_point={self.current_point})') | |
def __iter__(self) -> 'Metropolis': | |
return self | |
def __next__(self) -> np.ndarray: | |
candidate = self._generate_candidate() | |
p_old = self.probability | |
p_new = self._compute_probability(candidate) | |
if p_new >= p_old or np.random.rand() * p_old < p_new: | |
self.current_point = candidate | |
self.probability = p_new | |
self.accepted += 1 | |
self.total += 1 | |
return self.current_point | |
def _generate_candidate(self) -> np.ndarray: | |
r = self.delta * (2 * np.random.rand(self.dim) - np.ones(self.dim)) | |
return self.current_point + r | |
def get_acceptance_ratio(self) -> float: | |
"""Return current acceptance ratio. | |
""" | |
return self.accepted / self.total | |
def _compute_probability(self, point: np.ndarray) -> float: | |
return math.exp(-self.potential(point) / self.temperature) | |
def draw_samples(self, num_samples: int, step: int = 1) -> np.ndarray: | |
"""Sample a number of points from the chain. | |
""" | |
points = np.zeros((num_samples // step, self.dim)) | |
iterator = itertools.islice(self, 0, num_samples, step) | |
for i, point in enumerate(iterator): | |
points[i, :] = point | |
return points |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from sklearn.neighbors import DistanceMetric | |
from sklearn.gaussian_process import GaussianProcessRegressor | |
class ObjectiveFunction: | |
def __init__(self, spring_constant1: float, spring_constant2: float, | |
angle: float) -> None: | |
self.k1 = spring_constant1 | |
self.k2 = spring_constant2 | |
self.theta = angle | |
def __call__(self, X: np.ndarray) -> np.ndarray: | |
X = np.atleast_2d(X) | |
return (self.k1 * (np.arctan2(X[:, 1], X[:, 0]) - self.theta)**2 | |
+ self.k2 * (X[:, 0]**2 + X[:, 1]**2 - 1.0)**2) | |
def gradient(self, X: np.ndarray) -> np.ndarray: | |
X = np.atleast_2d(X) | |
k1, k2 = self.k1, self.k2 | |
return np.array([-k1 * (4 * np.arctan2(X[:, 1], X[:, 0]) - np.pi) | |
* X[:, 1] / (2 * X[:, 0]**2 + 2 * X[:, 1]**2) | |
+ 4 * k2 * (X[:, 0]**2 + X[:, 1]**2 - 1) * X[:, 0], | |
k1 * (4 * np.arctan2(X[:, 1], X[:, 0]) - np.pi) | |
* X[:, 0] / (2 * X[:, 0]**2 + 2 * X[:, 1]**2) | |
+ 4 * k2 * (X[:, 0]**2 + X[:, 1]**2 - 1) * X[:, 1]]) | |
class SurrogateFunction: | |
def __init__(self, X, y): | |
# self.gpr = GaussianProcessRegressor(copy_X_train=False) | |
self.gpr = GaussianProcessRegressor(normalize_y=True) | |
self.gpr.fit(X, y) | |
self.length_scale = self.gpr.kernel_.get_params()['k2__length_scale'] | |
def __call__(self, X: np.ndarray, return_std: bool = False): | |
result = self.gpr.predict(np.atleast_2d(X), | |
return_std=return_std) | |
if return_std is True: | |
return (result[0].squeeze(), | |
result[1].squeeze()) | |
else: | |
return result.squeeze() | |
def gradient(self, X: np.ndarray) -> np.ndarray: | |
"""Evaluate the gradient of a Gaussian process at a single point. | |
""" | |
print('gradient', X) | |
x = np.atleast_2d(X) | |
gpr = self.gpr | |
kxX = gpr.kernel_(x, gpr.X_train_) | |
length_scale = gpr.kernel_.get_params()['k2__length_scale'] | |
x_minus_X = x - gpr.X_train_ | |
return ((kxX * gpr.alpha_ * (-x_minus_X.T / length_scale**2)).sum(axis=1) | |
* gpr._y_train_std) | |
class AcquisitionFunction: | |
def __init__(self, | |
objective_function: ObjectiveFunction, | |
points: np.ndarray, images: np.ndarray, | |
kappa: float, tau: float) -> None: | |
self.points = points | |
self.images = images | |
self.objective_function = objective_function | |
self.surrogate_function = SurrogateFunction( | |
images, objective_function(points)) | |
self.kappa = kappa | |
self.tau = tau | |
self.metric = DistanceMetric.get_metric('euclidean') | |
self.analyze_distances() | |
def analyze_distances(self) -> None: | |
distances = np.triu(self.metric.pairwise(self.images)) | |
distances = distances[distances > 0.0] | |
self.median_distance = np.median(distances) | |
self.std_distance = np.std(distances) | |
def _distance_to_data(self, Y: np.ndarray) -> float: | |
return self.metric.pairwise(np.atleast_2d(Y), self.images).min() | |
def __call__(self, Y: np.ndarray) -> float: | |
Y = np.atleast_2d(Y) | |
mean, std = self.surrogate_function(Y, True) | |
distance_to_data = self._distance_to_data(Y[-100:, :]) # XXX constant | |
self.analyze_distances() | |
return (mean - self.kappa * std | |
+ self.tau * distance_to_data / self.median_distance) | |
def gradient(self, X: np.ndarray) -> np.ndarray: | |
"""Evaluate gradient of a Gaussian process at a single point.""" | |
assert self.kappa == 0.0 and self.tau == 0.0 | |
return self.surrogate_function.gradient(X) | |
class AcquisitionFunctionFactory: | |
def __init__(self, kappa: float, tau: float) -> None: | |
self.kappa = kappa | |
self.tau = tau | |
def get(self, objective_function: ObjectiveFunction, | |
points: np.ndarray, images: np.ndarray) -> AcquisitionFunction: | |
return AcquisitionFunction( | |
objective_function, points, images, self.kappa, self.tau) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Optional | |
from abc import ABC, abstractmethod | |
import random | |
import numpy as np | |
import scipy.spatial | |
from metropolis import Metropolis | |
from optimization_problem import ObjectiveFunction | |
class PointSampler(ABC): | |
@abstractmethod | |
def sample_points(self, num_points: int) -> np.ndarray: | |
pass | |
@abstractmethod | |
def sample_neighbors(self, point: np.ndarray, radius: float, | |
max_neighbors: Optional[int] = None) -> np.ndarray: | |
pass | |
def make_synthetic_data(num_points, noise_variance: float, | |
shuffle: bool = True) -> np.ndarray: | |
"""Sample num_points from quarter-circle with additive noise. | |
""" | |
t = np.linspace(0.0, np.pi / 2.0, num_points) | |
points = np.array([np.cos(t), np.sin(t)]).T | |
covariance_matrix = noise_variance * np.eye(points.shape[-1]) | |
noise = np.random.multivariate_normal([0, 0], covariance_matrix, | |
size=num_points) | |
permutation = np.arange(num_points) | |
if shuffle is True: | |
random.shuffle(permutation) | |
points = points[permutation, :] | |
return points + noise | |
class PoolPointSampler(PointSampler): | |
"""Sample points from a generated data set. | |
""" | |
def __init__(self, total_points: int, noise_variance: float) \ | |
-> None: | |
self.pool = make_synthetic_data(total_points, noise_variance, | |
shuffle=False) | |
self.kdtree = scipy.spatial.cKDTree(self.pool) | |
def sample_points(self, num_points: int) -> np.ndarray: | |
permutation = np.random.permutation(self.pool.shape[0]) | |
return self.pool[permutation[:num_points], :] | |
def sample_neighbors(self, point: np.ndarray, radius: float, | |
max_neighbors: Optional[int] = None) -> np.ndarray: | |
point = np.atleast_2d(point) | |
new_indices = self.kdtree.query_ball_point(point.squeeze(), r=radius) | |
random.shuffle(new_indices) | |
return self.pool[new_indices[:max_neighbors], :] | |
class MetropolisPointSampler(PointSampler): | |
def __init__(self, objective_function: ObjectiveFunction, | |
temperature: float, delta: float, | |
initial_point: np.ndarray) -> None: | |
self.objective_function = objective_function | |
self.temperature = temperature | |
self.delta = delta | |
self.initial_point = np.atleast_2d(initial_point) | |
def sample_points(self, num_points: int) -> np.ndarray: | |
metropolis = Metropolis( | |
self.objective_function, self.temperature, self.delta, | |
self.initial_point) | |
return metropolis.draw_samples(10 * num_points, step=10) | |
def sample_neighbors(self, point: np.ndarray, | |
radius: float = None, | |
max_neighbors: Optional[int] = None) -> np.ndarray: | |
self.initial_point = np.atleast_2d(point) | |
metropolis = Metropolis( | |
self.objective_function, self.temperature, self.delta, | |
np.atleast_2d(point)) | |
return metropolis.draw_samples(10 * max_neighbors, step=10) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment