@PaperclipBadger
Created November 18, 2016 11:52

"""Uses tensorflow to approximate a sin wave with a Gaussian Process"""
from __future__ import print_function
from functools import partial
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import inspect
import numbers
try:
# Use xrange instead of range in python2
range = xrange
except NameError:
pass

def logspace(exp, *args):
    """Converts the given arguments from logspace.

    Returns a wrapper that, when its wrapped function is called, replaces
    the specified arguments with their exponentials.

    Args:
        exp (Callable[Number]): the exponential function to use.
        *args (List[int|str]): the arguments to replace. Integers will be
            interpreted as the index (from 0) of a positional argument,
            while strings will be interpreted as the names of arguments.

    Returns:
        (Callable[[Callable[A, B]], Callable[A, B]]): a wrapper that takes
        the given arguments out of logspace when the wrapped function is
        called.

    Examples:
        You can specify positional arguments by their index (from 0)...

        >>> @logspace(np.exp, 0)
        ... def floats(x, y):
        ...     print("{:.3f} {:.3f}".format(x, y))
        >>> floats(0, 0)
        1.000 0.000

        ...or by their name.

        >>> @logspace(np.exp, "chainlink")
        ... def fences(picket, chainlink):
        ...     print("{:.3f} {:.3f}".format(picket, chainlink))
        >>> fences(0, 0)
        0.000 1.000

        You can specify keyword arguments by their index...

        >>> @logspace(np.exp, 1)
        ... def farce(chaos, humour=0):
        ...     print("{:.3f} humour={:.3f}".format(chaos, humour))
        >>> farce(0)
        0.000 humour=1.000

        ...or by their name...

        >>> @logspace(np.exp, "banana")
        ... def fruits(orange=0, banana=0):
        ...     print("orange={:.3f} banana={:.3f}".format(orange, banana))
        >>> fruits()
        orange=0.000 banana=1.000

        ...and they still come out of logspace when specified positionally.

        >>> fruits(1, 1)
        orange=1.000 banana=2.718

        You can do both at the same time!

        >>> @logspace(np.exp, 1, "chicken")
        ... def farms(dairy, wheat, rice=0, chicken=0):
        ...     print("{:.3f} {:.3f} rice={:.3f} chicken={:.3f}"
        ...           .format(dairy, wheat, rice, chicken))
        >>> farms(0, 0)
        0.000 1.000 rice=0.000 chicken=1.000
    """
    for x in args:
        if isinstance(x, int):
            assert x >= 0

    def wrapper(func):
        def wrapped(*fargs, **fkwargs):
            spec = inspect.getargspec(func)
            newargs = list(fargs)
            newkwargs = dict(fkwargs)
            for arg in set(args):
                if isinstance(arg, int):
                    assert spec.varargs or arg < len(spec.args)
                    if arg < len(fargs):
                        # normal positional argument
                        newargs[arg] = exp(newargs[arg])
                    else:
                        # argument has defaulted
                        assert spec.defaults \
                            and arg >= len(spec.args) - len(spec.defaults)
                        # defaults belong to the last len(spec.defaults)
                        # arguments, so index from the end
                        default = spec.defaults[arg - len(spec.args)]
                        newkwargs[spec.args[arg]] = exp(default)
                elif isinstance(arg, str):
                    assert spec.keywords or arg in spec.args
                    if arg in spec.args:
                        # arg might be positional or keyword
                        i = spec.args.index(arg)
                        if arg in newkwargs:
                            # arg passed as keyword
                            newkwargs[arg] = exp(newkwargs[arg])
                        elif i >= len(newargs):
                            # arg defaulted
                            assert spec.defaults \
                                and i >= len(spec.args) - len(spec.defaults)
                            default = spec.defaults[i - len(spec.args)]
                            newkwargs[spec.args[i]] = exp(default)
                        else:
                            newargs[i] = exp(newargs[i])
                    else:
                        # arg is keyword-only
                        newkwargs[arg] = exp(newkwargs[arg])
            return func(*newargs, **newkwargs)
        return wrapped
    return wrapper

def placeholder_inputs(n_train, n_test):
    """Creates placeholder ops for the training and test inputs.

    Args:
        n_train (int): The number of training inputs.
        n_test (int): The number of test inputs.

    Returns:
        (Tuple[tf.placeholder, tf.placeholder, tf.placeholder]): A tuple
        whose first element is the placeholder for training inputs, whose
        second element is the placeholder for training outputs and whose
        third element is the placeholder for test inputs.
    """
    train_in = tf.placeholder(tf.float64, [n_train, 1], name="train_in")
    train_out = tf.placeholder(tf.float64, [n_train, 1], name="train_out")
    test_in = tf.placeholder(tf.float64, [n_test, 1], name="test_in")
    return train_in, train_out, test_in

def fill_feed_dict(train_in, train_out, test_in, train_ul, test_ul, target):
    """Fills the feed dictionary for the training / test inputs.

    Takes uniform samples from the given range.

    Args:
        train_in (tf.placeholder): The training input placeholder.
        train_out (tf.placeholder): The training output placeholder.
        test_in (tf.placeholder): The test input placeholder.
        train_ul (Tuple[float, float]): The lower and upper bounds between
            which to sample the training inputs.
        test_ul (Tuple[float, float]): The lower and upper bounds between
            which to sample the test inputs.
        target (Callable[[float], float]): The function to train against.

    Returns:
        (Dict[tf.placeholder, np.ndarray]): A dictionary from placeholders
        to arrays of input values.
    """
    def train_inputs(n):
        return np.random.uniform(train_ul[0], train_ul[1], n).reshape(n, 1)

    def test_inputs(n):
        return np.linspace(test_ul[0], test_ul[1], n).reshape(n, 1)

    train_inputs_ = train_inputs(train_in.get_shape()[0])
    return { train_in: train_inputs_
           , train_out: target(train_inputs_)
           , test_in: test_inputs(test_in.get_shape()[0])
           }
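
# For illustration (a sketch of the shapes involved): with N_TRAIN = 1000 and
# N_TEST = 200 as in the main block below, the returned dictionary maps
# train_in to a (1000, 1) array of uniform samples, train_out to
# target(train_in) of the same shape, and test_in to a (200, 1) grid.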

def eye_matrix(shape, dtype=tf.float64):
    """Constructs an identity matrix of the given shape.

    If the shape is not square, the identity is zero-padded on the bottom
    and right.

    Args:
        shape (Tuple[int, int]): The shape of the matrix.
        dtype (tf.DType): The element type, defaults to tf.float64.

    Returns:
        (tf.Tensor): A `shape`-shaped tensor with ones on the main diagonal
        and zeros elsewhere.
    """
    n = tf.minimum(shape[0], shape[1])
    identity = tf.diag(tf.ones([n], dtype=dtype), name="identity_matrix")
    padrows = tf.to_int32(shape[0]) - n
    padcolumns = tf.to_int32(shape[1]) - n
    paddings = [[0, padrows], [0, padcolumns]]
    return tf.pad(identity, paddings, name="eye_pad")
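
# Worked example (a sketch, not from the original gist): eye_matrix((3, 2))
# evaluates to [[1., 0.], [0., 1.], [0., 0.]] -- a 2x2 identity padded with
# one row of zeros.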

class GP(object):
    """A Gaussian Process."""
    # class Cache(object):
    #     """A cache for storing the inverse and determinant of K_f.
    #
    #     Attributes:
    #         valid (bool): If `True`, then the values for chol and alpha
    #             are valid for the current values of the hyperparameters.
    #         chol (tf.Variable[tf.Tensor[tf.float64]]):
    #             The cholesky decomposition of K_f.
    #         alpha (tf.Variable[tf.Tensor[tf.float64]]):
    #             chol.T \ (chol \ self._y)
    #     """
    #     def __init__(self, train_in):
    #         n = train_in.get_shape()[0]
    #         self.chol = tf.Variable(tf.zeros((n, n), dtype=tf.float64))
    #         self.alpha = tf.Variable(tf.zeros((n, 1), dtype=tf.float64))
    #         self.valid = tf.Variable(False, dtype=tf.bool)
    #         self.all = [self.chol, self.alpha, self.valid]

    def __init__(self, train_in, train_out,
                 ln_l=tf.Variable(0., dtype=tf.float64),
                 ln_v_f=tf.Variable(0., dtype=tf.float64),
                 ln_v_n=tf.Variable(0., dtype=tf.float64),
                 session=tf.Session()):
        """Initialiser.

        Note that the default variables and session are created once, when
        the method is defined, and so are shared between instances.

        Args:
            train_in (tf.Tensor[tf.float64]): The training inputs.
            train_out (tf.Tensor[tf.float64]): The training outputs.
            ln_l (tf.Variable[tf.float64]): The natural log of the
                length-scale parameter, defaults to a new
                tf.Variable[tf.float64] containing 0.
            ln_v_f (tf.Variable[tf.float64]): The natural log of the
                variance of the signal, defaults to a new
                tf.Variable[tf.float64] containing 0.
            ln_v_n (tf.Variable[tf.float64]): The natural log of the
                variance of the noise in the signal, defaults to a new
                tf.Variable[tf.float64] containing 0.
            session (tf.Session | None): The session to run ops in. If None,
                certain functions will raise. Defaults to tf.Session().
        """
        self._X = train_in
        self._y = train_out
        self._l = ln_l
        self._vf = ln_v_f
        self._vn = ln_v_n
        self._sess = session
        # self._cache = self.Cache(self._X)
        # self._sess.run(tf.initialize_variables(self._cache.all))

    def __enter__(self):
        if self._sess:
            self._sess.__enter__()
        else:
            raise ValueError("no session was provided")
        return self

    def __exit__(self, type_, value, traceback):
        if self._sess:
            # return the session's result rather than self: a truthy return
            # from __exit__ would silently suppress exceptions
            return self._sess.__exit__(type_, value, traceback)
        else:
            raise ValueError("no session was provided")

    def close_session(self):
        if self._sess:
            self._sess.close()
        else:
            raise ValueError("no session was provided")

    @property
    def ln_length_scale(self):
        if self._sess:
            return self._l.eval(self._sess)
        else:
            return self._l

    @ln_length_scale.setter
    def ln_length_scale(self, value):
        self._set_ln_hyperparameter("_l", value)

    @property
    def ln_signal_variance(self):
        if self._sess:
            return self._vf.eval(self._sess)
        else:
            return self._vf

    @ln_signal_variance.setter
    def ln_signal_variance(self, value):
        self._set_ln_hyperparameter("_vf", value)

    @property
    def ln_noise_variance(self):
        if self._sess:
            return self._vn.eval(self._sess)
        else:
            return self._vn

    @ln_noise_variance.setter
    def ln_noise_variance(self, value):
        self._set_ln_hyperparameter("_vn", value)

    def _set_ln_hyperparameter(self, attr, value, feed_dict=None):
        """Assigns or rebinds the log-hyperparameter stored in `attr`.

        Shared implementation for the three property setters above.
        """
        if isinstance(value, tf.Variable):
            setattr(self, attr, value)
        elif isinstance(value, (tf.Tensor, numbers.Real)):
            if not self._sess:
                raise ValueError("cannot set variable with type {}, "
                                 "no session was provided"
                                 .format(type(value)))
            if isinstance(value, numbers.Real):
                value = tf.constant(value, dtype=tf.float64)
            self._sess.run(getattr(self, attr).assign(value),
                           feed_dict=feed_dict)
        else:
            raise TypeError("Unsupported type {} for property, supported "
                            "types are tensorflow.Tensor, "
                            "tensorflow.Variable and numbers.Real"
                            .format(type(value)))

    def initialize_hyperparameters(self):
        """Returns an op that initialises the hyperparameter variables."""
        return tf.initialize_variables([self._l, self._vf, self._vn])

    def K(self, X, Y):
        """RBF Kernel. Computes the covariance matrix between X and Y.

        Args:
            X (tf.Tensor[tf.float64]): A rank 2 tensor representing a list
                of points from the sample space.
            Y (tf.Tensor[tf.float64]): A rank 2 tensor representing a list
                of points from the sample space.

        Returns:
            (tf.Tensor[tf.float64]): The covariance matrix between X and Y.
        """
        l = tf.exp(self._l)
        vf = tf.exp(self._vf)
        X = tf.expand_dims(X, 1)
        Y = tf.expand_dims(Y, 0)
        norms = tf.reduce_sum(tf.square(X - Y), 2)
        scaled = vf**2 * tf.exp(-(1.0 / l**2) * norms)
        return scaled
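
    # For reference: with l = exp(self._l) and v_f = exp(self._vf), the
    # kernel above is k(x, y) = v_f**2 * exp(-||x - y||**2 / l**2). A
    # hypothetical numpy equivalent (a sketch under the same
    # parametrisation, not part of the original gist):
    #
    #     def rbf_np(X, Y, l, vf):
    #         norms = ((X[:, None, :] - Y[None, :, :]) ** 2).sum(axis=2)
    #         return vf**2 * np.exp(-norms / l**2)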

    def prior_mean_var(self, inputs):
        """Calculates the prior mean and variance at the test inputs.

        Preconditions:
            inputs.get_shape()[0] > 0

        Args:
            inputs (tf.Tensor[tf.float64]): The test inputs.

        Returns:
            (Tuple[tf.Tensor[tf.float64], tf.Tensor[tf.float64]]): A tuple
            containing the prior mean and variance at the test inputs.
        """
        # establish preconditions
        asst_te_in = tf.Assert(
            tf.greater(inputs.get_shape()[0], 0),
            [inputs.get_shape()],
            name="mean_cov_assert_inputs")
        with tf.control_dependencies([asst_te_in]):
            inputs = tf.identity(inputs)
        n = inputs.get_shape()[0]
        cov = self.prior_cov(inputs)
        return tf.zeros(n, dtype=tf.float64), tf.diag_part(cov)

    def prior_cov(self, inputs):
        """Calculates the prior covariance matrix at the test inputs.

        Preconditions:
            inputs.get_shape()[0] > 0

        Args:
            inputs (tf.Tensor[tf.float64]): The test inputs.

        Returns:
            (tf.Tensor[tf.float64]): The covariance matrix for the test
            inputs.
        """
        # establish preconditions
        asst_te_in = tf.Assert(
            tf.greater(inputs.get_shape()[0], 0),
            [inputs.get_shape()],
            name="prior_cov_assert_inputs")
        with tf.control_dependencies([asst_te_in]):
            inputs = tf.identity(inputs)
        return self.K(inputs, inputs)

    def prior_sample(self, inputs):
        """Generates values from the prior distribution for the given inputs.

        Preconditions:
            inputs.get_shape()[0] > 0

        Args:
            inputs (tf.Tensor[tf.float64]): The test inputs.

        Returns:
            (tf.Tensor[tf.float64]): Values from the prior distribution for
            the given inputs.
        """
        n = inputs.get_shape()[0]
        mean = tf.zeros(n, dtype=tf.float64)
        cov = self.prior_cov(inputs)
        dist = tf.contrib.distributions.MultivariateNormal(mu=mean, sigma=cov)
        return tf.transpose(dist.sample(1))
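
    # Note: the draw above is f ~ N(0, K(X_*, X_*)); dist.sample(1) has
    # shape (1, n), which the transpose turns into an (n, 1) column vector.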

    # def cached_L_alpha(self):
    #     """Retrieves L and alpha from the cache if necessary."""
    #     def calculate():
    #         vn = tf.exp(self._vn)
    #         n = self._X.get_shape()[0]
    #         identity = eye_matrix((n, n))
    #         K_f = self.K(self._X, self._X) + vn * identity
    #         L = tf.cholesky(K_f)
    #         alpha = tf.matrix_solve(tf.transpose(L),
    #                                 tf.matrix_solve(L, self._y))
    #         asgn_L = self._cache.chol.assign(L)
    #         asgn_alpha = self._cache.alpha.assign(alpha)
    #         asgn_valid = self._cache.valid.assign(True)
    #         with tf.control_dependencies([asgn_L, asgn_alpha, asgn_valid]):
    #             L = tf.identity(L)
    #             alpha = tf.identity(alpha)
    #         return L, alpha
    #     return tf.cond(self._cache.valid,
    #                    lambda: [self._cache.chol, self._cache.alpha],
    #                    calculate)

    def posterior_mean_var(self, points):
        """Calculates the posterior mean and variance at the given points.

        Args:
            points (tf.Tensor[tf.float64]): Points in the sample space.

        Returns:
            (Tuple[tf.Tensor[tf.float64], tf.Tensor[tf.float64]]): The
            predictive means and variances at the given points.
        """
        # establish preconditions
        asst_tr_in = tf.Assert(
            tf.greater(self._X.get_shape()[0], 0),
            [self._X.get_shape()],
            name="mean_cov_assert_train_in")
        asst_tr_out = tf.Assert(
            tf.equal(self._y.get_shape()[0], self._X.get_shape()[0]),
            [self._X.get_shape(), self._y.get_shape()],
            name="mean_cov_assert_train_out")
        asst_te_in = tf.Assert(
            tf.greater(points.get_shape()[0], 0),
            [points.get_shape()],
            name="mean_cov_assert_points")
        with tf.control_dependencies([asst_tr_in, asst_tr_out, asst_te_in]):
            self._X = tf.identity(self._X)
        # L, alpha = self.cached_L_alpha()
        vn = tf.exp(self._vn)
        n = self._X.get_shape()[0]
        identity = eye_matrix((n, n))
        K_f = self.K(self._X, self._X) + vn * identity
        L = tf.cholesky(K_f)
        alpha = tf.matrix_solve(tf.transpose(L), tf.matrix_solve(L, self._y))
        K_ft = self.K(self._X, points)
        K_t = self.K(points, points)
        v = tf.matrix_solve(L, K_ft)
        mean = tf.squeeze(tf.matmul(K_ft, alpha, transpose_a=True))
        var = tf.diag_part(K_t) - tf.reduce_sum(tf.transpose(v**2), 1)
        asst = tf.Assert(tf.equal(mean.get_shape()[0], var.get_shape()[0]),
                         [mean.get_shape(), var.get_shape()],
                         name="mean_var_dim_assert")
        with tf.control_dependencies([asst]):
            mean = tf.identity(mean)
            var = tf.identity(var)
        return mean, var
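
    # The computation above is the standard Cholesky-based GP regression
    # recipe (cf. Rasmussen & Williams 2006, Algorithm 2.1):
    #
    #     L = cholesky(K(X, X) + v_n * I)
    #     alpha = L^T \ (L \ y)          # i.e. alpha = (K + v_n * I)^-1 y
    #     mean = K(X, X_*)^T alpha
    #     v = L \ K(X, X_*)
    #     var = diag(K(X_*, X_*)) - column sums of v**2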

    # @logspace(tf.exp, "l", "s_n", "s_f")
    def nlml(self):
        """Computes the negative log marginal likelihood.

            -log(p(self._y | self._X, l, v_f, v_n))
                = data_fit + complexity + normaliser

        where

            n = len(self._X)
            K = K(l, v_f, v_n, self._X, self._X)
            data_fit = 0.5 * (self._y^T) * (K^-1) * self._y
            complexity = 0.5 * log(|K|)
            normaliser = (n / 2) * log(2 * pi)

        Since |K| = prod(diag(L))^2 for the Cholesky factor L of K, the
        complexity term is computed as sum(log(diag(L))).

        Returns:
            (tf.float64): The negative of the log marginal likelihood of
            the training outputs given the training inputs and the
            hyperparameters.
        """
        # establish preconditions
        asst_tr_in = tf.Assert(
            tf.greater(self._X.get_shape()[0], 0),
            [self._X.get_shape()],
            name="nlml_assert_train_in")
        with tf.control_dependencies([asst_tr_in]):
            self._X = tf.identity(self._X)
        two_pi = tf.constant(2 * np.pi, dtype=tf.float64)
        half = tf.constant(.5, dtype=tf.float64)
        # L, alpha = self.cached_L_alpha()
        vn = tf.exp(self._vn)
        n = self._X.get_shape()[0]
        identity = eye_matrix((n, n))
        K_f = self.K(self._X, self._X) + vn * identity
        L = tf.cholesky(K_f)
        alpha = tf.matrix_solve(tf.transpose(L), tf.matrix_solve(L, self._y))
        data_fit = half * tf.matmul(self._y, alpha, transpose_a=True)
        # 0.5 * log|K_f| = sum(log(diag(L))), since |K_f| = prod(diag(L))^2
        complexity = tf.reduce_sum(tf.log(tf.diag_part(L)))
        n = tf.to_double(self._X.get_shape()[0])
        normalisation = half * n * tf.log(two_pi)
        return tf.squeeze(data_fit + complexity + normalisation)
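
    # A numpy cross-check for nlml (a hypothetical sketch, not part of the
    # original gist), given arrays X, y and a kernel matrix K_np built like
    # self.K with v_n added on the diagonal:
    #
    #     L = np.linalg.cholesky(K_np)
    #     alpha = np.linalg.solve(L.T, np.linalg.solve(L, y))
    #     nlml = (0.5 * y.T.dot(alpha) + np.log(np.diag(L)).sum()
    #             + 0.5 * len(X) * np.log(2 * np.pi))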

    def train(self, steps=100, learning_rate=0.0001, feed_dict=None,
              print_status=False, session=None):
        """Trains the hyperparameters to best fit the data.

        Minimises the negative log marginal likelihood of the parameters
        given the observations using a gradient descent optimiser.

        Args:
            steps (int): The number of training steps to perform.
            learning_rate (float): The rate at which to apply gradients.
            feed_dict (Dict[tf.placeholder, np.ndarray]): The feed
                dictionary supplying the training data.
            print_status (bool): If True, print the loss and the
                hyperparameters every tenth of the run.
            session (tf.Session | None): The session to run ops in.
                Defaults to the session given at construction.
        """
        if not session:
            session = self._sess
        opt = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
        loss = self.nlml()
        opt_op = opt.minimize(loss, var_list=[self._l, self._vf, self._vn])
        session.run(self.initialize_hyperparameters(), feed_dict=feed_dict)
        for step in range(steps + 1):
            # max(..., 1) guards against division by zero when steps < 10
            if print_status and step % max(steps // 10, 1) == 0:
                nlml = session.run(loss, feed_dict=feed_dict)
                print("Step {}: nlml={:.3f} l={:.3f}, v_f={:.3f}, v_n={:.3f}"
                      .format(step, nlml, np.exp(self.ln_length_scale),
                              np.exp(self.ln_signal_variance),
                              np.exp(self.ln_noise_variance)))
            session.run(opt_op, feed_dict=feed_dict)


if __name__ == '__main__':
    # CONSTANTS
    N_TRAIN = 1000
    N_TEST = 200
    TARGET = np.vectorize(lambda x: 2.5 * np.sin(x) + np.random.normal(0, 1))

    ln_l = tf.Variable(.17, dtype=tf.float64, name="log_l")
    ln_v_f = tf.Variable(.2, dtype=tf.float64, name="log_s_f")
    ln_v_n = tf.Variable(-1., dtype=tf.float64, name="log_s_n")

    train_in, train_out, test_in = placeholder_inputs(N_TRAIN, N_TEST)
    feed_dict = fill_feed_dict(train_in, train_out, test_in, (-1, +7),
                               (-10, +7), TARGET)

    with tf.Session() as sess:
        gp = GP(train_in, train_out, ln_l, ln_v_f, ln_v_n, sess)
        gp.train(print_status=True, steps=50, learning_rate=0.001,
                 feed_dict=feed_dict)
        mean, var = sess.run(gp.posterior_mean_var(test_in),
                             feed_dict=feed_dict)
        sigma = np.sqrt(var)
        _, var_prior = sess.run(gp.prior_mean_var(test_in),
                                feed_dict=feed_dict)
        sigma_prior = np.sqrt(var_prior)

        X_star = feed_dict[test_in].reshape(feed_dict[test_in].shape[0])
        plt.plot(feed_dict[train_in], feed_dict[train_out], 'bx')
        plt.plot(X_star, mean, 'r-')
        plt.plot(X_star, 2 * sigma_prior, 'g-', X_star, -2 * sigma_prior,
                 'g-', label="prior")
        plt.fill_between(X_star, mean + 2 * sigma, mean - 2 * sigma,
                         facecolor='grey')
        plt.axis([-10, 7, -5, 5])
        plt.show()